做网站组织架构,在线设计平台软件,创新的网站建设公司,曲靖网站设计⭐简单说两句⭐ ✨ 正在努力的小叮当~ #x1f496; 超级爱分享#xff0c;分享各种有趣干货#xff01; #x1f469;#x1f4bb; 提供#xff1a;模拟面试 | 简历诊断 | 独家简历模板 #x1f308; 感谢关注#xff0c;关注了你就是我的超级粉丝啦#xff01; 超级爱分享分享各种有趣干货 提供模拟面试 | 简历诊断 | 独家简历模板 感谢关注关注了你就是我的超级粉丝啦 以下内容仅对你可见~ 作者小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主 CSDN个人主页小叮当撩代码 GZH哆啦A梦撩代码 欢迎关注点赞收藏⭐️留言 Flink状态 Flink中的State State概念
在 Flink 中状态是流处理程序中非常重要的一部分它允许你保存和访问数据以实现复杂的计算逻辑。
可以简单理解为 历史计算结果
Flink中的算子任务的State分类通常分为两类
1️⃣ 有状态
有状态需要考虑历史的数据相同的输入可能会得到不同的输出
比如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)
2️⃣ 无状态
无状态简单说就是不需要考虑历史的数据相同的输入得到相同的结果
比如map、filter、flatmap算子都属于无状态不需要依赖其他数据
✅ Flink默认已经支持了无状态和有状态计算!
状态分类
Flink中有两种基本类型的状态托管状态Managed State和原生状态Raw State
Managed State是由Flink管理的Flink帮忙存储、恢复和优化
Raw State是开发者自己管理的需要自己序列化
❇️通常情况下我们采用托管状态来实现我们的需求
托管状态
Flink 中一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的 slot 在计算资源上是物理隔离的所以Flink 能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。
很多有状态的操作比如聚合、窗口都是要先做 keyBy 进行按键分区的。按键分区之后任务所进行的所有计算都应该只针对当前 key 有效所以状态也应该按照 key 彼此隔离。在这种情况下状态的访问方式又会有所不同。
所以我们又可以将托管状态分为两类算子状态和按键分区状态。
键控状态Keyed State
详细内容可以瞅瞅官网https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/fault-tolerance/state/
Flink 为每个键值维护一个状态实例并将具有相同键的所有数据都分区到同一个算子任务中这个任务会维护和处理这个key对应的状态。当任务处理一条数据时它会自动将状态的访问范围限定为当前数据的key。因此具有相同key的所有数据都会访问相同的状态。
需要注意的是键控状态只能在 KeyedStream 上进行使用可以通过 stream.keyBy(…) 来得到 KeyedStream 。 Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State)
ValueState存储单值类型的状态。可以使用 update(T) 进行更新并通过 T value() 进行检索。
ListState存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素并通过 get() 获得整个列表。
ReducingState用于存储经过 ReduceFunction 计算后的结果使用 add(T) 增加元素。
AggregatingState用于存储经过 AggregatingState 计算后的结果使用 add(IN) 添加元素。
FoldingState已被标识为废弃会在未来版本中移除官方推荐使用 AggregatingState 代替。
MapState维护 Map 类型的状态。 Code实操
例子1
使用KeyState中的ValueState来模拟实现maxBy 代码清单 import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author tiancx*/
public class StateMaxByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//加载数据DataStreamTuple2String, Integer source env.fromElements(Tuple2.of(北京, 1),Tuple2.of(上海, 2),Tuple2.of(广州, 3),Tuple2.of(北京, 4),Tuple2.of(上海, 5),Tuple2.of(广州, 6),Tuple2.of(北京, 3)).keyBy(t - t.f0);source.map(new RichMapFunctionTuple2String, Integer, Tuple3String, Integer, Integer() {//定义状态用于存储最大值ValueStateInteger maxValueState null;//进行初始化Overridepublic void open(Configuration parameters) throws Exception {//创建状态描述器ValueStateDescriptorInteger descriptor new ValueStateDescriptor(maxValueState, Integer.class);maxValueState getRuntimeContext().getState(descriptor);}Overridepublic Tuple3String, Integer, Integer map(Tuple2String, Integer value) throws Exception {//获取当前值Integer currentVal value.f1;Integer currentMax maxValueState.value();if (currentMax null || currentVal currentMax) {maxValueState.update(currentVal);}return Tuple3.of(value.f0, value.f1, maxValueState.value());}}).print();env.execute();}
}运行看结果 例子2
如果一个人的体温超过阈值38度超过3次及以上则输出: 姓名 [温度1,温度2,温度3] 代码清单 import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.List;/*** author tiancx*/
public class StateDemo01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSourceString stream env.socketTextStream(localhost, 9999);DataStreamTuple2String, Integer source stream.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] split value.split( );return Tuple2.of(split[0], Integer.parseInt(split[1]));}}).keyBy(t - t.f0);source.flatMap(new RichFlatMapFunctionTuple2String, Integer, Tuple2String, ListInteger() {ListStateInteger listState null;//存放超过38度的次数ValueStateInteger valueState null;Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptorInteger listStateDescriptor new ListStateDescriptorInteger(listState, Integer.class);ValueStateDescriptorInteger descriptor new ValueStateDescriptor(valueState, Integer.class);listState getRuntimeContext().getListState(listStateDescriptor);valueState getRuntimeContext().getState(descriptor);}Overridepublic void flatMap(Tuple2String, Integer value, CollectorTuple2String, ListInteger out) throws Exception {System.out.println(进入flatMap);Integer val value.f1;if (valueState.value() null) {valueState.update(0);}if (val 38) {listState.add(val);valueState.update(valueState.value() 1);}if (valueState.value() 3) {ListInteger list (ListInteger) listState.get();out.collect(Tuple2.of(value.f0, list));listState.clear();valueState.clear();}}}).print();env.execute();}
} 输入 运行结果 算子状态OperatorState
算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。算子状态跟数据的 key 无关所以不同 key 的数据只要被分发到同一个并行子任务就会访问到同一个 Operator State。
算 子 状 态 也 支 持 不 同 的 结 构 类 型 主 要 有 三 种 ListState 、 UnionListState 和BroadcastState。 code实操
例子1
在 map 算子中计算数据的个数 代码清单 import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** author tiancx*/
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString stream env.socketTextStream(localhost, 9999);stream.map(new MyCountMapFunction()).print();env.execute();}public static class MyCountMapFunction implements MapFunctionString, Long, CheckpointedFunction {private Long count 0L;private ListStateLong listState;Overridepublic Long map(String value) throws Exception {return count;}/*** 本地变量持久化将 本地变量拷贝到算子状态中,开启checkpoint 时才会调用 snapshotState 方法** param context the context for drawing a snapshot of the operator* throws Exception*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(MyCountMapFunction.snapshotState);listState.clear();listState.add(count);}/*** 初始化本地变量程序启动和恢复时从状态中把数据添加到本地变量每个子任务调用一次** param context the context for initializing the operator* throws Exception*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(MyCountMapFunction.initializeState);//从上下文初始化状态listState context.getOperatorStateStore().getListState(new ListStateDescriptor(listState, Types.LONG()));//从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Long aLong : listState.get()) {count aLong;}}}}
}输入 运行结果 【都看到这了点点赞点点关注呗爱你们】 ✨ 正在努力的小叮当~ 超级爱分享分享各种有趣干货 提供模拟面试 | 简历诊断 | 独家简历模板 感谢关注关注了你就是我的超级粉丝啦 以下内容仅对你可见~
作者小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主
CSDN个人主页小叮当撩代码
GZH哆啦A梦撩代码
欢迎关注点赞收藏⭐️留言