当前位置: 首页 > news >正文

门户网站 模板之家怎么做网站前段

门户网站 模板之家,怎么做网站前段,商丘网站建设哪家专业,大型网站开发软件 视频地址：尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第7章 数仓开发之ODS层 P015 第8章 数仓开发之DIM层 P016 P017 P018 P019 01、node001节点Linux命令 02、KafkaUtil.java 03、DimSinkApp.java P020 P021 P022 P023 第7章 数… 视频地址：尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第7章 数仓开发之ODS层 P015 第8章 数仓开发之DIM层 P016 P017 P018 P019 01、node001节点Linux命令 02、KafkaUtil.java 03、DimSinkApp.java P020 P021 P022 P023 第7章 数仓开发之ODS层 P015 第7章 数仓开发之ODS层 采集到 Kafka 的 topic_log 和 topic_db 主题的数据即为实时数仓的 ODS 层，这一层的作用是对数据做原样展示和备份。 8.2.2 动态拆分维度表功能 由于Maxwell是把全部数据统一写入一个Topic中，这样显然不利于日后的数据处理。所以需要把各个维度表拆开处理。 在实时计算中，一般把维度数据写入存储容器，一般是方便通过主键查询的数据库，比如HBase、Redis、MySQL等。 这样的配置不适合写在配置文件中，因为这样的话，业务端随着需求变化每增加一张维度表，就要修改配置、重启计算程序。所以这里需要一种动态配置方案，把这种配置长期保存起来，一旦配置有变化，实时计算可以自动感知。这种可以有三个方案实现： 一种是用Zookeeper存储，通过Watch感知数据变化； 另一种是用mysql数据库存储，周期性地同步； 再一种是用mysql数据库存储，使用广播流。 这里选择第三种方案，主要是MySQL便于配置数据的初始化和维护管理，使用FlinkCDC读取配置信息表，将配置流作为广播流与主流进行连接。 第8章 数仓开发之DIM层 P016 8.1.1 Flink CDC 基于 Flink SQL CDC 的实时数据同步方案：https://github.com/ververica/flink-cdc-connectors P017 8.2 主要任务 package com.atguigu.edu.realtime.app.dim;import com.atguigu.edu.realtime.util.EnvUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DimSinkApp {public static void main(String[] args) {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据//env.fromSource();// TODO 3 对主流数据进行ETL// TODO 4 使用flinkCDC读取配置表数据// TODO 5 将配置表数据创建为广播流// TODO 6 合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务} } package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class EnvUtil {/*** 环境准备及状态后端设置获取对应的环境** param parallelism Flink 程序的并行度* return Flink 流处理环境对象*/public static StreamExecutionEnvironment getExecutionEnvironment(Integer parallelism) {//TODO 1 环境创建准备StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//设置并发env.setParallelism(parallelism);//TODO 2 设置状态后端env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);//设置超时时间env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);//设置最小间隔时间env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(hdfs://node001:8020/edu/ck);System.setProperty(HADOOP_USER_NAME, atguigu);return env;} } P018 package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.io.IOException;public class KafkaUtil {public static KafkaSourceString getKafkaConsumer(String topic, String groupId) {return KafkaSource.Stringbuilder()// 必要参数 // .setBootstrapServers(EduConfig.KAFKA_BOOTSTRAPS)//“node001:9092”.setTopics(topic).setGroupId(groupId).setValueOnlyDeserializer(new DeserializationSchemaString() {Overridepublic String deserialize(byte[] message) throws IOException {if (message ! null message.length ! 
0) {return new String(message);}return null;}Overridepublic boolean isEndOfStream(String nextElement) {return false;}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}})// 不必要的参数设置offset重置的时候读取数据的位置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();} } P019 01、node001节点Linux命令 [atguigunode001 bin]$ jpsallnode001 4803 QuorumPeerMain 5236 Kafka 7941 Maxwell 5350 Application 6726 ConsoleConsumer 4458 NodeManager 8810 Jps 4043 DataNode 3869 NameNode 4654 JobHistoryServernode002 3505 ResourceManager 4066 QuorumPeerMain 4490 Kafka 5179 Jps 3660 NodeManager 3263 DataNodenode003 3505 SecondaryNameNode 5777 Jps 4369 Application 4279 Kafka 4569 Application 3354 DataNode 3851 QuorumPeerMain 3659 NodeManager [atguigunode001 bin]$ 启动hadoop、maxwell、kafka。 [atguigunode001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_db [atguigunode001 ~]$ cd ~/bin [atguigunode001 bin]$ mysql_to_kafka_init.sh al {database:edu,table:video_info,type:bootstrap-insert,ts:1645429973,data:{id:5410,video_name:day20_11复习_总结.avi,during_sec:900,video_status:1,video_size:12003100,video_url:file://xxx/xxx,video_source_id:null,version_id:1,chapter_id:26305,course_id:39,publisher_id:99,create_time:2021-11-14 04:15:01,update_time:null,deleted:0}} {database:edu,table:video_info,type:bootstrap-insert,ts:1645429973,data:{id:5410,video_name:day20_11复习_总结.avi,during_sec:900,video_status:1,video_size:12003100,video_url:file://xxx/xxx,video_source_id:null,version_id:1,chapter_id:26305,course_id:39,publisher_id:99,create_time:2021-11-14 04:15:01,update_time:null,deleted:0} } 02、KafkaUtil.java package com.atguigu.edu.realtime.util;import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.kafka.source.KafkaSource; import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.kafka.clients.consumer.OffsetResetStrategy;import java.io.IOException;public class KafkaUtil {public static KafkaSourceString getKafkaConsumer(String topic, String groupId) {return KafkaSource.Stringbuilder()// 必要参数.setBootstrapServers(node001:9092).setTopics(topic).setGroupId(groupId).setValueOnlyDeserializer(new DeserializationSchemaString() {Overridepublic String deserialize(byte[] message) throws IOException {if (message ! null message.length ! 0) {return new String(message);}return null;}Overridepublic boolean isEndOfStream(String nextElement) {return false;}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}})// 不必要的参数设置offset重置的时候读取数据的位置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)).build();} } 03、DimSinkApp.java package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSourceString eduDS env.fromSource(KafkaUtil.getKafkaConsumer(topic_db, dim_sink_app),WatermarkStrategy.noWatermarks(),kafka_source);// TODO 3 对主流数据进行ETL // 
eduDS.map(new MapFunctionString, JSONObject() { // Override // public JSONObject map(String value) throws Exception { // return JSONObject.parseObject(value); // } // }).filter(new FilterFunctionJSONObject() { // Override // public boolean filter(JSONObject jsonObject) throws Exception { // String type jsonObject.getString(type); // if (type.equals(bootstrap-complete) || type.equals(bootstrap-start)) { // return false; // } // return true; // } // });SingleOutputStreamOperatorJSONObject jsonDS eduDS.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);String type jsonObject.getString(type);if (!(type.equals(bootstrap-complete) || type.equals(bootstrap-start))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println(数据转换json错误...);}}});jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// TODO 5 将配置表数据创建为广播流// TODO 6 合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();} } P020 flinkCDC监控mysql中的binlog。 {before:null,after:{source_table:base_category_info,sink_table:dim_base_category_info,sink_columns:id,category_name,create_time,update_time,deleted,sink_pk:id,sink_extend:null},source:{version:1.6.4.Final,connector:mysql,name:mysql_binlog_source,ts_ms:0,snapshot:false,db:edu_config,sequence:null,table:table_process,server_id:0,gtid:null,file:,pos:0,row:0,thread:null,query:null},op:r,ts_ms:1695262804254,transaction:null} {before:null, # 被修改之前的数据after:{ # 被修改之后的数据source_table:base_category_info,sink_table:dim_base_category_info,sink_columns:id,category_name,create_time,update_time,deleted,sink_pk:id,sink_extend:null},source:{ # 数据来源version:1.6.4.Final,connector:mysql,name:mysql_binlog_source,ts_ms:0,snapshot:false,db:edu_config,sequence:null,table:table_process,server_id:0,gtid:null,file:,pos:0,row:0,thread:null,query:null},op:r, # op 为操作类型：r 表示快照读取（read），另有 c 新增、u 修改、d 删除 ts_ms:1695262804254,transaction:null 
} package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.bean.DimTableProcess; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSourceString eduDS env.fromSource(KafkaUtil.getKafkaConsumer(topic_db, dim_sink_app),WatermarkStrategy.noWatermarks(),kafka_source);// TODO 3 对主流数据进行ETL // eduDS.map(new MapFunctionString, JSONObject() { // Override // public JSONObject map(String value) throws Exception { // return JSONObject.parseObject(value); // } // }).filter(new FilterFunctionJSONObject() { // Override // public boolean filter(JSONObject jsonObject) throws Exception { // String type jsonObject.getString(type); // if (type.equals(bootstrap-complete) || type.equals(bootstrap-start)) { // return false; // } // return true; // } // 
});SingleOutputStreamOperatorJSONObject jsonDS eduDS.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);String type jsonObject.getString(type);if (!(type.equals(bootstrap-complete) || type.equals(bootstrap-start))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println(数据转换json错误...);}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(node001).port(3306).databaseList(edu_config) // set captured database.tableList(edu_config.table_process) // set captured table.username(root).password(123456)//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSourceString configDS env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), mysql_source);configDS.print();// TODO 5 将配置表数据创建为广播流// TODO 6 连接流合并主流和广播流// TODO 7 对合并流进行分别处理// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();} } P021 package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.bean.DimTableProcess; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.MapStateDescriptor; import 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSourceString eduDS env.fromSource(KafkaUtil.getKafkaConsumer(topic_db, dim_sink_app),WatermarkStrategy.noWatermarks(),kafka_source);// TODO 3 对主流数据进行ETL // eduDS.map(new MapFunctionString, JSONObject() { // Override // public JSONObject map(String value) throws Exception { // return JSONObject.parseObject(value); // } // }).filter(new FilterFunctionJSONObject() { // Override // public boolean filter(JSONObject jsonObject) throws Exception { // String type jsonObject.getString(type); // if (type.equals(bootstrap-complete) || type.equals(bootstrap-start)) { // return false; // } // return true; // } // });SingleOutputStreamOperatorJSONObject jsonDS eduDS.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);String type jsonObject.getString(type);if (!(type.equals(bootstrap-complete) || type.equals(bootstrap-start))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println(数据转换json错误...);}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(node001).port(3306).databaseList(edu_config) // set captured database.tableList(edu_config.table_process) // set 
captured table.username(root).password(123456)//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSourceString configDS env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), mysql_source);configDS.print();// TODO 5 将配置表数据创建为广播流// key- 维度表名称value- mysql单行数据 使用javaBeanMapStateDescriptorString, DimTableProcess tableProcessState new MapStateDescriptor(table_process_state, String.class, DimTableProcess.class);BroadcastStreamString broadcastStream configDS.broadcast(tableProcessState);// TODO 6 连接流合并主流和广播流BroadcastConnectedStreamJSONObject, String connectCS jsonDS.connect(broadcastStream);// TODO 7 对合并流进行分别处理connectCS.process(new BroadcastProcessFunctionJSONObject, String, Object() {//处理主流Overridepublic void processElement(JSONObject jsonObject, BroadcastProcessFunctionJSONObject, String, Object.ReadOnlyContext readOnlyContext, CollectorObject collector) throws Exception {}//处理广播流Overridepublic void processBroadcastElement(String s, BroadcastProcessFunctionJSONObject, String, Object.Context context, CollectorObject collector) throws Exception {}});// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();} } package com.atguigu.edu.realtime.bean;import lombok.Data;Data public class DimTableProcess {//来源表String sourceTable;//输出表String sinkTable;//输出字段String sinkColumns;//主键字段String sinkPk;//建表扩展String sinkExtend; } P022 8.3.2 根据MySQL的配置表动态进行分流 7自定义函数DimBroadcastFunction package com.atguigu.edu.realtime.app.dim;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction; import com.atguigu.edu.realtime.bean.DimTableProcess; import com.atguigu.edu.realtime.util.EnvUtil; import com.atguigu.edu.realtime.util.KafkaUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import 
com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector;public class DimSinkApp {public static void main(String[] args) throws Exception {//TODO 1 创建flink运行环境以及设置状态后端StreamExecutionEnvironment env EnvUtil.getExecutionEnvironment(1);// TODO 2 读取主流kafka数据DataStreamSourceString eduDS env.fromSource(KafkaUtil.getKafkaConsumer(topic_db, dim_sink_app),WatermarkStrategy.noWatermarks(),kafka_source);// TODO 3 对主流数据进行ETL // eduDS.map(new MapFunctionString, JSONObject() { // Override // public JSONObject map(String value) throws Exception { // return JSONObject.parseObject(value); // } // }).filter(new FilterFunctionJSONObject() { // Override // public boolean filter(JSONObject jsonObject) throws Exception { // String type jsonObject.getString(type); // if (type.equals(bootstrap-complete) || type.equals(bootstrap-start)) { // return false; // } // return true; // } // });SingleOutputStreamOperatorJSONObject jsonDS eduDS.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSON.parseObject(value);String type jsonObject.getString(type);if (!(type.equals(bootstrap-complete) || 
type.equals(bootstrap-start))) {// 需要的数据out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();System.out.println(数据转换json错误...);}}});// jsonDS.print();// TODO 4 使用flinkCDC读取配置表数据// 4.1 FlinkCDC 读取配置表信息MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(node001).port(3306).databaseList(edu_config) // set captured database.tableList(edu_config.table_process) // set captured table.username(root).password(123456)//定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String//设置读取数据的模式.startupOptions(StartupOptions.initial()).build();// 4.2 封装为流DataStreamSourceString configDS env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), mysql_source);configDS.print();// TODO 5 将配置表数据创建为广播流// key- 维度表名称value- mysql单行数据 使用javaBeanMapStateDescriptorString, DimTableProcess tableProcessState new MapStateDescriptor(table_process_state, String.class, DimTableProcess.class);BroadcastStreamString broadcastStream configDS.broadcast(tableProcessState);// TODO 6 连接流合并主流和广播流BroadcastConnectedStreamJSONObject, String connectCS jsonDS.connect(broadcastStream);// TODO 7 对合并流进行分别处理connectCS.process(new DimBroadcastProcessFunction(tableProcessState));// TODO 8 调取维度数据写出到phoenix// TODO 9 执行flink任务env.execute();} } package com.atguigu.edu.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.bean.DimTableProcess; import com.atguigu.edu.realtime.common.EduConfig; import com.atguigu.edu.realtime.util.DruidDSUtil; import com.atguigu.edu.realtime.util.PhoenixUtil; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector;import java.sql.*; import java.util.*;public class DimBroadcastProcessFunction extends BroadcastProcessFunctionJSONObject, String, JSONObject {private MapStateDescriptorString, DimTableProcess tableProcessState;// 初始化配置表数据private HashMapString, DimTableProcess configMap new HashMap();public DimBroadcastProcessFunction(MapStateDescriptorString, DimTableProcess tableProcessState) {this.tableProcessState tableProcessState;}/*** param value flinkCDC直接输入的json* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorJSONObject out) throws Exception {//TODO 1 获取配置表数据解析格式//TODO 2 检查phoenix中是否存在表 不存在创建//TODO 3 将数据写入到状态 广播出去}/*** param value kafka中maxwell生成的json数据* param ctx* param out* throws Exception*/Overridepublic void processElement(JSONObject value, ReadOnlyContext ctx, CollectorJSONObject out) throws Exception {//TODO 1 获取广播的配置数据//TODO 2 过滤出需要的维度字段//TODO 3 补充输出字段} } P023 package com.atguigu.edu.realtime.app.func;import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.edu.realtime.bean.DimTableProcess; import com.atguigu.edu.realtime.common.EduConfig; import com.atguigu.edu.realtime.util.DruidDSUtil; import com.atguigu.edu.realtime.util.PhoenixUtil; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.util.Collector;import java.sql.*; import java.util.*;public class DimBroadcastProcessFunction extends BroadcastProcessFunctionJSONObject, String, JSONObject {private 
MapStateDescriptorString, DimTableProcess tableProcessState;// 初始化配置表数据private HashMapString, DimTableProcess configMap new HashMap();public DimBroadcastProcessFunction(MapStateDescriptorString, DimTableProcess tableProcessState) {this.tableProcessState tableProcessState;}Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Connection connection DriverManager.getConnection(jdbc:mysql://node001:3306/edu_config? userrootpassword123456useUnicodetrue characterEncodingutf8serverTimeZoneAsia/ShanghaiuseSSLfalse);PreparedStatement preparedStatement connection.prepareStatement(select * from edu_config.table_process);ResultSet resultSet preparedStatement.executeQuery();ResultSetMetaData metaData resultSet.getMetaData();while (resultSet.next()) {JSONObject jsonObject new JSONObject();for (int i 1; i metaData.getColumnCount(); i) {String columnName metaData.getColumnName(i);String columnValue resultSet.getString(i);jsonObject.put(columnName, columnValue);}DimTableProcess dimTableProcess jsonObject.toJavaObject(DimTableProcess.class);configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);}resultSet.close();preparedStatement.close();connection.close();}/*** param value flinkCDC直接输入的json* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorJSONObject out) throws Exception {//TODO 1 获取配置表数据解析格式//TODO 2 检查phoenix中是否存在表 不存在创建//TODO 3 将数据写入到状态 广播出去}/*** param value kafka中maxwell生成的json数据* param ctx* param out* throws Exception*/Overridepublic void processElement(JSONObject value, ReadOnlyContext ctx, CollectorJSONObject out) throws Exception {//TODO 1 获取广播的配置数据//TODO 2 过滤出需要的维度字段//TODO 3 补充输出字段} }
http://www.hkea.cn/news/14469846/

相关文章:

  • 网站建设流费用网上有多少个购物平台
  • 网站安全检测在线网站建设费属于服务类么
  • 网站页面模板页面布局如何备份wordpress网站
  • json取数据做网站罗湖区住房和建设局
  • 成都vr 网站开发衡阳市网站建设公司
  • 网站虚拟主机租用省建设厅网站合同备案用户名
  • 网站菜单导航制作商城app制作
  • 高端网站建设服务网站seo优化网站
  • 网站不能自行备案吗怎么做属于自己的音乐网站
  • 江阴网站建设大连专业手机自适应网站建设维护
  • 建设网站的建设费用包括什么科目室内设计公司排名十强
  • 教育培训机构网站模板网站建设的成本与费用有哪些
  • 网站视觉设计规范有动效网站
  • 百度建站柯林建站程序
  • 公司网站建设策划百度网盘搜索入口
  • 定制高端网站建设公司手机建网站优帮云
  • 自己免费建站平台推荐河北邯郸天气预报
  • 做网站 租服务器百度收录时间
  • 网站开发套餐微网站的建设
  • WordPress不关站备案插件建站宝盒 源码
  • node做网站后台做视频投稿赚钱的网站好
  • 做设计接外快在哪个网站多用户商城系统方案
  • 建立网站有什么用汕头论坛贴吧
  • 湖州做网站公司哪家好烟台优化网站
  • 成都网站建设 3e网站建设通过网站的和报刊建设
  • 世界之窗附近做网站公司做网站公司做网站公司
  • 网站编辑怎么赚钱网站维护外包
  • 湖南移动官网网站建设柳州团购汽车网站建设
  • 深圳专业建站平台北京市规划网站
  • 在门户网站做产品单页多少钱一天成都市网站开发公司服务