行业网站推广,网站收录怎么设置,网络营销的工具和方法,wordpress 链接管理文章目录 背景主要步骤代码 背景
通常使用flink 提供的高级算子来编写flink 任务#xff0c;对底层不是很了解#xff0c;尤其是如何生成作业图的细节 下面通过构造一个有向无环图#xff0c;来实际看一下
主要步骤
1.增加source 2.增加operator 3. 增加一条边#xff0… 文章目录 背景主要步骤代码 背景
通常使用flink 提供的高级算子来编写flink 任务对底层不是很了解尤其是如何生成作业图的细节 下面通过构造一个有向无环图来实际看一下
主要步骤
1.增加source 2.增加operator 3. 增加一条边连接source和operator 4. 增加sink 5. 增加一条边连接operator和sink
代码 // Step 1: Create basic configurationsConfiguration configuration new Configuration();ExecutionConfig executionConfig new ExecutionConfig();CheckpointConfig checkpointConfig new CheckpointConfig();SavepointRestoreSettings savepointRestoreSettings SavepointRestoreSettings.none();// Step 2: Create a new StreamGraph instanceStreamGraph streamGraph new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);// Step 3: Add a source operatorGeneratorFunctionLong, String generatorFunction index - Number: index;DataGeneratorSourceString source new DataGeneratorSource(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);SourceOperatorFactoryString sourceOperatorFactory new SourceOperatorFactory(source, WatermarkStrategy.noWatermarks());streamGraph.addSource(1, sourceNode, sourceDescription, sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), sourceSlot);// Step 4: Add a map operator to transform the dataStreamMapString, String mapOperator new StreamMap(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {return value;}});SimpleOperatorFactoryString mapOperatorFactory SimpleOperatorFactory.of(mapOperator);streamGraph.addOperator(2, mapNode, mapDescription, mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), mapSlot);// Step 5: Connect source and map operatorstreamGraph.addEdge(1, 2, 0);// Step 6: Add a sink operator to consume the dataStreamMapString, String sinkOperator new StreamMap(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {System.out.println(value);return value;}});SimpleOperatorFactoryString sinkOperatorFactory SimpleOperatorFactory.of(sinkOperator);streamGraph.addSink(3, sinkNode, sinkDescription, sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), sinkSlot);// Step 7: Connect map and sink operatorstreamGraph.addEdge(2, 3, 0);streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);streamGraph.setMaxParallelism(1,1);streamGraph.setMaxParallelism(2,1);streamGraph.setMaxParallelism(3,1);streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);// Step 8: Convert StreamGraph to JobGraphJobGraph jobGraph streamGraph.getJobGraph();// Step 9: Set up a MiniCluster for local executionMiniClusterConfiguration miniClusterConfig new MiniClusterConfiguration.Builder().setNumTaskManagers(10).setNumSlotsPerTaskManager(10).build();MiniCluster miniCluster new MiniCluster(miniClusterConfig);// Step 10: Start the MiniClusterminiCluster.start();// Step 11: Submit the job to the MiniClusterJobExecutionResult result miniCluster.executeJobBlocking(jobGraph);System.out.println(Job completed with result: result);// Step 12: Stop the MiniClusterminiCluster.close();