当前位置: 首页 > news >正文

b2c网站建设流程网络营销专业就业方向

b2c网站建设流程,网络营销专业就业方向,wordpress 商城模版,塘沽论坛网目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 在Kafka中保存的数…

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {private Integer userId;private String username;
}public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// do nothing}@Overridepublic byte[] serialize(String topic, User data) {try {// 如果数据是null,则返回nullif (data == null) return null;Integer userId = data.getUserId();String username = data.getUsername();int length = 0;byte[] bytes = null;if (null != username) {bytes = username.getBytes("utf-8");length = bytes.length;}// 第一个4字节存储userId的值// 第二个4字节存储username字节数组的长度int值// 第三个length长度,存储username序列化之后的字节数组ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);buffer.putInt(userId);buffer.putInt(length);buffer.put(bytes);return buffer.array();} catch (UnsupportedEncodingException e) {throw new SerializationException("序列化数据异常");}}@Overridepublic void close() {// do nothing}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/*** 自定义分区器*/
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 此处可以计算分区的数字。// 我们直接指定为2return 2;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);@Overridepublic ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {System.out.println("拦截器---go");// 此处根据业务需要对相关的数据作修改String topic = record.topic();Integer partition = record.partition();Long timestamp = record.timestamp();KEY key = record.key();VALUE value = record.value();Headers headers = record.headers();// 添加消息头headers.add("interceptor", "interceptor".getBytes());ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("拦截器---back");if (exception != null) {// 如果发生异常,记录在日志中LOGGER.error(exception.getMessage());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

http://www.hkea.cn/news/46360/

相关文章:

  • 可以免费做商业网站的cms百度seo霸屏软件
  • 哪家网站建设专业快速建站教程
  • 坪山网站建设行业现状优化seo方案
  • 做网站需要架构师吗网站平台有哪些
  • 网站建设丿选择金手指15凡科建站官网
  • 可以做外国网站文章武汉企业seo推广
  • 天津网站建设公司最好太原做网站哪家好
  • 网站代下单怎么做百度指数数据分析平台入口
  • 淘宝做动效代码的网站seo的优化方向
  • 番禺建网站公司网站搜索工具
  • 安徽万振建设集团网站长春网站推广公司
  • 网站怎么制作 推广seo超级外链工具免费
  • 中小学网站建设探讨东莞seo整站优化火速
  • php是网站开发的语言吗企业网站的作用
  • 网站站外优化怎么做企业推广app
  • 拉趣网站是谁做的威海网站制作
  • 做宣传海报的网站百度导航2023年最新版
  • 湖南做网站 磐石网络windows优化大师官方免费
  • 制作网站的最新软件如何优化关键词的方法
  • 东莞工作招聘网最新招聘搜索 引擎优化
  • 宁波俄语网站建设免费发广告的平台有哪些
  • 郑州外贸网站建设及维护营销软件商城
  • 泉州百度关键词排名广州网站营销优化qq
  • 怎么做wep网站营销推广活动方案
  • 展示型网站php官方app下载安装
  • 嘉祥网站建设广东省自然资源厅
  • 忘记网站后台密码网站排名软件推荐
  • 怎么查公司网站有没有被收录火爆产品的推广文案
  • 绵阳网站建设 经开区网络教学平台
  • wordpress阅读量没改7个湖北seo网站推广策略