在流计算作业中,经常会遇到状态数不断累积,导致状态数不断增加的情况。例如,作业定义了一个极长的时间窗口1password过期以后功能限制,或者对动态表应用无限范围的 GROUP BY 语句,并执行诸如无时间窗口限制的双流 JOIN 等操作。对于这些情况,老版本的 Flink 不能很好的应对,经常会造成堆内存 OOM,或者堆外内存()的量不断增加,超过了容器的配额限制,导致作业频繁崩溃以及业务的非正常运作。
从 Flink 1.6 版本开始,社区引入了 State TTL 特性,它允许随着时间的推移自动清理作业中定义的 Keyed 状态(通常,Flink 中的大多数状态都是 Keyed 状态,只有少数状态会无处不在,所以本文中的“状态”指的是Keyed状态),并提供了多个设置参数,可以灵活设置时间戳更新的时机、过期状态的可见性等来处理不同的需求场景。
本质上,State TTL 函数为每个 Flink 的 Keyed 状态添加了一个“时间戳”,Flink 在状态创建、写入或读取(可选)时更新这个时间戳,并确定状态是否已过期。如果状态过期,它还会根据可见性参数决定是否返回过期但未清理的状态,依此类推。状态清理不是即时的,而是使用惰性算法实现的,从而减少了状态清理对性能的影响。
最新的 Flink 1.8 版本进一步完善了 State TTL 功能,增加了几个新特性。本文将介绍这些特性以及 Flink 内部对 State TTL 的实现。
状态 TTL 函数的使用
要熟悉一个特性,最直观的方法是了解它的用法。中,用法示例如下:
import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptorstateDescriptor = new ValueStateDescriptor<>("text state", String.class); stateDescriptor.enableTimeToLive(ttlConfig);
如您所见1password过期以后功能限制,要使用 State TTL 功能,您必须首先定义一个对象。该对象可以通过构造函数模式 ( ) 创建。典型用法是传入一个 Time 对象作为 TTL 时间,然后设置更新类型(Type)和状态可见性(State)。下面将介绍这两个函数的含义。文章中有详细描述。在构造对象时,可以在后续声明的状态描述符(State)中启用State TTL功能。
从上面的代码也可以看出,State TTL函数指定的过期时间并不是全局生效的,而是绑定到特定的状态。换句话说,如果要对所有状态生效,则需要为所有使用的状态定义传入对象。对 Flink 源码感兴趣的同学可以尝试为 Flink 添加一个默认选项,实现起来非常简单,这里不再赘述。
更多State TTL使用案例请参考官方flink--state-ttl-test包,提供了很多测试用例供参考。
参数说明State TTL的实现原理
为了理解 Flink 代码中 State TTL 函数的实现,也可以按照类中的模式进行。可以看出,State TTL的实现代码主要集中在flink-模块的org..flink..state.ttl包中,flink--的org..flink...state.ttl包中模块。
首先,我们来看看 flink- 是如何定义和实现 TTL 功能的。有几个类需要特别注意:
种类
该类是一个包装类,可以给任意值对象添加时间戳,并且可以获取传入的对象和时间戳。但需要注意的是,一旦初始化,所有参数都不能更改。它是State TTL状态保存的基本单元,可以通过实用程序类提供的(value, )方法将一个普通的值对象包装成一个对象。
public class TtlValueimplements Serializable { private final T userValue; private final long lastAccessTimestamp; public TtlValue(T userValue, long lastAccessTimestamp) { this.userValue = userValue; this.lastAccessTimestamp = lastAccessTimestamp; } public T getUserValue() { return userValue; } public long getLastAccessTimestamp() { return lastAccessTimestamp; } }
和子类
这是一个抽象包装类。将 Flink 的原始状态(State)和用户设置的对象传入该类的构造函数后,该类的几个布尔常量会根据前面介绍的多个参数进行赋值,例如,表示是否在读取记录时更新,表示是否允许返回过期状态等,从而返回一个支持TTL的状态对象。
它有几个子类,例如它为/List/Map///Value等六种常见状态类型提供了具体的实现。这个抽象类还提供了几个工具方法,比如判断状态值是否过期,将普通值包装成带时间戳的状态值等。它还提供了检查TTL是否过期和过期后增量清理的逻辑。
类的子类
比如在下面的方法中,会先调用传入的对象来获取对象,该对象是带有时间戳的正常状态,然后判断是否为null,是否过期。如果过期,就会调用传入的对象做,这是一个对象,和Java自己的差不多,只是只允许抛出给定的异常。前面说过,这个方法会根据之前传入的参数来决定,读取时是否更新时间戳,过期后是否返回过期状态等。对象负责处理更新时间戳等操作。
TtlValue getWrappedWithTtlCheckAndUpdate( SupplierWithException , SE> getter, ThrowingConsumer , CE> updater, ThrowingRunnable stateClear) // 增量清理状态的 Runnable 类(经过 Flink 封装,允许在运行时抛出异常 throws SE, CE, CLE { TtlValue ttlValue = getter.get(); if (ttlValue == null) { return null; } else if (expired(ttlValue)) { stateClear.run(); if (!returnExpired) { return null; } } else if (updateTsOnRead) { updater.accept(rewrapWithNewTs(ttlValue)); } return ttlValue; }
我们再举个例子,看看上面方法的调用:
private TtlValuegetWrapped(UK key) throws Exception { accessCallback.run(); return getWrappedWithTtlCheckAndUpdate( () -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); }
可以看出 , , 和对象的定义非常简单,用表达式来清楚地描述它们各自的动作。表示原始的 Flink 状态(State)。可以看到,具体的操作还是在原来的状态对象上进行的。这个类只是一个装饰器,在原始状态对象上增加了时间戳、过期判断等逻辑。
然后看put方法:
@Override public void put(UK key, UV value) throws Exception { accessCallback.run(); original.put(key, wrapWithTs(value)); }
这是一个回调对象,用于实现过期状态的增量清理逻辑,在调用每个 put 或 get 方法之前都会执行一次。限于篇幅,此处不再赘述。有兴趣的同学可以参考类的()方法。具体的实现逻辑也在本文下方的示例代码中给出。
下面两张图是常见的继承关系图。可以明显看出支持TTL的State对象和普通的State对象并没有太大的区别,只是增加了一个辅助方法来扩展TTL的特性。. 另一个区别是它只是一个包装类,需要传入其他State对象来完成它的功能。
类继承图
常见继承图
种类
这个类主要用于初始化上面提到的类。它包含了实例化 TTL 状态类所需的所有参数,例如被包装的普通状态对象 () 及其所需的,对象(上面提到),对象( 用于获取当前时间戳。默认情况下,它只是.()的封装,用户也可以自定义,具体清理上面已经讲过了)。
对象可以由一个类动态创建,这是一个工厂模式类:
private IS createValueState() throws Exception { ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>( // 创建一个 TtlValue 类型的 State Descriptor, 可以看到它是一个复合类型 stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor)); } ... ... ... private TtlStateContext createTtlStateContext(StateDescriptor ttlDescriptor) throws Exception { ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // 从给定的 State Descriptor 获取 TTL 时间 OIS originalState = (OIS) stateBackend.createInternalState( // 创建原始状态 namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()); return new TtlStateContext<>( // 将原始状态包装到 TtlStateContext 类型中,用于随后生成具体的 TTL State 对象,例如 TtlValueState originalState, ttlConfig, timeProvider, (TypeSerializer ) stateDesc.getSerializer(), registerTtlIncrementalCleanupCallback((InternalKvState, ?, ?>) originalState)); } private Runnable registerTtlIncrementalCleanupCallback(InternalKvState, ?, ?> originalState) { StateTtlConfig.IncrementalCleanupStrategy config = ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy(); boolean cleanupConfigured = config != null && incrementalCleanup != null; boolean isCleanupActive = cleanupConfigured && isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize()); Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { }; // stateAccessed 执行具体的清理逻辑 if (isCleanupActive && config.runCleanupForEveryRecord()) { stateBackend.registerKeySelectionListener(stub -> callback.run()); } return callback; }
可以看出,初始化后,它可以将Flink的任何普通状态对象(只要实现了接口,比如等)转换为支持TTL和增量清理功能的状态对象。这样,在未来的 Flink 状态调用过程中,只要调用状态的 get/put/ 等通用方法,就会自动判断失败状态,清理等,而 Flink 不需要知道背后的实现逻辑,只要把这些状态对象都可以正常使用。这种封装方式也体现了 Flink 的可扩展性,避免了实现细节对上层调用逻辑的干扰。
接下来,我们简单看看 Flink 如何在 . Flink 提供了一个实现类的类,这样在后台进行操作时,就可以过滤掉那些无效的 Keys 和 。这里的名称 r 是用 C++ 编写的原生代码,而 Flink 在这里使用的是 JNI。打电话的方式。具体实现细节,代码不在 Flink 的 Git 仓库中,而是放在为 Flink 定制的包库中。详细实现请参考Git提交记录1和2。
总结
Flink 的 State TTL 特性的引入解决了长期困扰用户的问题:随着状态数量的增加,旧状态无法及时清除(尤其是通过 Flink Table/SQL API 创建的作业,用户无法显示it. 状态管理),导致系统越来越不稳定。目前 State TTL 仅对 Time 时间模式有效,但通过与开发者的沟通,Flink 近期也会支持 Event Time 的 State TTL 特性。
State TTL特性是基于状态后端的底层状态实现的,不同于Table模块基于Timer机制实现的机制,非常有限,当Timer数量过多时,会也会对内存本身造成巨大压力,但好处是可以实时清理状态,而不用像State TTL那样每次增量清理一小部分或在后台异步清理。所以对于 Table/SQL 作业,可以结合使用这两种机制来应对增加状态的挑战。
参考文章
Flink 1.8 :带状态
Flink 的状态 TTL:如何限制状态
[FLINK-3089] 状态 API 数据(状态 TTL)
推荐内容
了解更多