当前位置: 首页 > news >正文

网站建设费包括什么wordpress滑动切换

网站建设费包括什么,wordpress滑动切换,烟台网站推广广州公司,本人已履行网站备案信息目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、lat…目录 1、添加pom依赖 2、API使用说明 3、这是一个完整的入门案例 4、Kafka消息应该如何解析 4.1、只获取Kafka消息的value部分 ​4.2、获取完整Kafka消息(key、value、Metadata) 4.3、自定义Kafka消息解析器 5、起始消费位点应该如何设置 ​5.1、earliest() 5.2、latest() 5.3、timestamp() 6、Kafka分区扩容了该怎么办 —— 动态分区检查 7、在加载KafkaSource时提取事件时间添加水位线 7.1、使用内置的单调递增的水位线生成器 kafka timestamp 为事件时间 7.2、使用内置的单调递增的水位线生成器 kafka 消息中的 ID字段 为事件时间 1、添加pom依赖 我们可以使用Flink官方提供连接Kafka的工具flink-connector-kafka 该工具实现了一个消费者FlinkKafkaConsumer可以用它来读取kafka的数据 如果想使用这个通用的Kafka连接工具需要引入jar依赖 !-- 引入 kafka连接器依赖-- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion1.17.0/version /dependency 2、API使用说明 官网链接Apache Kafka 连接器 语法说明:  // 1.初始化 KafkaSource 实例 KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点) .setTopics(input-topic) // 必填指定要消费的topic.setGroupId(my-group) // 必填指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填指定反序列化器(用来解析kafka消息数据转换为flink数据类型).setStartingOffsets(OffsetsInitializer.earliest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build(); // 2.通过 fromSource KafkaSource 获取 DataStreamSource env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source); 3、这是一个完整的入门案例 开发语言java1.8 flink版本flink1.17.0 public class ReadKafka {public static void main(String[] args) throws Exception {newAPI();}public static void newAPI() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(worker01:9092) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics(20230810) // 必填指定要消费的topic.setGroupId(FlinkConsumer) // 必填指定消费者的groupid(不存在时会自动创建).setValueOnlyDeserializer(new SimpleStringSchema()) // 必填指定反序列化器(用来解析kafka消息数据).setStartingOffsets(OffsetsInitializer.earliest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build();env.fromSource(source,WatermarkStrategy.noWatermarks(),Kafka Source).print();// 3.触发程序执行env.execute();} }4、Kafka消息应该如何解析 代码中需要提供一个反序列化器Deserializer来对 Kafka 的消息进行解析 反序列化器的功能 将Kafka ConsumerRecords转换为Flink处理的数据类型(Java/Scala对象) 反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器类型)) 指定 下面介绍两种常用Kafka消息解析器 KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) 1、返回完整的Kafka消息将JSON字符串反序列化为ObjectNode对象 2、可以选择是否返回Kafak消息的Metadata信息true-返回false-不返回 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 1、只返回Kafka消息中的value部分  4.1、只获取Kafka消息的value部分 4.2、获取完整Kafka消息(key、value、Metadata) kafak消息格式 key   {nation:蜀国} value  {ID:整数} public static void ParseMessageJSONKeyValue() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092) // 必填指定broker连接信息 (为保证高可用,建议多指定几个节点).setTopics(9527) // 必填指定要消费的topic.setGroupId(FlinkConsumer) // 必填指定消费者的groupid(不存在时会自动创建)// 必填指定反序列化器(将kafak消息解析为ObjectNodejson对象).setDeserializer(KafkaRecordDeserializationSchema.of(// includeMetadata (true:返回Kafak元数据信息 false:不返回)new JSONKeyValueDeserializationSchema(true))).setStartingOffsets(OffsetsInitializer.latest()) // 可选指定启动任务时的消费位点不指定时将默认使用 OffsetsInitializer.earliest().build();env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source).print();// 3.触发程序执行env.execute();}运行结果     常见报错  Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic 9527, partition 0, leaderEpoch 0, offset 1064, CreateTime 1691668775938, serialized key size 4, serialized value size 9, headers RecordHeaders(headers [], isReadOnly false), key [B5e9eaab8, value [B67390400).at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)... 14 more Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token xxxx: was expecting (JSON String, Number, Array, Object or token null, true or false)at [Source: (byte[])xxxx; line: 1, column: 5] 报错原因 出现这个报错一般是使用flink读取fafka时使用JSONKeyValueDeserializationSchema 来解析消息时kafka消息中的key 或者 value 内容不符合json格式而造成的解析错误 例如下面这个格式就会造成解析错误  key1000value你好 那应该怎么解决呢 1、如果有权限修改Kafka消息格式可以将Kafka消息keyvalue内容修改为Json格式 2、如果没有权限修改Kafka消息格式(比如线上环境修改比较困难)可以重新实现 JSONKeyValueDeserializationSchema类根据所需格式来解析Kafka消息(可以参考源码) 4.3、自定义Kafka消息解析器 生产中对Kafka消息及解析的格式总是各种各样的当flink预定义的解析器满足不了业务需求时可以通过自定义kafka消息解析器来完成业务的支持 例如当使用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时只返回了 offset、topic、partition 三个字段信息现在需要kafka生产者写入数据时的timestamp就可以通过自定义kafka消息解析器来完成 代码示例 // TODO 自定义Kafka消息解析器在 metadata 中增加 timestamp字段 public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchemaObjectNode{private static final long serialVersionUID 1509391548173891955L;private final boolean includeMetadata;private ObjectMapper mapper;public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {this.includeMetadata includeMetadata;}Overridepublic void open(DeserializationSchema.InitializationContext context) throws Exception {mapper JacksonMapperFactory.createObjectMapper();}Overridepublic ObjectNode deserialize(ConsumerRecordbyte[], byte[] record) throws Exception {ObjectNode node mapper.createObjectNode();if (record.key() ! null) {node.set(key, mapper.readValue(record.key(), JsonNode.class));}if (record.value() ! null) {node.set(value, mapper.readValue(record.value(), JsonNode.class));}if (includeMetadata) {node.putObject(metadata).put(offset, record.offset()).put(topic, record.topic()).put(partition, record.partition())// 添加 timestamp 字段.put(timestamp,record.timestamp());}return node;}Overridepublic boolean isEndOfStream(ObjectNode nextElement) {return false;}Overridepublic TypeInformationObjectNode getProducedType() {return getForClass(ObjectNode.class);}}运行结果 5、起始消费位点应该如何设置 起始消费位点说明 起始消费位点是指 启动flink任务时应该从哪个位置开始读取Kafka的消息    下面介绍下常用的三个设置     OffsetsInitializer.earliest()  从最早位点开始消 这里的最早指的是Kafka消息保存的时长(默认为7天生成环境各公司略有不同) 该这设置为默认设置当不指定OffsetsInitializer.xxx时默认为earliest()  OffsetsInitializer.latest()    从最末尾位点开始消费 这里的最末尾指的是flink任务启动时间点之后生产的消息 OffsetsInitializer.timestamp(时间戳) 从时间戳大于等于指定时间戳毫秒的数据开始消费 下面用案例说明下三种设置的效果kafak生成10条数据如下 5.1、earliest() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最早位置开始消费该设置为默认设置.setStartingOffsets(OffsetsInitializer.earliest()).build(); 运行结果 5.2、latest() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build(); 运行结果 5.3、timestamp() 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(23230811).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从指定时间戳后开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L)).build(); 运行结果 6、Kafka分区扩容了该怎么办 —— 动态分区检查 在flink1.13的时候如果Kafka分区扩容了只有通过重启flink任务才能消费到新增分区的数据小编就曾遇到过上游业务部门的kafka分区扩容了并没有通知下游使用方导致实时指标异常甚至丢失了数据。 在flink1.17的时候可以通过开启动态分区检查来实现不用重启flink任务就能消费到新增分区的数据 开启分区检查(默认不开启) KafkaSource.builder().setProperty(partition.discovery.interval.ms, 10000); // 每 10 秒检查一次新分区 代码示例 KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest())// 开启动态分区检查默认不开启.setProperty(partition.discovery.interval.ms, 10000) // 每 10 秒检查一次新分区.build(); 7、在加载KafkaSource时提取事件时间添加水位线 可以在 fromSource(source,WatermarkStrategy,sourceName) 时提取事件时间和制定水位线生成策略 注意当不指定事件时间提取器时Kafka Source 使用 Kafka 消息中的时间戳作为事件时间 7.1、使用内置的单调递增的水位线生成器 kafka timestamp 为事件时间 代码示例 // 在读取Kafka消息时提取事件时间插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器默认使用 kafka的timestamp作为事件时间WatermarkStrategy.forMonotonousTimestamps(),Kafka Source)// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunctionObjectNode, String() {Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunctionObjectNode, String.Context ctx, CollectorString out) throws Exception {// 当前处理时间long currentProcessingTime ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark ctx.timerService().currentWatermark();StringBuffer record new StringBuffer();record.append(\n);record.append(kafkaJson \n);record.append(currentProcessingTime currentProcessingTime \n);record.append(currentWatermark currentWatermark \n);record.append(kafka-ID Long.parseLong(kafkaJson.get(value).get(ID).toString()) \n);record.append(kafka-timestamp Long.parseLong(kafkaJson.get(metadata).get(timestamp).toString()) \n);out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}运行结果 7.2、使用内置的单调递增的水位线生成器 kafka 消息中的 ID字段 为事件时间 代码示例 // 在读取Kafka消息时提取事件时间插入水位线public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.读取kafka数据KafkaSourceObjectNode source KafkaSource.ObjectNodebuilder().setBootstrapServers(worker01:9092).setTopics(9527).setGroupId(FlinkConsumer)// 将kafka消息解析为Json对象并返回元数据.setDeserializer(KafkaRecordDeserializationSchema.of(new MyJSONKeyValueDeserializationSchema(true)))// 设置起始消费位点从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(source,// 使用内置的单调递增的水位线生成器使用 kafka消息中的ID字段作为事件时间WatermarkStrategy.ObjectNodeforMonotonousTimestamps()// 提取 Kafka消息中的 ID字段作为 事件时间.withTimestampAssigner((json, timestamp) - Long.parseLong(json.get(value).get(ID).toString())),Kafka Source)// 通过 ProcessFunction 查看提取的事件时间和水位线信息.process(new ProcessFunctionObjectNode, String() {Overridepublic void processElement(ObjectNode kafkaJson, ProcessFunctionObjectNode, String.Context ctx, CollectorString out) throws Exception {// 当前处理时间long currentProcessingTime ctx.timerService().currentProcessingTime();// 当前水位线long currentWatermark ctx.timerService().currentWatermark();StringBuffer record new StringBuffer();record.append(\n);record.append(kafkaJson \n);record.append(currentProcessingTime currentProcessingTime \n);record.append(currentWatermark currentWatermark \n);record.append(kafka-ID Long.parseLong(kafkaJson.get(value).get(ID).toString()) \n);record.append(kafka-timestamp Long.parseLong(kafkaJson.get(metadata).get(timestamp).toString()) \n);out.collect(record.toString());}}).print();// 3.触发程序执行env.execute();}运行结果
http://www.hkea.cn/news/14523501/

相关文章:

  • 衡阳县专业做淘宝网站食品品牌网站策划
  • 南宁响应式网站制作嘉峪关市建设局公示公告网站
  • 建设部门的网站京东建站模板
  • 如何做医美机构网站观察分析美克美家网站建设
  • 搭建一个商城网站微信网页视频怎么下载
  • 成都专业网站建设优化团队建设银行官网首页登录入口
  • 我想开个网站长沙网站建设建
  • 学校营销型网站楼盘网站建设方案
  • 网站建设工作室北京小俊哥九江做网站的
  • 广州设计网站公司江苏省交通厅门户网站建设管理
  • 企业招聘网站排行榜广州海珠做网站的公司
  • 深圳住房和建设局网站哪里预约保定建设工程信息网站
  • 用什么软件做网站模板手机软件用什么语言开发
  • 找程序员做网站wordpress 搜索小工具
  • 网站做网站反向代理违法wordpress被和谐
  • 衡水景县专业做淘宝网站公司百度竞价个人开户
  • ipv6 网站开发seowhy
  • 检察院门户网站建设自查报告google在线网页代理
  • 现在网站建设用什么软件800多块做网站
  • 有什么网站可以赚钱wordpress访客
  • 制作一个网站大概要多少钱个人如何做seo推广
  • 装修网站怎么做的网站建设实验报告总结
  • 山东营销网站建设设计广西百色公司注册
  • 帮人家做网站能赚多少钱第三方网站做企业满意度调查
  • 做网站设计公司价格重庆市建设工程信息网的信用信息发布平台
  • 电子商务网站建设前期准备品牌vi设计是什么
  • 网站建设代理平台有哪些不懂代码可以做网站吗
  • 网站建设公司做销售前景好不好?wordpress 新建
  • 云南网站制作公司教育类手机网站模板
  • 建设摩托车网站wordpress全局动态背景