推广

Flink检查点机制与状态管理

iseeyu2年前 (2024-02-21)推广132

检查点机制

1.2 开启检查点

默认情况下,检查点机制是关闭的,需要在程序中进行开启:

// 开启检查点机制,并指定状态检查点之间的时间间隔
env.enableCheckpointing(1000); 

// 其他可选配置如下:
// 设置语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置两个检查点之间的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 设置执行Checkpoint操作时的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 设置最大并发执行的检查点的数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 将检查点持久化到外部存储
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存点时,是否将作业回退到该检查点
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

1.3 保存点机制

保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许你通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。

1.4 RichFunction 检查点实战

public class OperatorWarning implements CheckpointedFunction {
    // 非正常数据
    private List<Tuple2<String, Long>> bufferedData;
    // checkPointedState
    private transient ListState<Tuple2<String, Long>> checkPointedState;
   
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意这里获取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
                getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        // 如果发生重启,则需要从快照中将状态进行恢复
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在进行快照时,将数据存储到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }
}

2 状态管理

2.1 算子状态

算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访到。官方文档上对 Operator State 的解释是:each operator state is bound to one parallel operator instance,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:

算子状态

2.2 键控状态

键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(…) 来得到 KeyedStream 。

键控状态

2.3 监控状态编程

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

  • ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。
  • ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。
  • ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
  • AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
  • FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。
  • MapState:维护 Map 类型的状态。
 @Override
    public void open(Configuration parameters) {
        // 通过状态名称(句柄)获取状态实例,如果不存在则会自动创建
//        abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<>("abnormalData", Long.class));

        StateTtlConfig ttlConfig = StateTtlConfig
                // 设置有效期为 10 秒
                .newBuilder(Time.seconds(10))
                // 设置有效期更新规则,这里设置为当创建和写入时,都重置其有效期到规定的10秒
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                /*设置只要值过期就不可见,另外一个可选值是ReturnExpiredIfNotCleanedUp,
                 代表即使值过期了,但如果还没有被物理删除,就是可见的*/
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        abnormalData = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        // 如果输入值超过阈值,则记录该次不正常的数据信息
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }

        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
        // 如果不正常的数据出现达到一定次数,则输出报警信息
        if (list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超过指定阈值 ", list));
            // 报警信息输出后,清空状态
            abnormalData.clear();
        }
    }

2.4 算子状态编程

相比于键控状态,算子状态目前支持的存储类型只有以下三种:

  • ListState:存储列表类型的状态。
  • UnionListState:存储列表类型的状态,与 ListState 的区别在于:如果并行度发生变化,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
  • BroadcastState:用于广播的算子状态。
 @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意这里获取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
                getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        // 如果发生重启,则需要从快照中将状态进行恢复
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

备注:一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态

3 状态后端

3.1 状态管理实现方式

状态管理的实现方式

  • MemoryStateBackend
    默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

  • FsStateBackend
    基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

  • RocksDBStateBackend
    RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

3.2 配置方式

  • 基于代码方式进行配置,只对当前作业生效:
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

// 配置 RocksDBStateBackend 时,需要额外导入下面的依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.12</version>
</dependency>
  • 基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

扫描二维码推送至手机访问。

版权声明:本文由西安泽虎代运营发布,如需转载请注明出处。

转载请注明出处https://www.0291.com.cn/post/56964.html

相关文章

一个缜密的广告战略才会使你锋芒毕露。

一个缜密的广告战略才会使你锋芒毕露。

战略,请铭记,战略才是最要害的,在产品严峻同质化的年代,且市场竞争白热化的状态下,一个缜密的广告战略才会使你锋芒毕露。缜密的广告战略要重视6大问题,无人知道该向哪处去? 不管是人生仍是一个项目策划,刚下手必定都是惘然的,纷然杂陈却不知从什么地方着手。不怕,条条路途通罗马,每一条路都能...

全新一代房地产销售管理系统:夯实经营管理(营销管理)

全新一代房地产销售管理系统:夯实经营管理(营销管理)

无论是从宏观经济还是从地产行业的发展趋势来看,传统高杠杆、快周转的商业模式已经无法支持企业持续发展,对于当前所面临的挑战与机遇,对于企业而言,需要不停迭代和更新认知,才能适应新的发展方式、创造新的商业模式。 信客云一直以用户需求为核心,利用数字化工具强化内功、提升经营管理能力,在客研方面...

551.上行7|如何做好赚钱的事?这里有些方法论

551.上行7|如何做好赚钱的事?这里有些方法论

#头条创作挑战赛#很多人都说,钱不是万能的,充斥着酸葡萄心理,但是当今这个社会,没有钱是万万不能。但对于高人而言,他们想的却是拥有一只又一只无需喂食的母鸡,而不是鸡蛋,境界高下立判。从社会财富正太分布的形态来看,“赚到钱”的人永远是少数人,类似于金字塔的塔尖。问题是,我们如...

拼多多新人砍价网站在线刷,让你购物更省钱

拼多多新人砍价网站在线刷,让你购物更省钱

对于很多拼多多的新手来说,如何砍价才能获得更多的优惠,是一个让人头疼的问题。那么,有没有什么好的方法可以让我们在拼多多购物时,更省钱呢?答案自然是有的。今天,我就要为大家揭秘一个拼多多新人砍价网站在线刷的技巧,让你轻松购物更省钱。 一、为什么要使用砍价网站 拼多多作为一个社交电商...

淘宝开店卖零食需要什么证,淘宝小零食店铺(淘宝c店能卖零食吗)

淘宝开店卖零食需要什么证,淘宝小零食店铺(淘宝c店能卖零食吗)

商户出具的质量保证书。如果是进口食品还须商品出入境检验检疫合格证明、《卫生证书》复印件以及报关单。如果是酒类的销售还需要准备《酒类零售许可证》或者是《酒类流通备案登记表》的复印件。...

营商保商品合规工具是否免费

营商保商品合规工具是否免费

通过一站式风险治理平台“营商保”,所有参加天猫双11大促的商家均可享受免费合规检测。...

现在,非常期待与您的又一次邂逅

我们努力让每一部企业宣传片和抖音短视频成为商业大片