当前位置: 首页 > news >正文

网站流量来源查询如何建设酒店预订系统网站

网站流量来源查询,如何建设酒店预订系统网站,网站建设公司studstu,中国招标投标公共信息服务平台一. Source 简介 DataStream是Flink的低级API#xff0c;用于进行数据的实时处理#xff0c;Flink编程模型分为Source、Transformation、Sink三个部分#xff0c;如下图所示。 默认Flink提供了大量的内置Source#xff0c;常见的Source如下#xff1a; 基于文件的Sour…一. Source 简介 DataStream是Flink的低级API用于进行数据的实时处理Flink编程模型分为Source、Transformation、Sink三个部分如下图所示。 默认Flink提供了大量的内置Source常见的Source如下 基于文件的Source基于Socket的Source基于集合的Source基于Kafka消息队列的Source 当以上内置Source不能满足业务需要时可以实现自定义Source。 Flink中有关Source的接口类的继承关系如下 SourceFunction单并行度Source的基类RichSourceFunction单并行度增强型Source的基类ParallelSourceFunction多并行度Source的基类RichParallelSourceFunction多并行度增强型Source的基类 二. 自定义单并行度Source 自定义单并行度的source需要实现SourceFunction接口。 代码实现 MySource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random;public class MySource implements SourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {// Num加上0~100的随机数生成一个字符串ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MySource());source.print();env.execute(source_demo);} }运行结果 5 Num: 62 6 Num: 91 7 Num: 13 8 Num: 53三. 自定义多并行度Source 自定义多并行度的source需要实现ParallelSourceFunction接口。 代码实现 MyParallelSource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import java.util.Random;public class MyParallelSource implements ParallelSourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MyParallelSource());source.print();env.execute(source_demo);} } 运行结果 7 Num: 43 8 Num: 30 1 Num: 92 2 Num: 50 5 Num: 39 6 Num: 6 4 Num: 20 3 Num: 2四. 自定义单并行度增强型Source 增强型Source额外提供了open和close方法可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。 在mysql中创建student表并插入三条数据。 create table student (id int primary key,name varchar(50),age int );insert into student values(1, name1, 20),(2, name2, 30), (3, name3, 15);实现代码 Student.java package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id id;this.name name;this.age age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return Student{ id id , name name \ , age age };} } MysqlSource.java package flink.basic.source;import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;public class MysqlSource extends RichSourceFunctionStudent {Connection conn;Statement stmt;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);String url jdbc:mysql://192.168.47.130:3306/test;String user root;String password root;conn DriverManager.getConnection(url,user,password);stmt conn.createStatement();}Overridepublic void run(SourceContextStudent ctx) throws Exception {ResultSet rs stmt.executeQuery(select * from student);while (rs.next()) {int id rs.getInt(id);String name rs.getString(name);int age rs.getInt(age);ctx.collect(new Student(id, name, age));}rs.close();}Overridepublic void cancel() {}Overridepublic void close() throws Exception {stmt.close();conn.close();} } SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSourceStudent source env.addSource(new MysqlSource());source.print();env.execute(source_demo);} }运行结果 1 Student{id3, namename3, age15} 8 Student{id2, namename2, age30} 7 Student{id1, namename1, age20}
http://www.hkea.cn/news/14271561/

相关文章:

  • 成都高端建设网站h5模板免费
  • 网站推广前景怎么样税收大数据
  • 做soho一定要做网站吗建设主管部门网站
  • 网站建设技术质量指标公司门户网站建设公司
  • seo网站三种链接网络网页设计制作公司
  • 南京500元做网站多用户旅游网站开发
  • 网站建立需要什么技术音乐网站后台模板
  • 网站链接查询四平网站优化
  • 专业做相册书的网站免费的html大作业网站
  • 专业建站推广服务wordpress 后台登陆 修改
  • 上线了做网站怎么查看张家界seo优化
  • 深圳企业学校网站建设辽宁高速公路建设局网站
  • 做网站用建站模版好还是定制好深圳网站建设公司小江
  • 靖州网站建设wordpress 调用自定义模板
  • 如何了解和掌握一个网站的权重志鸿优化设计官网
  • 做绿色软件的网站知乎wordpress slides
  • 长宁专业做网站旅游线路设计方案模板
  • 网站开发入什么费用网页无法打开
  • 泰州网站建设费用seo竞争对手网站分析
  • 有什么网站可以做初中试题如何做网页游戏代理
  • 新建网站推广泉州建站模板
  • 如何提高网站访客数wordpress 验证码访问
  • 学校网站群建设做彩票网站电话多少
  • wordpress 整站带数据wordpress主题几个网站
  • 怎么查看网站有没有做竞价郑州 制造 网站
  • 长春火车站核酸检测多久出结果站长是什么职位
  • 全网营销整合推广seo优化工作有哪些
  • 网站正在开发中北京网站建设公司 蓝纤科技
  • 抚顺网站开发郑州网站建设居易国际
  • 天津做网站联系方式wordpress增加购物车