当前位置: 首页 > news >正文

个人博客网站html模板荆门今日头条新闻发布

个人博客网站html模板,荆门今日头条新闻发布,axure 做网站原型图,菠菜网站怎么做推广一、简介 flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处…

一、简介

flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单

如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现

二、pom.xml 文件

注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本,内部自带
在这里插入图片描述

下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7(Scala:2.12)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.ye</groupId><artifactId>flink-study</artifactId><version>0.1</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.0</flink.version><!--<flink.version>1.12.7</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
<!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.ye.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

三、自定义数据源

使用 Timer 定时任务(当然也可以使用线程池 Executors)自定义数据源,每过五秒随机生成一串字符串

public class TimerSinkRich extends RichSourceFunction<String> {private  ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();private boolean flag = true;private Timer timer;private TimerTask timerTask;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);timerTask = new TimerTask() {@Overridepublic void run() {// 可以在这块获取 MySQL、redis 等连接并查询数据Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();// 延时和执行周期参数可以通过构造方法传递timer.schedule(timerTask,1000,5000);}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (flag){if(queue.size()>0){ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if(null!=timer) timer.cancel();if(null!=timerTask) timerTask.cancel();// 撤销任务时,flink 默认 30 s(不同 flink 版本可能不同)尝试关闭数据源,关闭失败 TaskManager 不能释放 slot,最终导致失败if(queue.size()<=0) flag = false;}
}

四、flink 加载数据源并启动

public class TimerSinkStreamJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());streamSource.print();executionEnvironment.execute("TimerSinkStreamJob 定时任务打印数据");}
}

本地测试成功

在这里插入图片描述

五、上传 flink 集群

1、flink 1.16.0

启动成功
在这里插入图片描述
撤销任务成功
在这里插入图片描述
solt 也成功释放
在这里插入图片描述

2、flink 1.12.7

启动成功
在这里插入图片描述
撤销任务当然也没问题,同样能正常释放 slot
在这里插入图片描述

当然你也可以不要 open() 方法

public class DiySinkRich extends RichSourceFunction<String> {private TimerTask timerTask;private Timer timer;private boolean flag = true;private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();@Overridepublic void run(SourceFunction.SourceContext<String> ctx) throws Exception {timerTask = new TimerTask() {@Overridepublic void run() {Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();timer.schedule(timerTask, 1000, 5000);while (flag) {if (queue.size() > 0) {ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if (timer != null) timer.cancel();if (timerTask != null) timerTask.cancel();if (queue.size() == 0) flag = false;}
}

以上就是 flink 定时加载数据源的简单实例

http://www.hkea.cn/news/554568/

相关文章:

  • 茂名整站优化百度问答首页
  • 手机网站搭建网络宣传方式
  • 2003网站建设网站seo哪家公司好
  • 成都学校网站制作2022年国际十大新闻
  • 工厂外贸网站建设台州网络推广
  • 酒店网站建设方案策划百度seo怎么做网站内容优化
  • 网站更改公司需要重新备案吗搜索网页内容
  • 现在做网站还用dw做模板了吗成人电脑速成培训班
  • 做app要不要建网站刚开的店铺怎么做推广
  • 做生存分析的网站有哪些专业的网站优化公司
  • 网站双倍浮动百度联盟app
  • 北京网站设计确保代码符合w3c广州网络营销的推广
  • 做网站实名认证有什么用百度移动端模拟点击排名
  • 知更鸟wordpress 怎样沈阳百度seo关键词优化排名
  • 携程网站模板互联网营销策略有哪些
  • 做网站内链什么意思上海排名优化seobwyseo
  • 四川做直销会员网站百度网盘帐号登录入口
  • 做百度竞价对网站有无要求网站推广排名服务
  • 建设工程合同包括成都网站改版优化
  • 深圳不加班的互联网公司整站seo优化
  • 中国做的很好的食品网站肇庆疫情最新消息
  • 做时时彩网站微信seo关键词有话要多少钱
  • 陇南市建设局网站商务软文写作
  • 做学术研究的网站营销方案怎么写?
  • 专业网站设计公司有哪些秒收录关键词代发
  • 织梦网站模板源码下载真实有效的优化排名
  • 网站建设过程中什么最重要磁力链bt磁力天堂
  • html5企业网站案例鹤壁搜索引擎优化
  • 网站建设平台简介链接交换平台
  • 照片展示网站模板宁波seo咨询