当前位置: 首页 > news >正文

图书馆网站建设方案网页设计代码模板百度模板

图书馆网站建设方案,网页设计代码模板百度模板,国家企业信用信息公示系统贵州,学生创意设计作品说明一、目的 由于部分数据类型频率为1s#xff0c;从而数据规模特别大#xff0c;因此完整的JSON放在Hive中解析起来#xff0c;尤其是在单机环境下#xff0c;效率特别慢#xff0c;无法满足业务需求。 而Flume的拦截器并不能很好的转换数据#xff0c;因为只能采用Java方…一、目的 由于部分数据类型频率为1s从而数据规模特别大因此完整的JSON放在Hive中解析起来尤其是在单机环境下效率特别慢无法满足业务需求。 而Flume的拦截器并不能很好的转换数据因为只能采用Java方式从Kafka的主题A中采集数据并解析字段然后写入到放在Kafka主题B中 二 、原始数据格式 JSON格式比较正常对象中包含数组 {     deviceNo: 39,     sourceDeviceType: null,     sn: null,     model: null,     createTime: 2024-09-03 14:10:00,     data: {         cycle: 300,         evaluationList: [{             laneNo: 1,             laneType: null,             volume: 3,             queueLenMax: 11.43,             sampleNum: 0,             stopAvg: 0.54,             delayAvg: 0.0,             passRate: 0.0,             travelDist: 140.0,             travelTimeAvg: 0.0         },         {             laneNo: 2,             laneType: null,             volume: 7,             queueLenMax: 23.18,             sampleNum: 0,             stopAvg: 0.47,             delayAvg: 10.57,             passRate: 0.0,             travelDist: 140.0,             travelTimeAvg: 0.0         },         {             laneNo: 3,             laneType: null,             volume: 9,             queueLenMax: 11.54,             sampleNum: 0,             stopAvg: 0.18,             delayAvg: 9.67,             passRate: 0.0,             travelDist: 140.0,             travelTimeAvg: 0.0         },         {             laneNo: 4,             laneType: null,             volume: 6,             queueLenMax: 11.36,             sampleNum: 0,             stopAvg: 0.27,             delayAvg: 6.83,             passRate: 0.0,             travelDist: 140.0,             travelTimeAvg: 0.0         }]     } } 三、Java代码 package com.kgc;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import java.time.Duration; import java.util.Collections; import java.util.Properties;public class KafkaKafkaEvaluation {// 添加 Kafka Producer 配置private static Properties producerProps() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.0.70:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ACKS_CONFIG, -1);props.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);return props;}public static void main(String[] args) {Properties prop new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.0.70:9092);prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 每一个消费都要定义不同的Group_IDprop.put(ConsumerConfig.GROUP_ID_CONFIG, evaluation_group);KafkaConsumerString, String consumer new KafkaConsumer(prop);consumer.subscribe(Collections.singleton(topic_internal_data_evaluation));ObjectMapper mapper new ObjectMapper();// 初始化 Kafka ProducerKafkaProducerString, String producer new KafkaProducer(producerProps());while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {try {JsonNode rootNode mapper.readTree(record.value());System.out.println(原始数据rootNode);String device_no rootNode.get(deviceNo).asText();String source_device_type rootNode.get(sourceDeviceType).asText();String sn rootNode.get(sn).asText();String model rootNode.get(model).asText();String create_time rootNode.get(createTime).asText();String cycle rootNode.get(data).get(cycle).asText();JsonNode evaluationList rootNode.get(data).get(evaluationList);for (JsonNode evaluationItem : evaluationList) {String lane_no evaluationItem.get(laneNo).asText();String lane_type evaluationItem.get(laneType).asText();String volume evaluationItem.get(volume).asText();String queue_len_max evaluationItem.get(queueLenMax).asText();String sample_num evaluationItem.get(sampleNum).asText();String stop_avg evaluationItem.get(stopAvg).asText();String delay_avg evaluationItem.get(delayAvg).asText();String pass_rate evaluationItem.get(passRate).asText();String travel_dist evaluationItem.get(travelDist).asText();String travel_time_avg evaluationItem.get(travelTimeAvg).asText();String outputLine String.format(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,device_no, source_device_type, sn, model, create_time, cycle,lane_no, lane_type,volume,queue_len_max,sample_num,stop_avg,delay_avg,pass_rate,travel_dist,travel_time_avg);// 发送数据到 KafkaProducerRecordString, String producerRecord new ProducerRecord(topic_db_data_evaluation, record.key(), outputLine);producer.send(producerRecord, (RecordMetadata metadata, Exception e) - {if (e ! null) {e.printStackTrace();} else {System.out.println(The offset of the record we just sent is: metadata.offset());}});}} catch (Exception e) {e.printStackTrace();}}consumer.commitAsync();}}}1、服务器IP都是   192.168.0.70 2、消费Kafka主题数据源topic_internal_data_evaluation 3、生产Kafka主题目标源topic_db_data_evaluation 4、注意字段顺序与ODS层表结构字段顺序一致 四、开启Kafka主题topic_db_data_evaluation消费者 [rootlocalhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092  --topic topic_db_data_evaluation  --from-beginning 五、运行测试 1、启动项目 2、消费者输出数据 然后再用Flume采集写入HDFS就行了不过ODS层表结构需要转变 六、ODS层新表结构 create external table if not exists hurys_dc_ods.ods_evaluation(device_no string COMMENT 设备编号,source_device_type string COMMENT 设备类型,sn string COMMENT 设备序列号 ,model string COMMENT 设备型号,create_time timestamp COMMENT 创建时间,cycle int COMMENT 评价数据周期,lane_no int COMMENT 车道编号,lane_type int COMMENT 车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道,volume int COMMENT 车道内过停止线流量辆,queue_len_max float COMMENT 车道内最大排队长度m,sample_num int COMMENT 评价数据计算样本量,stop_avg float COMMENT 车道内平均停车次数次,delay_avg float COMMENT 车道内平均延误时间s,pass_rate float COMMENT 车道内一次通过率,travel_dist float COMMENT 车道内检测行程距离m,travel_time_avg float COMMENT 车道内平均行程时间 ) comment 评价数据外部表——静态分区 partitioned by (day string) row format delimited fields terminated by , stored as SequenceFile ; 七、Flume采集配置文件 八、运行Flume任务检查HDFS文件、以及ODS表数据 --刷新表分区 msck repair table ods_evaluation; --查看表分区 show partitions hurys_dc_ods.ods_evaluation; --查看表数据 select * from hurys_dc_ods.ods_evaluation where day2024-09-03; 搞定这样就不需要在Hive中解析JSON数据了
http://www.hkea.cn/news/14427202/

相关文章:

  • 网站建设小程序公众号销售有哪些网站可以做网站游戏
  • 广元市网站建设深圳建立企业网站
  • 海沧区建设局网站wordpress文章页面微信分享代码
  • 能免费做婚礼邀请函的网站3d网站设计
  • 大连做网站的公司有哪些南京外贸网站建设系统
  • 做网站百科网页制作教程和素材
  • 专业做农牧应聘的网站罗湖网站建
  • 消防微型建设标准的网站是多少专做机械零配件的网站
  • 南京网站设计制作网络营销是做什么
  • 邯郸渊博网络有限公司石景山网站seo优化排名
  • 音乐网站模板下载建站哪家好社区
  • 建网站衡水哪家强?沈阳专业建站
  • 测速网站怎么做wordpress数据库添加用户
  • 织梦模板 行业网站百度企业查公司名录
  • 桂林网站设计公司怎么选择丹徒网站建设
  • wordpress菜单背景沈阳网站的优化
  • 网站建设移动网络建e网是什么软件
  • 网站统计如何做广告制作公司员工提成
  • 网站怎么制作教程个人网站论文设计内容简介
  • 门户网站改版steam交易链接怎么用
  • 宝安商城网站建设哪家便宜汕头澄海网站建设
  • 各大网站收录提交入口如何让网站关键词搜录
  • 网站规划步骤网站轮播图片怎么做
  • 怎么知道自己的网站被k软件推广赚钱
  • 海外购物网站排名安徽设计公司排名
  • 海外网站推广有网但是网页打不开是什么原因
  • 平台网站很难做双井网站建设公司
  • mc建筑网站有什么国企是做网站的
  • 大连自助建站如何做好网络宣传工作
  • vi设计网站大全wordpress rss解析