上海市建设考核中心网站,html教程网站,做网站专业术语,想开个小说网站怎么做1、Flink概述
1.1 Flink是什么
Flink的官网主页地址#xff1a;https://flink.apache.org/ Flink的核心目标是“数据流上有状态的计算”(Stateful Computations over Data Streams)。 具体说明#xff1a;Apache Flink是一个“框架和分布式处理引擎”#xff0c;用于对无界…1、Flink概述
1.1 Flink是什么
Flink的官网主页地址https://flink.apache.org/ Flink的核心目标是“数据流上有状态的计算”(Stateful Computations over Data Streams)。 具体说明Apache Flink是一个“框架和分布式处理引擎”用于对无界和有界数据流进行有状态计算。
1.1.1 无界数据流
有定义流的开始但是没有定义流的结束它们会无休止的产生数据无界流的数据必须持续处理即数据被摄取后需要立即处理。我们不能等到所有数据都到达再处理因为输入时无限的。
1.1.2 有界数据流
有定义流的开始也有定义流的结束有界流可以在摄取所有数据后再进行计算有界流所有的数据可以被排序所有并不需要有序摄取有界流处理通常被称为批处理
1.1.3 有状态流处理
把流处理需要的额外数据保存成一个“状态”然后针对这条数据进行处理并且更新状态这就是所谓的“有状态的流处理”。
状态在内存中优点速度快缺点可靠性差状态在分布式系统中优点可靠性高缺点速度慢
1.1.4 Flink发展历史 1.2 Flink特点
我们处理数据的目标是低延迟、高吞吐、结果的准确性和良好的容错性。 Flink主要特点如下
高吞吐和低延迟每秒处理数百万个事件毫秒级延迟结果的准确性Flink提供了事件时间event-time和处理时间processing-time语义。对于乱序事件流事件时间语义仍然能提供一致且准确的结果。精确一次exactly-once的状态一致性保证可以连接到最常用的外部系统如kafka、Hive、JDBC、HDFS、Redis等高可用本身高可用的设置加上K8SYarn和Mesos的紧密集成再加上从故障中快速恢复和动态扩展任务的能力Flink能做到以极少的停机时间7x24全天候运行。
1.3 Flink和SparkStreaming说实话没有比较的必要
1、Spark是以批处理为根本。 2、Flink是以流处理为根本。
1.4 Flink的应用场景
1、电商和市场营销 2、物联网IOT 3、物流配送和服务业 4、银行和金融业
1.5 Flink分层API 有状态流处理通过底层API处理函数对原始数据加工处理。底层API和DataStreamAPI相集成可以处理复杂的计算。DataStreamAPI流处理和DataSetAPI批处理封装了底层处理函数提供了通用的模块比如转换transformations包括mapflatMap等连接joins聚合aggregations窗口Windows操作等。注意Flink1.12后DataStreamAPI已经实现真正的流批一体所以DataSetAPI已经过时。TableAPI是以表为中心的声明式编程其中表可能会动态变化。TableAPI遵循关系模型表有二维数据结构类似于关系数据库中的表同时API提供可比较的操作例如select、project、join、group by、aggregate等。我们可以在表与DataStream/DataSet之间无缝切换以允许程序将TableAPI与DataStream以及DataSet混用。SQL这一层在语法与表达能力上与TableAPI类似但是是以SQL查询表达式的形式表现程序。SQL抽象与TableAPI交互密切同时SQL查询可以直接在TableAPI定义的表上执行。
2、Flink快速上手
2.1 创建项目
在准备好所有的开发环境之后我们就可以开始开发自己的第一个Flink程序了。首先我们要做的就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。 1、创建工程 1打开IntelliJ IDEA创建一个Maven工程。 2、添加项目依赖
propertiesflink.version1.17.0/flink.version
/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency
/dependencies2.2 WordCount代码编写大数据常用的例子
需求统计一段文字中每个单词出现的频次 环境准备创建一个com.zhm.wordcount包
2.2.1 批处理
批处理的基本思路先逐行读入文件数据然后将每一行文子拆分成单词接着按照单词分组统计每组数据的个数就是对应单词的频次。 1、数据准备 1在工程根目录下新建一个data文件夹并在下面创建文本文件words.txt 2在文件中输入一些单词
hello hello hello
world world
hello world2、代码编写 1在com.zhm.wordcount包下新建一个Demo01_BatchProcess类 /*** ClassName Batch* Description 利用Flink批处理单词统计* Author Zouhuiming* Date 2023/9/3 9:58* Version 1.0*/import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/**计算的套路(1) 计算的环境SparkSparkContextMRDriverFlinkExecutionEnvironment(2) 把要计算的数据封装为计算模型SparkRDDSpark CoreDateFrame|DataSetSparkSQLDStreamSparkStreamMRk-VFlink:DataSource(3)调用计算APIRDD.转换算子()MR:自己去编写Mapper、ReducerFlink:DataSource.算子()*/
public class Demo01_BatchProcess {public static void main(String[] args) throws Exception {//创建支持Flink计算的环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();//使用环境去读取数据封装为计算模型DataSourceString dataSource env.readTextFile(data/words.txt);//调用计算APIdataSource.flatMap(new FlatMapFunctionString, Tuple2String,Integer() {Overridepublic void flatMap(String s, CollectorTuple2String, Integer collector) throws Exception {String[] split s.split( );for (String s1 : split) {collector.collect(new Tuple2String,Integer(s1,1));}}}).groupBy(0).sum(1).print();}
}
运行结果 注意这种实现是基于DataSetAPI的也就是我们对数据的处理转换是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构批量的数据集本质上也是流没有必要用两套不同的API来实现。所以从Flink1.12开始官方推荐的做法是直接使用DataStreamAPI在提交任务时通过将执行模式设为BATCH来进行批处理
bin/flink run -Dexecution.runtime-modeBATCH BatchWordCount.jar这样DataSetAPI就没有用了在实际应用中我们只要维护一套DataStreamAPI就可以。这里只是为了方便大家理解我们依然用DataSetAPI做了批处理的实现。
2.2.2 流处理
对于Flink而言流才是整个处理逻辑的底层核心所以流批一体之后的DataStreamAPI更加强大可以直接处理批处理和流处理的所有场景。 下面我们就针对不同类型的的输入数据源用具体的代码来实现流处理。 1、读取文件有界流 我们同样试图读取文档words.txt中的数据并统计每个单词出现的频次。整体思路与之前的批处理非常类似代码模式也基本一致。 在com.zhm.wordcount包下新建一个Demo02_BoundedStreamProcess类
package com.zhm.wordcount;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** ClassName Demo02_BoundedStreamProcess* Description 有界流* Author Zouhuiming* Date 2023/9/3 10:26* Version 1.0*/public class Demo02_BoundedStreamProcess {public static void main(String[] args) throws Exception {//1、创建支持Flink计算的环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//1.1 设置一个线程处理这个流默认是根据你的cpu数和单词种类个数取最小值
// env.setParallelism(1);//2、获取数据源FileSourceString source FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(data/words.txt)).build();//3、利用环境将数据源的数据封装为计算模型DataStreamSourceString streamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), myfile);//4、调用API对数据进行计算//4.1 将每行数据按照给定的分割符拆分为Tuple2类型的数据模型word,1streamSource.flatMap(new FlatMapFunctionString, Tuple2String,Integer() {Overridepublic void flatMap(String s, CollectorTuple2String, Integer collector) throws Exception {String[] split s.split( );for (String s1 : split) {collector.collect(new Tuple2(s1,1));}}//4.2 根据word分组}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}//4.3 根据分组之后按照元组中的第二列聚相加}).sum(1)// 4.4 打印结果.print();//5、提交jobenv.execute();}
}
运行结果 和批处理程序BatchWordCount的不同
创建执行环境的不同流处理程序使用的是StreamExecutionEnvironment。转换处理之后得到的数据对象类型不同分组操作调用的方法是keyBy方法可以传入一个匿名函数作为键选择器KeySelector指定当前分组的key是什么。代码末尾需要调用env的execute方法开始执行任务。
2、读取Socket文本流无界流 在实际的生产环境中真正的数据流其实是无界的有开始却没有结束这就要求我们需要持续的处理捕获的数据。为了模拟这种场景可以监听Socket端口然后向该端口不断地发生数据。 1将StreamWordCount代码中读取文件数据的readTextFile方法替换成读取Socket文本流的方法socketTextStream。具体代码实现如下:
package com.zhm.wordcount;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** ClassName Demo03_UnBoundedStreamProcess* Description 无界流* Author Zouhuiming* Date 2023/9/3 10:39* Version 1.0*/
public class Demo03_UnBoundedStreamProcess {public static void main(String[] args) throws Exception {//1、创建支持Flink计算的环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//1.1 设置一个线程处理这个流env.setParallelism(1);//2、获取数据源DataStreamSourceString streamSource env.socketTextStream(hadoop102, 9999);//3.1 将每行数据按照给定的分割符拆分为Tuple2类型的数据模型word,1streamSource.flatMap(new FlatMapFunctionString, Tuple2String,Integer() {Overridepublic void flatMap(String s, CollectorTuple2String, Integer collector) throws Exception {String[] split s.split( );for (String s1 : split) {collector.collect(new Tuple2(s1,1));}}//3.2 根据word分组}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}//3.3 根据分组之后按照元组中的第二列聚相加}).sum(1)// 3.4 打印结果.print();//4、提交jobenv.execute();}
}2在Linux环境的主机hadoop102上执行下列命令发送数据进行测试前提是要安装netcat
nc -lk hadoop102 99993启动Demo03_UnBoundedStreamProcess程序 我们会发现程序启动之后没有任何输出、也不会退出。这是正常的因为Flink的流处理是事件驱动的当前程序会一直处于监听状态只有接受数据才会执行任务、输出统计结果。
4从hadoop102发送数据 5观察idea控制台 说明Flink还具有一个类型提前系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于java中泛型擦除的存在在某些特殊情况下比如Lambda表达式中自动提取的信息是不够精细的–只告诉Flink当前的元素由“船头、船身、船尾”构成根本无法重建出“大船”的模样这时就需要显示地提供类型信息才能使得应用程序正常工作或提高其性能。 因为对于flatMap里传入的Lambda表达式系统只能推断出返回的是Tuple2类型而无法得到TupleString,Long。只有显示地告诉系统当前的返回类型才能正确的解析出完整数据。
2.2.3 执行模式
从Flink 1.12开始官方推荐的做法是直接使用DataStream API在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。
// 流处理环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamAPI执行模式包括流执行模式、批执行模式和自动模式。
流执行模式Streaming 这是DataStreamAPI最经典的模式一边用于需要持续实时处理的无界数据流。默认情况下程序使用的就是Streaming执行模式。批执行模式Batch 专门用于批处理的执行模式自动模式 在这种模式下将由程序根据输入数据源是否有界来自动选择执行模式。 批执行模式的使用主要有两种方式 1通过命令行配置
bin/flink run -Dexecution.runtime-modeBATCH ...在提交作业时增加execution.runtime-mode参数指定值为BATCH。
2通过代码设置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);在代码中直接基于执行环境调用setRuntimeMode方法传入BATCH模式。 实际应用中一般不会在代码中配置而是使用命令行这样更加灵活。
2.2.4 本地WebUI
在Idea本地运行程序时可以通过添加本地WebUI依赖使用WebUI界面查看Job的运行情况。 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency添加后在代码中可以指定绑定的端口:
Configuration conf new Configuration();conf.setInteger(rest.port, 3333);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);之后在程序启动后打开本地浏览器访问localhost:3333即可查看job的运行情况。