如何应对快速增长的状态?Flink 状态 TTL 概述

状态),并且提供了多个设置参数,可以灵活地设定时间戳更新的时机、过期状态的可见性等,以应对不同的需求场景。如果状态过期,还会根据可见性参数,来决定是否返回已过期但还未清理的状态等等。的算法来实现,从而减少状态清理对性能的影响。和增量清理功能的状态对象。来创建的作业,用户无法显式地管理状态),导致系统越来越不稳定。作业,两种机制可以结合使用,以应对逐渐增加的状态带来的挑战。

在流计算作业中,经常会遇到状态数不断累积,导致状态数不断增加的情况。例如,作业定义了一个极长的时间窗口1password过期以后功能限制,或者对动态表应用无限范围的 GROUP BY 语句,并执行诸如无时间窗口限制的双流 JOIN 等操作。对于这些情况,老版本的 Flink 不能很好的应对,经常会造成堆内存 OOM,或者堆外内存()的量不断增加,超过了容器的配额限制,导致作业频繁崩溃以及业务的非正常运作。

从 Flink 1.6 版本开始,社区引入了 State TTL 特性,它允许随着时间的推移自动清理作业中定义的 Keyed 状态(通常,Flink 中的大多数状态都是 Keyed 状态,只有少数状态会无处不在,所以本文中的“状态”指的是Keyed状态),并提供了多个设置参数,可以灵活设置时间戳更新的时机、过期状态的可见性等来处理不同的需求场景。

本质上,State TTL 函数为每个 Flink 的 Keyed 状态添加了一个“时间戳”,Flink 在状态创建、写入或读取(可选)时更新这个时间戳,并确定状态是否已过期。如果状态过期,它还会根据可见性参数决定是否返回过期但未清理的状态,依此类推。状态清理不是即时的,而是使用惰性算法实现的,从而减少了状态清理对性能的影响。

最新的 Flink 1.8 版本进一步完善了 State TTL 功能,增加了几个新特性。本文将介绍这些特性以及 Flink 内部对 State TTL 的实现。

状态 TTL 函数的使用

要熟悉一个特性,最直观的方法是了解它的用法。中,用法示例如下:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

如您所见1password过期以后功能限制,要使用 State TTL 功能,您必须首先定义一个对象。该对象可以通过构造函数模式 ( ) 创建。典型用法是传入一个 Time 对象作为 TTL 时间,然后设置更新类型(Type)和状态可见性(State)。下面将介绍这两个函数的含义。文章中有详细描述。在构造对象时,可以在后续声明的状态描述符(State)中启用State TTL功能。

从上面的代码也可以看出,State TTL函数指定的过期时间并不是全局生效的,而是绑定到特定的状态。换句话说,如果要对所有状态生效,则需要为所有使用的状态定义传入对象。对 Flink 源码感兴趣的同学可以尝试为 Flink 添加一个默认选项,实现起来非常简单,这里不再赘述。

更多State TTL使用案例请参考官方flink--state-ttl-test包,提供了很多测试用例供参考。

参数说明State TTL的实现原理

为了理解 Flink 代码中 State TTL 函数的实现,也可以按照类中的模式进行。可以看出,State TTL的实现代码主要集中在flink-模块的org..flink..state.ttl包中,flink--的org..flink...state.ttl包中模块。

首先,我们来看看 flink- 是如何定义和实现 TTL 功能的。有几个类需要特别注意:

种类

该类是一个包装类,可以给任意值对象添加时间戳,并且可以获取传入的对象和时间戳。但需要注意的是,一旦初始化,所有参数都不能更改。它是State TTL状态保存的基本单元,可以通过实用程序类提供的(value, )方法将一个普通的值对象包装成一个对象。

淘宝多久以后评价过期_天正建筑80过期限制去除补丁_1password过期以后功能限制

public class TtlValue implements Serializable {
	private final T userValue;
	private final long lastAccessTimestamp;
	public TtlValue(T userValue, long lastAccessTimestamp) {
		this.userValue = userValue;
		this.lastAccessTimestamp = lastAccessTimestamp;
	}
	public T getUserValue() {
		return userValue;
	}
	public long getLastAccessTimestamp() {
		return lastAccessTimestamp;
	}
}

和子类

这是一个抽象包装类。将 Flink 的原始状态(State)和用户设置的对象传入该类的构造函数后,该类的几个布尔常量会根据前面介绍的多个参数进行赋值,例如,表示是否在读取记录时更新,表示是否允许返回过期状态等,从而返回一个支持TTL的状态对象。

它有几个子类,例如它为/List/Map///Value等六种常见状态类型提供了具体的实现。这个抽象类还提供了几个工具方法,比如判断状态值是否过期,将普通值包装成带时间戳的状态值等。它还提供了检查TTL是否过期和过期后增量清理的逻辑。

类的子类

比如在下面的方法中,会先调用传入的对象来获取对象,该对象是带有时间戳的正常状态,然后判断是否为null,是否过期。如果过期,就会调用传入的对象做,这是一个对象,和Java自己的差不多,只是只允许抛出给定的异常。前面说过,这个方法会根据之前传入的参数来决定,读取时是否更新时间戳,过期后是否返回过期状态等。对象负责处理更新时间戳等操作。

 TtlValue getWrappedWithTtlCheckAndUpdate(
    SupplierWithException, SE> getter,
    ThrowingConsumer, CE> updater,
    ThrowingRunnable stateClear)    // 增量清理状态的 Runnable 类(经过 Flink 封装,允许在运行时抛出异常
    throws SE, CE, CLE {
    TtlValue ttlValue = getter.get();
    if (ttlValue == null) {
        return null;
    } else if (expired(ttlValue)) {
        stateClear.run();
        if (!returnExpired) {
            return null;
        }
    } else if (updateTsOnRead) {
        updater.accept(rewrapWithNewTs(ttlValue));
    }
    return ttlValue;
}

我们再举个例子,看看上面方法的调用:

private TtlValue getWrapped(UK key) throws Exception {
   accessCallback.run();
   return getWrappedWithTtlCheckAndUpdate(
      () -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
}

可以看出 , , 和对象的定义非常简单,用表达式来清楚地描述它们各自的动作。表示原始的 Flink 状态(State)。可以看到,具体的操作还是在原来的状态对象上进行的。这个类只是一个装饰器,在原始状态对象上增加了时间戳、过期判断等逻辑。

然后看put方法:

@Override
public void put(UK key, UV value) throws Exception {
    accessCallback.run();
    original.put(key, wrapWithTs(value));
}

这是一个回调对象,用于实现过期状态的增量清理逻辑,在调用每个 put 或 get 方法之前都会执行一次。限于篇幅,此处不再赘述。有兴趣的同学可以参考类的()方法。具体的实现逻辑也在本文下方的示例代码中给出。

下面两张图是常见的继承关系图。可以明显看出支持TTL的State对象和普通的State对象并没有太大的区别,只是增加了一个辅助方法来扩展TTL的特性。. 另一个区别是它只是一个包装类,需要传入其他State对象来完成它的功能。

1password过期以后功能限制_天正建筑80过期限制去除补丁_淘宝多久以后评价过期

类继承图

常见继承图

种类

这个类主要用于初始化上面提到的类。它包含了实例化 TTL 状态类所需的所有参数,例如被包装的普通状态对象 () 及其所需的,对象(上面提到),对象( 用于获取当前时间戳。默认情况下,它只是.()的封装,用户也可以自定义,具体清理上面已经讲过了)。

对象可以由一个类动态创建,这是一个工厂模式类:

private IS createValueState() throws Exception {
    ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>(    // 创建一个 TtlValue 类型的 State Descriptor, 可以看到它是一个复合类型
        stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
    return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
}
... ... ...
private  TtlStateContext
    createTtlStateContext(StateDescriptor ttlDescriptor) throws Exception {
    ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig());     // 从给定的 State Descriptor 获取 TTL 时间
    OIS originalState = (OIS) stateBackend.createInternalState(   // 创建原始状态
        namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
    return new TtlStateContext<>(    // 将原始状态包装到 TtlStateContext 类型中,用于随后生成具体的 TTL State 对象,例如 TtlValueState
        originalState, ttlConfig, timeProvider, (TypeSerializer) stateDesc.getSerializer(),
        registerTtlIncrementalCleanupCallback((InternalKvState) originalState));
}
private Runnable registerTtlIncrementalCleanupCallback(InternalKvState originalState) {
    StateTtlConfig.IncrementalCleanupStrategy config =
        ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
    boolean cleanupConfigured = config != null && incrementalCleanup != null;
    boolean isCleanupActive = cleanupConfigured &&
        isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
    Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };    // stateAccessed  执行具体的清理逻辑
    if (isCleanupActive && config.runCleanupForEveryRecord()) {
        stateBackend.registerKeySelectionListener(stub -> callback.run());
    }
    return callback;
}

可以看出,初始化后,它可以将Flink的任何普通状态对象(只要实现了接口,比如等)转换为支持TTL和增量清理功能的状态对象。这样,在未来的 Flink 状态调用过程中,只要调用状态的 get/put/ 等通用方法,就会自动判断失败状态,清理等,而 Flink 不需要知道背后的实现逻辑,只要把这些状态对象都可以正常使用。这种封装方式也体现了 Flink 的可扩展性,避免了实现细节对上层调用逻辑的干扰。

接下来,我们简单看看 Flink 如何在 . Flink 提供了一个实现类的类,这样在后台进行操作时,就可以过滤掉那些无效的 Keys 和 。这里的名称 r 是用 C++ 编写的原生代码,而 Flink 在这里使用的是 JNI。打电话的方式。具体实现细节,代码不在 Flink 的 Git 仓库中,而是放在为 Flink 定制的包库中。详细实现请参考Git提交记录1和2。

总结

Flink 的 State TTL 特性的引入解决了长期困扰用户的问题:随着状态数量的增加,旧状态无法及时清除(尤其是通过 Flink Table/SQL API 创建的作业,用户无法显示it. 状态管理),导致系统越来越不稳定。目前 State TTL 仅对 Time 时间模式有效,但通过与开发者的沟通,Flink 近期也会支持 Event Time 的 State TTL 特性。

State TTL特性是基于状态后端的底层状态实现的,不同于Table模块基于Timer机制实现的机制,非常有限,当Timer数量过多时,会也会对内存本身造成巨大压力,但好处是可以实时清理状态,而不用像State TTL那样每次增量清理一小部分或在后台异步清理。所以对于 Table/SQL 作业,可以结合使用这两种机制来应对增加状态的挑战。

参考文章

Flink 1.8 :带状态

Flink 的状态 TTL:如何限制状态

[FLINK-3089] 状态 API 数据(状态 TTL)

推荐内容

了解更多

发布时间: