Flink状态和容错源码解析

 更新时间:2022年12月01日 11:10:48   作者:xiangel  
这篇文章主要为大家介绍了Flink状态和容错源码示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

  • 计算模型
    • DataStream基础框架
    • 事件时间和窗口
    • 状态和容错
  • 部署&调度
  • 存储体系
  • 底层支撑

Flink中提供了State(状态)这个概念来保存中间计算结果和缓存数据,按照不同的场景,Flink提供了多种不同类型的State,同时为了实现Exactly once的语义,Flink参考Chandy-Lamport算法实现了Asynchronous Barrier Snapshotting算法(简称ABS),本篇我们来了解Flink状态的底层实现及如何进行快照处理。

概述

Flink中提供了2种State,一种是Keyed State,是用在Keyed DataStream(即每条记录是有一个key)。即每个key对应有一个状态信息,用于数据处理场景中需要按key记录状态的场景,如风控场景下记录用户的状态,window操作中记录窗口的数据信息也是用keyed State来实现的。另一种是Operator State,即算子的状态,如在KafkaSource中记录消费的偏移量。本篇将从状态的管理、数据的存储和备份恢复等来介绍底层机制。

State

Keyed State

Keyed State按照存储数据类型的不同,分为如下几类

  • ValueState: 保存一个值,支持查询和更新
  • ListState: 保存一个元素列表
  • ReducingState: 保存一个单值,表示添加到状态的所有数据的聚合,有定义一个ReduceFunction来进行聚合处理
  • AggregatingState<IN, OUT>: 保留一个单值,也是添加到状态的所有数据的聚合,和ReducingState不同的是,输入数据和结果值的类型可以不同。
  • MapState: 维护的一个映射列表 创建这些状态时需要同时定义一个StateDescriptor,这里面定义一个状态的名字(唯一名称),通过StateDescriptor就可以来引用具体状态实例。

状态实例管理及数据存储

从Keyed State的定义来看,这里的关系应该是一个key到State的映射,而在使用时却没有看到这个key,另外具体的状态是如何保存的,本节我们来深入分析

从上图可以看出,各种不同的State,是通过KeyedStateStore来获取到的,KeyedStateStore只是一个代理类,其底层是调用KeyedStateBackend来负责具体的处理,具体的实现类有如下2个

  • HeapKeyedStateBackend: 基于内存的StateBackend,把数据保存在Java的Heap里面
  • RocksDBKeyedStateBackend: 状态数据保存在RocksDB中

下面我们从如下几个功能点来看具体各个StateBackend的实现

  • 各个实例的管理
  • 内部数据保存
  • 数据过期处理
  • 数据快照处理
  • 数据重分布

HeapKeyedStateBackend

基于内存的StateBackend,将数据保存在Java的Heap中,适合小数据状态场景

各个实例的管理

private final Map<String, StateTable<K, ?, ?>> registeredKVStates;

使用一个Map来保存各个不同的KeyedState,key为定义的名字,StateTable中存储的具体的数据。StateTable的内部在下面内部数据保存中介绍

  • 内部数据保存 内部数据存储是通过StateTable来管理的,里面定义了一个StateMap的数组来存储具体的数据,具体的数据通过计算key的KeyGroupIndex来确定把数据存放到哪个StateMap。
    • StateMap:存储数据,具体的实现有CopyOnWriteStateMap和CopyOnWriteSkipListStateMap2个。
    • KeyGroup:将key分组管理,这样方便在后续通过savepoint来恢复任务时如果调整了并行度,这样方便对key按KeyGroup重新分布。keyGroup的计算为将key做hash处理后按最大并行度(maxParallelism)取余
  • 数据过期处理 如果有设置了数据过期处理,这种生成的State会每个value都带上一个时间戳数据,如MapState是每个value都带一个时间戳,具体的实例类型也会不同,对应为如下几个
    • TtlValueState
    • TtlListState
    • TtlReducingState
    • TtlAggregatingState
    • TtlMapState 具体数据的清理处理通过TtlIncrementalCleanup类来实现
  • 数据快照处理 HeapKeyedStateBackend的snapshot处理由类HeapSnapshotStrategy来负责,调用的方法为asyncSnapshot(),只支持全量的快照处理。
  • 数据重分布 在实际的数据处理中由于并行度设置的不合理,在日常运维过程中会涉及到并行度的调整,算子的并行度调整后,那对key的分布也会进行调整,这样就会导致keyedState的数据进行重分布。Flink中引入了一个KeyGroup的概念,其对key做了个分组管理,KeyGroup的个数为最大并行度,具体实现为将key进行hash后然分类(hash值对KeyGroup数取余)到其中一个KeyGroup中。

RocksDBKeyedStateBackend

使用RocksDB来存储状态,这个State backend可以存储非常大的状态,如果超过了内存可以split到磁盘上。同样我们分如下几个阶段来了解相关具体的实现

  • 各个实例的管理 通过如下的Map来存储定义的各个状态信息,和前面HeapKeyedStateBackend类似,key为自己定义的名称
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation;
  • 内部数据保存 从上面可以看到每个key对应的value是RocksDbKvStateInfo,这个其中有如下2个属性 ColumnFamilyHandle:rocksdb库中的类,提供了一个handle对应到rocksdb的ColumnFamily。
public static class RocksDbKvStateInfo implements AutoCloseable {
        public final ColumnFamilyHandle columnFamilyHandle;
        public final RegisteredStateMetaInfoBase metaInfo;
}
  • 数据过期处理 通过RocksDB的CompactionFilter[1]功能来实现数据过期的处理,RocksDB项目中专门有个FlinkCompactionFilter的类用于Flink项目 CompactionFilter提供了一种在rocksdb进行compaction时候,根据自定义逻辑去删除/修改key/value对的方法。如根据业务的ttl属性删除过期keys。
  • 数据快照处理 RocksDBKeyedStateBackend支持2种快照处理方式,全量和增量,由于RocksDB底层是使用lsm树来进行存储的,所以比较方便实现增量数据的获取
  • 数据恢复处理 和前面HeapKeyedStateBackend的类似,这里就不再展开了。

OperatorState

接下面我们深入了解下OperatorState,在实际使用中如我们消费了kafka的数据需要记录kafka消费的offset,还有一些场景需要将一些信息分发到所有的任务。这里需要使用到一类新的状态处理,这种状态是与每个算子绑定的,Flink提供了如下3个类来支持

  • ListState:通过一个list来保存相关的状态信息,如果并行度调整了,其中的数据按新的并行度重新进行分布处理;
  • UnionListState: 保存数据时和ListState类似,但是在出错和从savepoint恢复数据时的策略会不一样,其会将所有的状态通过广播的方式发给下游的每个任务,然后下游的任务来选择自己需要的部分;
  • BroadcastState: 用于每个任务上的状态都要一样的场景,这个在数据恢复时是把状态复制到所有的任务。

类似KeyedState,这里也是通过一个Backend来管理对应的状态数据,其接口定义为:OperatorStateStore。其实现类目前只有一个:DefaultOperatorStateBackend,我们也通过以下几个部分来分别了解DefaultOperatorStateBackend的实现

  • 各个实例的管理
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

这里通过2个Map来分别管理ListState(含UnionListState,后面都统一使用ListState来代指)和BroadcastState。 2. 内部数据保存 ListState的实现类为PartitionableListState,底层通过一个ArrayList来保存数据。BroadcastState定义的接口是kv的,所以其实现类HeapBroadcastState使用Map来存储相应的数据。 3. 数据过期处理 operatorState不涉及到数据过期的处理 4. 数据快照处理 DefaultOperatorStateBackendSnapshotStrategy类来负责具体的快照处理,调用的方法为asyncSnapshot,分别对ListState和BroadcastState进行快照处理 5. 数据重分布 数据重分布的策略前面介绍各个State时已经介绍了,这里就不再重复介绍了。

上层封装

Flink中对状态的backend和checkpoint存储策略进行了封装定义。 StateBackend:定义一个Streaming应用的state如何在集群中的本地存储。不同的实现使用不同的数据结构来存储对应的状态。其具体实现有如下2个:

  • HashMapStateBackend:将状态保存在TaskManager的内存中(JVM heap),其底层KeyedStateBackend使用的是HeapKeyedStateBackend,OperatorStateBackend使用的是DefaultOperatorStateBackend
  • EmbeddedRocksDBStateBackend:状态保存在一个嵌入的Rockdb实例中,其底层实现KeyedStateBackend使用的是RocksDBKeyedStateBackend,OperatorStateBackend使用的和HashMapStateBackend一致,也是DefaultOperatorStateBackend 说明:原来的RocksDBStateBackend、FsStateBackend和MemoryStateBackend不建议使用了。 另外对checkpoint的存储,CheckpointStorage接口定义了在Streaming应用中StateBackend在容错性方面如何存储其状态数据。其实现有如下2个
  • JobManagerCheckpointStorage:将checkpoints状态数据存储到JobManager的内存中,savepoints保存到文件系统
  • FileSystemCheckpointStorage:将checkpoints状态存储到文件系统,如保存到HDFS上。

总结

本篇我们了解了Flink的相关的状态的内容和checkpoint的保存策略。

  • State分为KeyedState和OperatorState,KeyedState主要存储针对记录级别的状态,如window操作时的状态。OperatorState主要存储针对算子的状态,如消费kafka的offset信息等;每类状态分别提供了不同种类的状态类来支持不同场景的状态保存需求
  • Flink底层通过StateBackend来保存对应的状态数据,主要有通过内存保存和使用RocksDB保存这2类,另外在其具体实现中为了方便并行度调整后对状态的重新拆分处理,引入了KeyGroup的概念
  • 在用户使用层,对上述2种状态进行了封装,接口为:StateBackend,来管理KeyedState和OperatorState,另外将checkpoint的存储策略从原来的StateBackend中拆分出来。目前支持有基于内存和外部存储的2类checkpoint存储策略。 最后由于checkpoint的触发以及任务恢复的处理与整体计算处理比较紧密,这块等介绍完Flink部署模式后再来详细梳理checkpoint的处理过程。

参考文档

以上就是Flink状态和容错源码解析的详细内容,更多关于Flink状态容错的资料请关注脚本之家其它相关文章!

相关文章

  • 解决 IDEA 创建 Gradle 项目没有src目录问题

    解决 IDEA 创建 Gradle 项目没有src目录问题

    这篇文章主要介绍了解决 IDEA 创建 Gradle 项目没有src目录问题,本文图文并茂给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-06-06
  • spring-cloud入门之spring-cloud-config(配置中心)

    spring-cloud入门之spring-cloud-config(配置中心)

    这篇文章主要介绍了spring-cloud入门之spring-cloud-config(配置中心),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • Java使用Queryable-pageable实现分页效果

    Java使用Queryable-pageable实现分页效果

    这篇文章主要为大家介绍了Java如何使用Queryable-pageable从而实现分页效果,文中的示例代码简洁易懂,感兴趣的小伙伴可以动手尝试一下
    2022-06-06
  • 如何区分JAVA中的equals与==

    如何区分JAVA中的equals与==

    这篇文章主要介绍了如何区分JAVA中的equals与==,文章简单易懂,实例代码帮助大家更好的参考学习,感兴趣的朋友可以了解下
    2020-06-06
  • IDEA使用Tomcat运行web项目教程分享

    IDEA使用Tomcat运行web项目教程分享

    在非Spring Boot项目中运行Nacos示例,需要手动配置Tomcat容器,本文介绍了如何在IDEA中配置Tomcat,并详细解决了配置过程中可能遇到的异常情况,步骤包括修改IDEA项目结构、添加Web模块、配置Artifacts和Tomcat Server
    2024-10-10
  • JavaScript中new运算符的实现过程解析

    JavaScript中new运算符的实现过程解析

    这篇文章主要介绍了JavaScript中new运算符的实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • SpringBoot集成Swagger2生成接口文档的方法示例

    SpringBoot集成Swagger2生成接口文档的方法示例

    我们提供Restful接口的时候,API文档是尤为的重要,它承载着对接口的定义,描述等,本文主要介绍了SpringBoot集成Swagger2生成接口文档的方法示例,需要的朋友们下面随着小编来一起学习学习吧
    2018-12-12
  • Java基础-封装和继承

    Java基础-封装和继承

    这篇文章主要介绍了Java面向对象编程(封装/继承/多态)实例解析的相关内容,具有一定参考价值,需要的朋友可以了解下希望可以帮助到你
    2021-07-07
  • SpringMVC xml文件路径在web.xml中的配置方式

    SpringMVC xml文件路径在web.xml中的配置方式

    这篇文章主要介绍了SpringMVC xml文件路径在web.xml中的配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • java token生成和校验的实例代码

    java token生成和校验的实例代码

    这篇文章主要介绍了java token生成和校验的实例代码,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09

最新评论