当前位置: 首页 > news >正文

淄博 做网站微信模板图片

淄博 做网站,微信模板图片,一个新的网站怎么做SEO优化,企业宣传网站制作水善利万物而不争#xff0c;处众人之所恶#xff0c;故几于道#x1f4a6; 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 … 水善利万物而不争处众人之所恶故几于道 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 fromCollection(waterSensors) public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);ListWaterSensor waterSensors Arrays.asList(new WaterSensor(ws_001, 1577844001L, 45),new WaterSensor(ws_002, 1577844015L, 43),new WaterSensor(ws_003, 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 2. 从本地文件中读取数据 readTextFile(“input/words.txt”)支持相对路径和绝对路径 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(input/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}运行结果 3. 从HDFS中读取数据 readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”) 要先在pom文件中添加hadoop-client依赖 dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/version /dependencypublic static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile(hdfs://hadoop101:8020/flink/data/words.txt).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 4. 从Socket中读取数据 socketTextStream(“hadoop101”,9999)这个输入源不支持多个并行度。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据 windows中 nc -lp 9999 Linux nc -lk 9999env.socketTextStream(hadoop101,9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 5. 从Kafka中读取数据 addSource(new FlinkKafkaConsumer(“flink_source_kafka”,new SimpleStringSchema(),properties)) 第一个参数是topic 第二个参数是序列化器序列化器就是在Kafka和flink之间转换数据 - 官方注释The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。 第三个参数是Kafka的配置。 public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties new Properties();// 设置集群地址properties.setProperty(bootstrap.servers, hadoop101:9092,hadoop102:9092,hadoop103:9092);// 设置所属消费者组properties.setProperty(group.id, flink_consumer_group);env.addSource(new FlinkKafkaConsumer(flink_source_kafka,new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();} }运行结果 6. 自定义Source addSource(new XXXX()) 大多数情况下前面的数据源已经能够满足需要但是难免会存在特殊情况的场合所以flink也提供了能自定义数据源的方式. public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf new Configuration();conf.setInteger(rest.port,1000);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}} }自定义数据源需要定义一个类然后实现SourceFunction接口然后实现其中的两个方法run和cancelrun方法包含具体读数据的逻辑当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止 public class RandomWatersensor implements SourceFunctionWaterSensor {private Boolean running true;Overridepublic void run(SourceContextWaterSensor sourceContext) throws Exception {Random random new Random();while (running){sourceContext.collect(new WaterSensor(sensor random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {running false;}}运行结果 demo2 - 自定义从socket中读取数据 public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource(hadoop102, 9999)).print();env.execute();}public static class MySource implements SourceFunctionWaterSensor {private String host;private int port;private volatile boolean isRunning true;private Socket socket;public MySource(String host, int port) {this.host host;this.port port;}Overridepublic void run(SourceContextWaterSensor ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket new Socket(host, port);BufferedReader reader new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line null;while (isRunning (line reader.readLine()) ! null) {String[] split line.split(,);ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/Overridepublic void cancel() {isRunning false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}} } /* sensor_1,1607527992000,20 sensor_1,1607527993000,40 sensor_1,1607527994000,50*/
http://www.hkea.cn/news/14589554/

相关文章:

  • 中航长江建设工程有限公司网站西宁招聘网站开发
  • 网站设计是干什么的网站原型怎么做
  • 网站建设哪家技术好金华市网站建设最低价
  • 有了源码怎么搭建网站萧山大江东规划国土建设局网站
  • 网站收录少了如何搭建系统平台
  • 宁夏建设厅网站做宣传页的网站
  • 做网站的实验报告网站加载速度
  • python 网站架构如何开发医院
  • 如何自己建公司网站优秀广告案例分析
  • 河南网站推广多少钱wordpress导入媒体失败
  • 网站名称要注册吗网站建设的人员配置
  • 企业网站建立的目的5年的室内设计师收入
  • 门户网站建设思维导图手机站模板
  • 网站注销流程服饰网站模板设计
  • 实搜网站建设企业制作网站哪家好
  • php网站开发中如何网络搭建是什么意思
  • 折扣网站模板专业网页设计软件
  • 做淘客的网站名称零基础考二建有多难
  • 口碑好的购物网站建设wordpress运行环境
  • 湖南省建设厅官网站重庆关键词优化
  • 深圳平湖网站开发导航网站html模板
  • 怎么做门户网站设计方案做设计接单的网站
  • 高端品牌网站建设案例公众号江苏建设信息网站
  • .net 网站 调试wordpress切换div组件
  • 南昌网站建设志博chatgpt 网站
  • 灰色行业老域名做网站不收录yellow免费观看完整
  • 网站删除模块wordpress博客页面无法显示
  • 珠海网站运营网站建设运行
  • 电子商务网站建设移动电商开发品牌形象策划设计公司
  • 微网站 好处wordpress主题演示站