当前位置: 首页 > news >正文

单位网站建设意见表格制作软件app

单位网站建设意见,表格制作软件app,网站建设做网站怎么做,营销策划方案模板范文抱歉#xff0c;没有太多的时间进行详细校对 目录 一、Kafka简介 1.消息队列 1.1为什么需要消息队列 1.2消息队列 1.3消息队列的分类 1.4P2P和发布订阅MQ的比较 1.5消息系统的使用场景 1.6常见的消息系统 2.Kafka简介 2.1简介 2.2设计目标 2.3 kafka核心的概念 二…抱歉没有太多的时间进行详细校对 目录 一、Kafka简介 1.消息队列 1.1为什么需要消息队列 1.2消息队列 1.3消息队列的分类 1.4P2P和发布订阅MQ的比较 1.5消息系统的使用场景 1.6常见的消息系统 2.Kafka简介 2.1简介 2.2设计目标 2.3 kafka核心的概念 二、 Kafka的分布式安装 1.docker安装 2.基于服务器安装 三.Kafka基本操作 1.kafka的topic的操作 2.Kafka的数据消费的总结 3.Kafka的编程的api 1.创建kafka的项目 2.kafka生产者的api操作 3.生产者模式幂等性案例 附1kafka producer 说明 附2Nagle算法Nagle算法_nagle算法于1984-CSDN博客 附3 Nagle算法Nagle算法_nagle算法是以他-CSDN博客 4.kafka消费者的api操作 5.offset消费问题 6.record进入分区的策略 7.自定义分区 参考资料 附1快速上手学习如何使用C实现kafka消费者客户端(快速上手学习如何使用C实现kafka消费者客户端_c 使用kafka-CSDN博客) 附2Docker实战之Kafka集群 1. 概述 2. Kafka 基本概念 3. Docker 环境搭建 4. Kafka 初认识 4.1 可视化管理 4.2 Zookeeper 在 kafka 环境中做了什么 5. 生产与消费 5.1 创建主题 5.2 生产消息 5.3 消费消息 5.4 消费详情 6. SpringBoot 集成 附3Kafka快速入门六——Kafka集群部署(Kafka快速入门六——Kafka集群部署_51CTO博客_kafka集群部署) 一、Kafka集群部署方案规划 1、操作系统选择 2、磁盘 3、磁盘容量 4、网络带宽 二、Kafka集群参数配置 1、Broker端参数 2、Topic级别参数 3、JVM参数 4、操作系统参数 三、Docker镜像选择 1、安装docker 2、docker-compose安装 3、docker镜像选择 四、Kafka单机部署方案 1、编写docker-compose.yml文件 2、启动服务 3、kafka服务查看 4、Kafka版本查询 5、kafka-manager监控 五、错误解决 1、容器删除失败 2、kafka服务一直重启 六、Kafka集群参数配置 附4【Kafka精进系列003】Docker环境下搭建Kafka集群() 一、Kafka集群搭建 1、首先运行Zookeeper本文并未搭建ZK集群 2、分别创建3个Kafka节点并注册到ZK上 3、在Broker 0节点上创建一个用于测试的topic 4、Kafka集群验证 二、使用Docker-Compose 搭建Kafka集群 1、什么是Docker-Compose? 2、如何使用Docker-Compose 3 、疑难杂症问题记录 附5使用docker容器创建Kafka集群管理、状态保存是通过zookeeper实现所以先要搭建zookeeper集群 附7如何使用Docker内的kafka服务(如何使用Docker内的kafka服务_docker kafka advertised.listeners-CSDN博客) 基本情况 版本信息 重点介绍 配置host 在docker上部署kafka 源码下载 开发生产消息的应用 开发消费消息的应用 验证消息的生产和消费 一、Kafka简介 1.消息队列 1.1为什么需要消息队列 面对比较大的流量冲击在网站系统中一般都会有一个消息存储/缓存系统网站就可以按照 自己服务负载的能力来消费这些消息--消息队列或者叫消息中间件。 消息队列应该具备的最基本的能力 1.存储能力所以是一个容器一般的实现都用队列 2.消息的入队或者生产 3.消息的出队或者消费 从消息的生产与消费的角度上而言消息队列就是一个典型的生产者消费者模型的实现框架。 1.2消息队列 消息Message 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说文本、音乐、视频等内容。 队列Queue 一种特殊的线性表(数据元素首尾相接)特殊之处在于只允许在首部删除元素和在尾部追加元素(FIFO); 入队、出队。 消息队列MQ 消息队列保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。 1.3消息队列的分类 MQ主要分为两类 点对点(p2p),发布订阅(Pub/Sub) Peer-to-Peer 一般基于Pull或者Polling接收数据 发送到队列中的消息被一个而且仅仅一个接收者所接收即使有多个接收者在同一个队列中侦听同一消息即支持异步“即发即收”的消息传递方式也支持同步请求/应答传送方式。 发布订阅 发布到同一个主题的消息可衩多个订阅者所接收 发布/订阅即可基于Push消费数据也可基于Pull或者Polling消费数据。 解耦能力比P2P模型更强。 1.4P2P和发布订阅MQ的比较 共同点 消息生产者生产消息发送到queue中然后消息消费者从queue中读取并且消费消息。 不同点 p2p模型包括消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费消息就不在消息队列中)。比如说打电话。 pub/sub包含消息队列(queue)、主题(topic)、发布者(publisher)、订阅者(subscriber) 每个消息可以有多个消费者彼此互不影响比如我发布一个微博关注的人都能够看到。 1.5消息系统的使用场景 解耦 名系统之间通过消息系统这个统一的接口交换数据无须了解彼此的存在 冗余 部分消息系统具有消息持久化能力可规避消息处理前丢失的风险 扩展 消息系统是统一的数据接口各系统可独立扩展 峰值可处理能力消息系统可顶住峰值流量业务系统可根据处理能力从消息系统中获取并处理对应量的请求 可恢复性 系统中部分功能失效并不会影响整 个系统它恢复会仍然可从消息系统中获取并处理数据 异步通信 在不需要立即处理请求的场景下可以将请求放入消息系统、合适的时候再处理。 1.6常见的消息系统 RabbitMQ Erlang编写 支持多协义AMQPXMPPSMTP STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式。 Redis基于key-Value对的NoSQL数据库同时支持MQ功能可做轻量级队列服务使用。就入队操作而言Redis对短消息(小于10kb)的性能比RabbitMQvb ,长消息性能比RabbitMQ差。 ZeroMQ轻量级不需要单独的消息服务器或中间件应用程序本身扮演该角色Peer-to-Peer它实质上是一个库需要开发人员自己组合多种技术使用复杂度高。 ActiveMQ JMS实现Peer-to-Peer, 支持持久化XA(分布式事务。 Kafka/Jafka高性能跨语言的分布式发布/订阅消息系统数据持久化全分布式同时支持在线和离线处理。 MetaQ/RocketMQ纯Java实现发布/订阅消息系统支持本地事务和XA分布式事务。 2.Kafka简介 2.1简介 Kafka是分布式的发布-订阅消息系统。它最初由Linkedin(领英)公司发布使用scala语言编写使用Scala语言编写与2010年12月份开源成为Apache顶级项目Kafka是一个高吞吐量的持久性的、分布式发布订阅消息系统。它主要用于处理活跃live的数据(登录、浏览、点击、分享、喜欢等用户行为的数据)。 三大特点 高吞吐量 可以满足每秒百成级别消息的生产和消费------生产消费 持久化 有一套完整的消息存储机制确保数据的高效安全的持久化---中间存储 分布式 基于分布式的扩展和容错机制Kafka的数据都会复制到几台服务器上。当某一台故障失效时生产者和消费者转而使用其它的机器---整体 健壮性 2.2设计目标 高吞吐率在廉价的商用机器上单机可支持每秒100万条消息的读写 消息持久化所有消息均被持久化到磁盘无消息丢失支持消息重放 完全分布式 Producer Broker, Consumer均技持水平扩展 同时适应在线流处理和离线批处理 2.3 kafka核心的概念 一个MQ需要哪此部分生产、清费、消息类型、存储等等。 对于kafka而言kafka服务就像一个大的水池。不断的生产存储消费着各种类别的消息。那么kafka由何给成呢 Kafka服务 Topic主题,Kafka处理的消息的不同分类。 Broker:消息服务器代理Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。 Partition:Topic物理上的分组一个Topic在Broker中被分为1个或者多个partition, 分区在创建topic的时候指定。 Message:消息是通信的基本单位每个消息都属于一个partition Kafka服务相关 Producer:消息和数据的生产者向Kafka的一处topic发布消息。 Consumer:消息和数据的消费者定于topic并处理其发布的消息。 Zookeeper协调kafka的正常运行。 二、 Kafka的分布式安装 1.docker安装 version: 3.1 ​ services: kafka1: image: wurstmeister/kafkarestart: alwayshostname: kafka1container_name: kafka1expose: - 9999ports: - 9092:9092- 9999:9999environment: KAFKA_ADVERTISED_HOST_NAME: kafka1KAFKA_ADVERTISED_PORT: 9092KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181KAFKA_JVM_PERFORMANCE_OPTS: -Xmx256m -Xms256mKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092KAFKA_LISTENERS: PLAINTEXT://:9092KAFKA_BROKER_ID: 1KAFKA_CREATE_TOPICS: topic001:2:1JMX_PORT: 9999volumes: - /opt/kafka/kafka1/logs:/kafkaexternal_links: - zk1- zk2- zk3 ​kafka2: image: wurstmeister/kafkarestart: alwayshostname: kafka2container_name: kafka2ports: - 9093:9093environment: KAFKA_ADVERTISED_HOST_NAME: kafka2KAFKA_ADVERTISED_PORT: 9093KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181KAFKA_JVM_PERFORMANCE_OPTS: -Xmx256m -Xms256mKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093KAFKA_LISTENERS: PLAINTEXT://:9093KAFKA_BROKER_ID: 2KAFKA_CREATE_TOPICS: topic001:2:1JMX_PORT: 9988volumes: - /opt/kafka/kafka2/logs:/kafkaexternal_links: - zk1- zk2- zk3 ​kafka3: image: wurstmeister/kafkarestart: alwayshostname: kafka3container_name: kafka3ports: - 9094:9094environment: KAFKA_ADVERTISED_HOST_NAME: kafka3KAFKA_ADVERTISED_PORT: 9094KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181KAFKA_JVM_PERFORMANCE_OPTS: -Xmx256m -Xms256mKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094KAFKA_LISTENERS: PLAINTEXT://:9094KAFKA_BROKER_ID: 3KAFKA_CREATE_TOPICS: topic001:2:1JMX_PORT: 9977volumes: - /opt/kafka/kafka3/logs:/kafkaexternal_links: - zk1- zk2- zk3kafka-manager:image: sheepkiller/kafka-manager:latestrestart: alwayscontainer_name: kafa-managerhostname: kafka-managerports:- 9000:9000links:            # 连接本compose文件创建的container- kafka1- kafka2- kafka3external_links:   # 连接本compose文件以外的container- zk1- zk2- zk3environment:ZK_HOSTS: zk1:2181,zk2:2181,zk3:2181KAFKA_BROKERS: kafka1:9092,kafka2:9093,kafka3:9094APPLICATION_SECRET: letmeinKM_ARGS: -Djava.net.preferIPv4Stacktrue 2.基于服务器安装 zookeeper集群安装 1.下载 wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz 2.解压 tar -zxvf apache-zookeeper-3.5.6.tar.gz -C /usr/local/ 3.进入zookeeper配置文件目录 cd /usr/local/apache-zookeeper-3.5.6/conf/ 4.更改文件zoo_sample.cfg为zoo.cfg cp zoo_sample.cfg zoo.cfg 5.修改配置文件 #用于计算基础的实际单位 tickTime2000 #初始化时间 initLimit10 #选举时间 syncLimit5 #数据存储路径 dataDir/usr/local/apache-zookeeper-3.5.6/data dataLogDir/usr/local/apache-zookeeper-3.5.6/logs clientPort2181 6.配置zookeeper集群 #集群配置 2888:选举端口 3888:投票端口 server.1hadoopmaster:2888:3888 server.2hadoopnode1:2888:3888 server.3hadoopnode2:2888:3888 7.创建数据目录和日志目录, 并添加对文件夹的读写权限 mkdir data chmod 755 data mkdir logs chmod 755 logs 8.在创建的data目录下创建myid文件添加这台机器集群的唯一标识 echo 1 myid 9.zookeeper启动 ./bin/zkServer start 10.编写集群启动脚本 ​ #!/bin/bash case $1 in start){ for i in hadoopmaster hadoopnode1 hadoopnode2 do echo ---------- zookeeper $i 启动 ------------ ssh $i /usr/local/zookeeper/bin/zkServer.sh start done };; stop){ for i in hadoopmaster hadoopnode1 hadoopnode2 do echo ---------- zookeeper $i 停止 ------------ ssh $i /usr/local/zookeeper/bin/zkServer.sh stop done };; status){ for i in hadoopmaster hadoopnode1 hadoopnode2 do echo ---------- zookeeper $i 状态 ------------ ssh $i /usr/local/zookeeper/bin/zkServer.sh status done };; *){ echo zookeeper script Input Args Error...echo $0 [start|stop|restart]... };; esac 11.AI回答: #!/bin/bash# 定义Zookeeper配置文件的路径 ZOOKEEPER_CONF/path/to/zookeeper/conf/zoo.cfg# 定义Zookeeper的启动命令 ZOOKEEPER_HOME/path/to/zookeeper# 定义Zookeeper服务实例的端口号 SERVER_PORTS(2181 2182 2183)# 定义Zookeeper服务器的myid对应于zoo.cfg中的server.x配置 SERVER_IDS(0 1 2)# 遍历服务端口和服务器ID数组 for index in ${!SERVER_PORTS[]}; doport${SERVER_PORTS[index]}server_id${SERVER_IDS[index]}# 设置环境变量用于zookeeper的myid文件export ZOO_LOG_DIR/path/to/zookeeper/logs-${port}export ZOO_DATA_DIR/path/to/zookeeper/data-${port}export ZOO_CLIENT_PORT${port}export ZOO_SERVER_ID${server_id}# 配置zoo.cfg文件中的服务器列表echo server.${server_id}localhost:2888:3888 ${ZOOKEEPER_CONF}# 启动Zookeeper实例echo Starting Zookeeper server on port ${port} with myid ${server_id}...${ZOOKEEPER_HOME}/bin/zkServer.sh start done 12.启动错误报如下 Error: JAVA_HOME is not set and java could not be found in PATH. 在bin目录下的zkEnv.sh文件开始加入下面语句 export JAVA_HOME/usr/local/jdk1.8.0_321/ 并将zkEnv.sh同步更新到其他节点 2.安装kafka 1.下载 wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.13-2.4.0.tgz 2.解压 tar -zxvf /root/softwares/kafka_2.13-2.4.0.tgz -C /usr/local/ 3.重命名 cd /usr/local/ mv kafka_2.13-2.4.0/ kafka 4.添加环境变量 vim /etc/profile.d/hadoop-etc.sh export KAFKA_HOME/opt/apps/kafka export PATH$PATH:$KAFKA_HOME/bin source /etc/profile.d/hadoop-etc.sh 4.配置 修改$KAFKA_HOME/config/server.properties broker.id0                     ##当前kafka实例的Id 必须为整数 一个集群中不可重复 log.dirs/usr/local/kafka/data/kafka-logs         ##生产到kafka中的数据存储的目录目录需要手动创建 zookeeper.connecthadoopmaster:2181,hadoopnode1:2181,hadoopnode2:2181/kafka ##kafka数据在zk中的存储目录 5.创建数据目录 mkdir -p /usr/local/kafka/data/kafka-logs 6.同步到其它机器 scp -r kafka/ hadoopnode1:$PWD scp -r kafka/ hadoopnode2:$PWD 7.修改配置文件中的broker.id broker.id1 ##hadoopnode1 broker.id3 ##hadoopnode2 8.启动kafka服务 /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 9.kafka服务测试 进入zookeeper客户端 ./zkCli.sh ls /kafka/brokers/ids 10.Kafka在zookeeper中的目录说明 /kafka/cluster/id {version:1,id:SGvHsIOcSJuakpX44ew2-w} ---代表的是一个kafka集群包含集群的版本和集群的id/controller {version:1,brokerid:1,timestamp:1717651279966}   -- controller是kafka中非常重要的一个角色意为控制器控制partition的leader选举topic的crud操作brokerid意为由其id对应的broker承担controller的角色/controller_epoch 2 代表的是controller的纪元换名话说是代表controller的更迭每当controller的brokerid更换一次controller_epoch就1/brokers/ids [0, 1, 3]   --- 存放当前kafka的broker实例列表/topics [hadoop, _consumer_offsets]   --- 当前kafka中的topic列表/seqid 系统的序列id/consumers -- 老版本用于存储kafka消费者的信息 主要保存对应的offset, 新版本中基本不用此时用户的消费信息保存在一个系统的topic中 _consumer_offsets/config   --- 存放配置信息 11集群启动脚本 #!/bin/bash ​ kafka_start() {for i in hadoopmaster hadoopnode1 hadoopnode2doecho --------启动 $i Kafka-------ssh $i source /etc/profile; nohup sh /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.propertiesdone } ​ kafka_stop() {for i in hadoopmaster hadoopnode1 hadoopnode2doecho --------关闭 $i Kafka-------ssh $i source /etc/profile;bash /usr/local/kafka/bin/kafka-server-stop.sh ​done } ​ case $1 in start)echo start kafka clusterkafka_start ;; ​ stop)echo stop kafka clusterkafka_stop ;; restart)echo restart kafka clusterkafka_stopkafka_start ;; *)echo Input Args Error...echo $0 [start|stop|restart]... ;; esac 12.安装客户端(kafka tool) Offset Explorer 三.Kafka基本操作 1.kafka的topic的操作 topic是kafka非常重要的核心概念是用来存储各种类型的数据的所以最基本的就需要学会如何在kafka中创建、修改主、删除的topic以及如何向topic生产消费数据。 关于topic的操作脚本 kafka-copics.sh 1.创建topic bin/kafka-topics.sh --create\ --topic hadoop \       ##指定要创建的topic的名称 --zookeeper hadoopmaster:2181,hadoopnode1:2181,hadoopnode2:2181/kafka \ ##指定kafka关联的zk地址 --partitions 3 \   ##指定该topic的分区个数 --replication-factor 3   ##指定副本因子 ./kafka-topics.sh --create --topic hadoop --zookeeper hadoopmaster:2181,hadoopnode1:2181,hadoopnode2:2181/kafka --partitions 3 --replication-factor 3   注意指定副本因子的时候 不能大于broker实例个数否则报错. 2 查看topic的列表 bin/kafka-topics.sh --list --zookeeper hadoopmaster:2181,hadoopnode1:2181,hadoopnode2:2181/kafka 3.查看每一个topic的信息 bin/kafka-topics.sh --describe --topic hadoop --zookeeper hadoopmaster:2181,hadoopnode1:2181,hadoopnode2:2181/kafka Partition: 当前topic对应的分区编号 Replicas: 副本因子当前kafka对应的partition所在的broker实例的broker.id的列表 Leader: 该partiotion的所有副本中的leader领导者处理所有kafka该partition读写请求 ISR: 该partition的存活的副本对应的broker实例的broker.id的列表 登录zookeeper客户端查看 4.修改一个topic ./kafka-topics.sh --alter --topic hadoop --partitions 4 --zookeeper hadoopmaster:2181/kafka 但是注意: partition个数只能增加不能减少 5.删除一个topic ./kafka-topics.sh --delete --topic hadoop1 --zookeeper hadoopmaster:2181,hadoopnode1:2181,hadoopnode2:2181/kafka 在老版本的时候是不能直接删除掉的topic,除非配置了delete.topic.enabletrue,可以直接删除掉但是不配置那么就不会直接删除先会做一个标记那么这个topic就不能用了而在新版本的时候不需要设置这个属性直接删除掉了。 6.生产数据 ./kafka-console-producer.sh --topic spark --broker-list hadoopmaster:9092,hadoopnode1:9092,hadoopnode2:9092 注意使用的端口不能使用2181 会报如下错误 ERROR Error when sending message to topic hadoop with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 7.消费数据 ./kafka-console-consumer.sh --topic spark --bootstrap-server hadoopmaster:9092,hadoopnode1:9092,hadoopnode2:9092 所有的消费数据 ./kafka-console-consumer.sh --topic spark --bootstrap-server hadoopmaster:9092,hadoopnode1:9092,hadoopnode2:9092 --from-beginning 指定分区内的消费数据 ./kafka-console-consumer.sh --topic spark --bootstrap-server hadoopmaster:9092,hadoopnode1:9092,hadoopnode2:9092 --partition 2 --offset earliest 2.Kafka的数据消费的总结 kafka消费者在消费数据的时候都是分组别的。不同组的消费不受影响相同组内的消费需要注意如partition有3个消费者有3个那么便是每一个消费者消费其中一个partition对应的数据;如果有2个消费者时一个消费者消费其中一个partition数据另一个消费者消费2个partition的数据。如果有超过3个的消费者同一时间只能最多有3个消费者能消费得到数据。 结论: 在一个消费者组内消费数据,同一时间不会有两个消费者同时消费一个分区的数据 在两个消费者组内,同一时间可以有两个消费者同时消费一个分区的数据. ./kafka-console-consumer.sh --topic spark --bootstrap-server hadoopmaster:9092,hadoopnode1:9092,hadoopnode2:9092 --group group-2024 offset是kafka的topic中的partition中的每一条消息的标识,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量.offset的数据类型是Long,8个字节长度.offset在分区内是有序的,分区间是不一定有序.如果想要kafka中的数据全局有充,就只能让partition个数为1. 在组内,kafka的topic的partition个数,代表了kafka的topic的并行度,同一时间最多可以有多个线程来消费topic的数据,所以如果要想提高kafka的topic的消费能力,应该增大partition的个数. 附:kafka消费者详解 3.Kafka的编程的api 1.创建kafka的项目 导入maven依赖 !-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --!-- 此依赖包含了kafka-clients --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.13/artifactIdversion2.4.0/version/dependency 2.kafka生产者的api操作 入口类: Producer 入门案例 /*** 生产者API操作* 首先需要生产者---就是程序的入口Producer* 数据被生产到哪里---- Topic* 什么样数据 ---- 数据类型*/ public class MyProducerTest {public static void main(String[] args) throws IOException {//加载配置文件Properties prop new Properties();prop.load(MyProducerTest.class.getClassLoader().getResourceAsStream(producer.properties));/*** 创建执行入口* 首先, kafka中的数据都是有三个部分组成key, Value, timestamp* Each record consists of a key, a value, and a timestamp.* K 就是记录中的Key的类型* V 就就是记录中的Value类型*/ ​KafkaProducerString, String producer new KafkaProducerString, String(prop); ​//设置要发送的topicString topicspark;ProducerRecordString, String record new ProducerRecord(topic, 11111);//通过send发送producer.send(record);//释放资源producer.close();} } 配置 将服务器config的producer.propert文件下载下来放入resources文件夹并改写如下内容 bootstrap.servershadoopmaster:9092,hadoopnode1:9092,hadnoopnode2:9092 key.serializerorg.apache.kafka.common.serialization.StringSerializer value.serializerorg.apache.kafka.common.serialization.StringSerializer 创建producer时需要指定的配置信息 bootstrap.servershadoopmaster:9092,hadoopnode1:9092,hadnoopnode2:9092   ##kafka的服务器 key.serializerorg.apache.kafka.common.serialization.StringSerializer     ##key的序列化器 value.serializerorg.apache.kafka.common.serialization.StringSerializer   ##value的序列化器 acks[0|-1|1|all] #消息确认机制0: 不做确认直管发送消息即可-1|all: 不仅leader需要将数据写入本地磁盘并确认还需要同步的等待其它followers进行确认1: 只需要leader进行消息确认即可后期follower可以从leader进行同步 batch.size1024   #每个分区的用户缓存未发送record记录的空间大小 ##如果缓存区中的数据没有占满也就是仍然有未用的空间那么也会将请求发送出去为了较少请求次数可以配置linger.ms大于0. linger.ms10 ##不管缓冲区是否被占满延迟10ms发送request buffer.memory10240   #控制的是一个producer中的所有缓存空间 retries0 #发送消息失败之后的重试次数 文档: https://kafka.apache.org/24/documentation.html#introduction 修改配置查看生产数据情况 配置文件 bootstrap.servershadoopmaster:9092,hadoopnode1:9092,hadnoopnode2:9092 ​ # specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd compression.typenone ​ # name of the partitioner class for partitioning events; default partition spreads data randomly # 输入进入分区的方式 #partitioner.class ​ # the maximum amount of time the client will wait for the response of a request #请求超时时间 #request.timeout.ms ​ # how long KafkaProducer.send and KafkaProducer.partitionsFor will block for #使用send方法最大消息阻塞时间 #max.block.ms ​ # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together linger.ms5000 ​ # the maximum size of a request in bytes #最大的请求大小 #max.request.size ​ # the default batch size in bytes when batching multiple records sent to a partition batch.size16384 ​ # the total bytes of memory the producer can use to buffer records waiting to be sent to the server buffer.memory33554432 #Key和Value的序列化 key.serializerorg.apache.kafka.common.serialization.StringSerializer value.serializerorg.apache.kafka.common.serialization.StringSerializer 注意 producer.send(record); 是异步发送方式 代码: /*** 生产者API操作* 首先需要生产者---就是程序的入口Producer* 数据被生产到哪里---- Topic* 什么样数据 ---- 数据类型*/ public class MyProducerTest {public static void main(String[] args) throws IOException {//加载配置文件Properties prop new Properties();prop.load(MyProducerTest.class.getClassLoader().getResourceAsStream(producer.properties));/*** 创建执行入口* 首先, kafka中的数据都是有三个部分组成key, Value, timestamp* Each record consists of a key, a value, and a timestamp.* K 就是记录中的Key的类型* V 就就是记录中的Value类型*/ ​KafkaProducerString, String producer new KafkaProducerString, String(prop); ​//设置要发送的topicString topicspark;ProducerRecordString, String record new ProducerRecord(topic, 11111);//通过send发送/*send方法并不会立即将数据发送到Kafka集群而且先发送到缓冲区该方法便立即返回返回给调用者producer,该方法是一个异步方法当缓冲区满了或者时间到了就会将send的数据转换为request请求提交给kafka集群*/FutureRecordMetadata fututerproducer.send(record);RecordMetadata recordMetadata fututer.get();boolean hasOffset recordMetadata.hasOffset();long offset recordMetadata.offset();boolean hasTimestamp recordMetadata.hasTimestamp();int partition recordMetadata.partition();long timestamp recordMetadata.timestamp();String topicl recordMetadata.topic();//打印if(hasOffset) {System.out.println(offer: offset);}if(hasTimestamp){System.out.println(timestamp: timestamp);}System.out.println(Topic: topic);System.out.println(Partition: partition);//释放资源producer.close();} } 3.生产者模式幂等性案例 producer.properties配置 acksall enable.idempotencetrue 代码 /*** 幂等性例子* kafka生产都保证消息一致性(exactly one 恰好一次)语义, 靠开启幂等操作或者开启事务机制来完成* 下面的案例中在同一个producer中我们发现了两次重复的消息* 看到的现象并没有像幂等操作那样可以避免数据的重复。* 这是什么原因*     其实主要原因在于大数据框架设计问题上*     如果说有1000W条数据在其中不断的进行增加要保证添加的数据不能重复。*     方案一在添加的时候进行判断如果该条消息已经存在直接覆盖掉对应数据*     方案二在添加的时候先不进行判断直接进行添加在后续的操作过程中满足条件之后*     在进行数据的合并操作。*     我们选择方案一还是方案二*     在这里我们肯定选择方案二为什么原因在于添加一条数就要扫描一次判断存在不存在*     存在就覆盖这样会很严重的影响写入的性能如果我们进行判断后期在进行聚合去重对写入没有任何影响*/ public class ExactlyOneProducerTest {public static void main(String[] args) throws IOException {//加载配置文件Properties prop new Properties();prop.load(MyProducerTest.class.getClassLoader().getResourceAsStream(producer.properties));String topic spark;KafkaProducerString, String producer new KafkaProducerString, String(prop);//加载TopicProducerRecordString, String record null;int start 6;int end start 10;for(int i start; i end; i)    //每次发送十条记录{record new ProducerRecord(topic,i, i);producer.send(record);} ​for(int i start; i end; i)   //每次发送十条记录{record new ProducerRecord(topic,i, i);producer.send(record);}//释放资源producer.close();} } 附1kafka producer 说明 10 A Kafka client that publishes records to the Kafka cluster. The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs. Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(acks, all);props.put(retries, 0);props.put(batch.size, 16384);props.put(linger.ms, 1);props.put(buffer.memory, 33554432);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); ​ProducerString, String producer new KafkaProducer(props);for (int i 0; i 100; i)producer.send(new ProducerRecordString, String(my-topic, Integer.toString(i), Integer.toString(i))); ​producer.close(); The producer consists of a pool of buffer space that holds records that havent yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources. The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency. kafka的生产者producer由持有未提交给kafka服务的记录的缓冲区构成一个缓冲区池于此同时会有一个后台的I/O线程来负责。 将这此记录record转化为请求request然后将其提交给集群。如果发送完数据之后不对producer进行资源释放(close)会导致资源被挤占。 producer的send方法是异步的当调用send方法提交一条记录到缓冲区之后立即被返回。这就能够允许生产者能够进行高效组织以批处理来发送数据 ----- producer.send() ---- 数据 -----producer的缓冲区----- 缓冲满了或者发送时间到-----发送数据到kafka集群 send方法立即返回添加下一条记录。 The acks config controls the criteria under which requests are considered complete. The all setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. ack指的就是消息确认机制 kafka中用于和客户端进行交互的只有一个-- partition 中的 leader 客户端发送一条记录到集群中肯定是对应partition来进行接收并录入到磁盘 acks消息确认机制就代表了一条消息从客户端到服务从leader到各个follow来接收确认的一个机制。 acks1 ---- 发送的消息只需要leader确认收到即可后期follower来从leader进行同步 acks0 ----- 发送的消息不需要经过服务端的确认一旦发出立即返回继续下一条的发送 acks -1|all ---- 发送的消息不仅需要leader确认还需要所有的follower都确认成功之后才能发送下一条记录 显然acks1 或者 -1是同步的消息发送acks0 是异步的消息发送 If the request fails, the producer can automatically retry, though since we have specified retries as 0 it wont. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details). 如果请求失败生产者会自动进行重试如果指定的retries参数不是0. The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition). 生产者producer会为每一个partition管理一个包含还没有发送的数据的缓冲区buffer的大小 通过参数batch.size指定调大这个参数会导致缓存更多的记录但是也现样意味着需要更多的内存。 By default a buffer is available to send immediately even if there is additional unused space in the buffer. 默认情况下该缓冲区会立即发送数据, 即使该缓冲区还有没被使用的空间。 However if you want to reduce the number of requests you can set linger.ms to something greater than 0. 因此如果我们想减少发送请求到集群的次数可以通过设置参数 linger.ms 大于0 This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. 配置了等待参数就会导致生产者在发送之前先等待 linger.ms 毫秒以期望在缓冲区中可以缓冲更多的数据最后一个批次全送出减少了请求的次数。 This is analogous to Nagles algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didnt fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency. Nagle算法是以他的发明人John Nagle的名字命名的它用于自动连接许多的小缓冲器消息这一过程称为nagling通过减少必须发送包的个数来增加网络软件系统的效率。 Nagle算法于1984年定义为福特航空和通信公司IP/TCP拥塞控制方法这使福特经营的最早的专用 TCP/IP 网络减少拥塞控制从那以后这一方法得到了广泛应用。Nagle的文档里定义了处理他所谓的小包问题的方法这种问题指的是应用程序一次产生一字节数据这样会导致 网络由于太多的包而过载一个常见的情况是发送端的 糊涂窗口综合症(Silly Windw Syndrome)。从键盘输入的一个字符占用一个字节可能在传输上造成41字节的包其中包括1字节的有用信息和40字节的标题数据。这种情况转变成了4000%的消耗这样的情况对于轻负载的 网络来说还是可以接受的但是重负载的福特网络就受不了了它没有必要在经过节点和网关的时候重发导致包丢失和妨碍传输速度。 吞吐量可能会妨碍甚至在一定程度上会导致连接失败。Nagle的算法通常会在TCP程序里添加两行代码在未确认数据发送的时候让发送器把数据送到 缓存里。任何数据随后继续直到得到明显的数据确认或者直到攒到了一定数量的数据了再发包。尽管Nagle的算法解决的问题只是局限于福特 网络然而同样的问题也可能出现在ARPANet。这种方法在包括因特网在内的整个 网络里得到了推广成为了默认的执行方式尽管在高互动环境下有些时候是不必要的例如在客户/服务器情形下。在这种情况下nagling可以通过使用TCP_NODELAY 套接字选项关闭。 The buffer.memory controls the total amount of memory available to the producer for buffering. buffer.memory控制着一个producer中所有可用缓冲内存 If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. 如果记录发送的速度快于提交给集群的速度那么内存缓冲区将会很快被消耗光。 When the buffer space is exhausted additional send calls will block. 如果缓冲区被 消耗光那么接下来的发送请求交款将会被block阻塞。 The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.、 如果阻塞时间超过了max.bllock.ms参数的值将会抛出一个TimeoutException异常。 The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple string or byte types. 网络传输序列需要进行序列化 From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafkas delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically. 幂等操作------多次操作的结果和操作一次的结果是一样的 事务操作-------发送数据在一个事务中如果有异常将会回滚 To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature. 为了运行幂等操作需要开启参数enable.idempotencetrue. 一旦被设置参数retries将会默认设置为Integer.MAX_VALUE,以及参数acks被设置为all。这种幂等api上面没有任何的变化也就是说在前面的代码中如果开启幂等操作整个生产的程序就是幂等的。 To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. 为了获取幂等的优势能够避免在应用程序的级别去避免数据的重复发送因为这种情况下数据不会有重复。 As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. 因此如果一个application允许了幂等操作推荐设置retries 参数为0 或者不设置因为此时默认的 retries Integer.MAX_VALUE。 Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session. producer在幂等情况下能够在单一会话中保证消息的唯一性。 To use the transactional producer and the attendant APIs, you must set the transactional.id configuration property. If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well. The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application. As such, it should be unique to each producer instance running within a partitioned application. All the new transactional APIs are blocking and will throw exceptions on failure. The example below illustrates how the new APIs are meant to be used. It is similar to the example above, except that all 100 messages are part of a single transaction. Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(transactional.id, my-transactional-id);ProducerString, String producer new KafkaProducer(props, new StringSerializer(), new StringSerializer()); ​producer.initTransactions(); ​try {producer.beginTransaction();for (int i 0; i 100; i)producer.send(new ProducerRecord(my-topic, Integer.toString(i), Integer.toString(i)));producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We cant recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close(); As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction. The transactional producer uses exceptions to communicate error states. In particular, it is not required to specify callbacks for producer.send() or to call .get() on the returned Future: a KafkaException would be thrown if any of the producer.send() or transactional calls hit an irrecoverable error during a transaction. See the send(ProducerRecord) documentation for more details about detecting errors from a transactional send. By calling producer.abortTransaction() upon receiving a KafkaException we can ensure that any successful writes are marked as aborted, hence keeping the transactional guarantees. This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain client features. For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an UnsupportedVersionException when invoking an API that is not available in the running broker version. 附2Nagle算法Nagle算法_nagle算法于1984-CSDN博客 简介 Nagle算法是以他的发明人John Nagle的名字命名的它用于自动连接许多的小 缓冲器消息这一过程称为nagling通过减少必须发送包的个数来增加 网络软件系统的效率。Nagle算法于1984年定义为福特航空和通信公司IP/TCP拥塞控制方法这使福特经营的最早的专用 TCP/IP 网络减少拥塞控制从那以后这一方法得到了广泛应用。Nagle的文档里定义了处理他所谓的小包问题的方法这种问题指的是应用程序一次产生一字节数据这样会导致 网络由于太多的包而过载一个常见的情况是发送端的 糊涂窗口综合症(Silly Windw Syndrome)。从键盘输入的一个字符占用一个字节可能在传输上造成41字节的包其中包括1字节的有用信息和40字节的标题数据。这种情况转变成了4000%的消耗这样的情况对于轻负载的 网络来说还是可以接受的但是重负载的福特网络就受不了了它没有必要在经过节点和网关的时候重发导致包丢失和妨碍传输速度。 吞吐量可能会妨碍甚至在一定程度上会导致连接失败。Nagle的算法通常会在TCP程序里添加两行代码在未确认数据发送的时候让发送器把数据送到 缓存里。任何数据随后继续直到得到明显的数据确认或者直到攒到了一定数量的数据了再发包。尽管Nagle的算法解决的问题只是局限于福特 网络然而同样的问题也可能出现在ARPANet。这种方法在包括因特网在内的整个 网络里得到了推广成为了默认的执行方式尽管在高互动环境下有些时候是不必要的例如在客户/服务器情形下。在这种情况下nagling可以通过使用TCP_NODELAY 套接字选项关闭。 MSDN: A TCP/IP optimization called the Nagle Algorithm can also limit data transfer speed on a connection. The Nagle Algorithm is designed to reduce protocol overhead for applications that send small amounts of data, such as Telnet, which sends a single character at a time. Rather than immediately send a packet with lots of header and little data, the stack waits for more data from the application, or an acknowledgment, before proceeding. 2算法 ** **     TCP/IP协议中无论发送多少数据总是要在数据前面加上协议头同时对方接收到数据也需要发送ACK表示确认。为了尽可能的利用网络带宽TCP总是希望尽可能的发送足够大的数据。一个连接会设置MSS参数因此TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据。Nagle算法就是为了尽可能发送大块数据避免网络中充斥着许多小数据块。   Nagle算法的基本定义是 任意时刻最多只能有一个未被确认的小段。 所谓“小段”指的是小于MSS尺寸的数据块所谓“未被确认”是指一个数据块发送出去后没有收到对方发送的ACK确认该数据已收到。   Nagle算法的规则可参考tcp_output.c文件里tcp_nagle_check函数注释 1如果包长度达到MSS则允许发送 2如果该包含有FIN则允许发送 3设置了TCP_NODELAY选项则允许发送 4未设置TCP_CORK选项时若所有发出去的小数据包包长度小于MSS均被确认则允许发送 5上述条件都未满足但发生了超时一般为200ms则立即发送。 Nagle算法只允许一个未被ACK的包存在于网络它并不管包的大小因此它事实上就是一个扩展的停-等协议只不过它是基于包停-等的而不是基于字节停-等的。Nagle算法完全由TCP协议的ACK机制决定这会带来一些问题比如如果对端ACK回复很快的话Nagle事实上不会拼接太多的数据包虽然避免了网络拥塞网络总体的利用率依然很低。 Nagle算法是silly window syndrome(SWS)预防算法的一个半集。SWS算法预防发送少量的数据Nagle算法是其在发送方的实现而接收方要做的时不要通告缓冲空间的很小增长不通知小窗口除非缓冲区空间有显著的增长。这里显著的增长定义为完全大小的段MSS或增长到大于最大窗口的一半。   注意BSD的实现是允许在空闲链接上发送大的写操作剩下的最后的小段也就是说当超过1个MSS数据发送时内核先依次发送完n个MSS的数据包然后再发送尾部的小数据包其间不再延时等待。假设网络不阻塞且接收窗口足够大 举个例子比如之前的blog中的实验一开始client端调用socket的write操作将一个int型数据称为A块写入到网络中由于此时连接是空闲的也就是说还没有未被确认的小段因此这个int型数据会被马上发送到server端接着client端又调用write操作写入‘\r\n’简称B块这个时候A块的ACK没有返回所以可以认为已经存在了一个未被确认的小段所以B块没有立即被发送一直等待A块的ACK收到大概40ms之后B块才被发送。整个过程如图所示 这里还隐藏了一个问题就是A块数据的ACK为什么40ms之后才收到这是因为TCP/IP中不仅仅有nagle算法还有一个 TCP确认延迟机制 。当Server端收到数据之后它并不会马上向client端发送ACK而是会将ACK的发送延迟一段时间假设为t它希望在t时间内server端会向client端发送应答数据这样ACK就能够和应答数据一起发送就像是应答数据捎带着ACK过去。在我之前的时间中t大概就是40ms。这就解释了为什么\r\nB块总是在A块之后40ms才发出。   当然TCP确认延迟40ms并不是一直不变的TCP连接的延迟确认时间一般初始化为最小值40ms随后根据连接的重传超时时间RTO、上次收到数据包与本次接收数据包的时间间隔等参数进行不断调整。另外可以通过设置TCP_QUICKACK选项来取消确认延迟。   ** TCP_NODELAY 选项 **   默认情况下发送数据采用Nagle 算法。这样虽然提高了网络吞吐量但是实时性却降低了在一些交互性很强的应用程序来说是不允许的使用TCP_NODELAY选项可以禁止Nagle 算法。 此时应用程序向内核递交的每个数据包都会立即发送出去。需要注意的是虽然禁止了Nagle 算法但网络的传输仍然受到TCP确认延迟机制的影响。    3. TCP_CORK 选项    所谓的CORK就是塞子的意思形象地理解就是用CORK将连接塞住使得数据先不发出去等到拔去塞子后再发出去。设置该选项后内核会尽力把小数据包拼接成一个大的数据包一个MTU再发送出去当然若一定时间后一般为200ms该值尚待确认内核仍然没有组合成一个MTU时也必须发送现有的数据不可能让数据一直等待吧。   然而TCP_CORK的实现可能并不像你想象的那么完美CORK并不会将连接完全塞住。内核其实并不知道应用层到底什么时候会发送第二批数据用于和第一批数据拼接以达到MTU的大小因此内核会给出一个时间限制在该时间内没有拼接成一个大包努力接近MTU的话内核就会无条件发送。也就是说若应用层程序发送小包数据的间隔不够短时TCP_CORK就没有一点作用反而失去了数据的实时性每个小包数据都会延时一定时间再发送。    4. Nagle算法与CORK算法区别      Nagle算法和CORK算法非常类似但是它们的着眼点不一样Nagle算法主要避免网络因为太多的小包协议头的比例非常之大而拥塞而CORK算法则是为了提高网络的利用率使得总体上协议头占用的比例尽可能的小。如此看来这二者在避免发送小包上是一致的在用户控制的层面上Nagle算法完全不受用户socket的控制你只能简单的设置TCP_NODELAY而禁用它CORK算法同样也是通过设置或者清除TCP_CORK使能或者禁用之然而Nagle算法关心的是网络拥塞问题只要所有的ACK回来则发包而CORK算法却可以关心内容在前后数据包发送间隔很短的前提下很重要否则内核会帮你将分散的包发出即使你是分散发送多个小数据包你也可以通过使能CORK算法将这些内容拼接在一个包内如果此时用Nagle算法的话则可能做不到这一点。 附3 Nagle算法Nagle算法_nagle算法是以他-CSDN博客 Nagle算法是以他的发明人John Nagle的名字命名的它用于自动连接许多的小缓冲器消息这一过程称为nagling通过减少必须发送包的个数来增加网络软件系统的效率。 TCP/IP协议中无论发送多少数据总是要在数据前面加上协议头同时对方接收到数据也需要发送ACK表示确认。为了尽可能的利用网络带宽TCP总是希望尽可能的发送足够大的数据。一个连接会设置MSS参数因此TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据。Nagle算法就是为了尽可能发送大块数据避免网络中充斥着许多小数据块。   Nagle算法的基本定义是任意时刻最多只能有一个未被确认的小段。 所谓“小段”指的是小于MSS尺寸的数据块所谓“未被确认”是指一个数据块发送出去后没有收到对方发送的ACK确认该数据已收到。   Nagle算法的规则可参考tcp_output.c文件里tcp_nagle_check函数注释 如果包长度达到MSS则允许发送 如果该包含有FIN则允许发送 设置了TCP_NODELAY选项则允许发送 未设置TCP_CORK选项时若所有发出去的小数据包包长度小于MSS均被确认则允许发送 上述条件都未满足但发生了超时一般为200ms则立即发送。 伪代码 if there is new data to send #有数据要发送 # 发送窗口缓冲区和队列数据 mss队列数据available data为原有的队列数据加上新到来的数据 # 也就是说缓冲区数据超过mss大小nagle算法尽可能发送足够大的数据包 ​ if the window size MSS and available data is MSS ​ send complete MSS segment now # 立即发送 ​ else ​ if there is unconfirmed data still in the pipe # 前一次发送的包没有收到ack # 将该包数据放入队列中直到收到一个ack再发送缓冲区数据 ​ enqueue data in the buffer until an acknowledge is received ​ else ​ send data immediately # 立即发送 ​ end if ​ end if ​ end if    Nagle算法只允许一个未被ACK的包存在于网络它并不管包的大小因此它事实上就是一个扩展的停-等协议只不过它是基于包停-等的而不是基于字节停-等的。Nagle算法完全由TCP协议的ACK机制决定这会带来一些问题比如如果对端ACK回复很快的话Nagle事实上不会拼接太多的数据包虽然避免了网络拥塞网络总体的利用率依然很低。 Nagle算法是silly window syndrome(SWS)预防算法的一个半集。SWS算法预防发送少量的数据Nagle算法是其在发送方的实现而接收方要做的是不要通告缓冲空间的很小增长不通知小窗口除非缓冲区空间有显著的增长。这里显著的增长定义为完全大小的段MSS或增长到大于最大窗口的一半。   注意BSD的实现是允许在空闲链接上发送大的写操作剩下的最后的小段也就是说当超过1个MSS数据发送时内核先依次发送完n个MSS的数据包然后再发送尾部的小数据包其间不再延时等待。假设网络不阻塞且接收窗口足够大 举个例子client端调用socket的write操作将一个int型数据称为A块写入到网络中由于此时连接是空闲的也就是说还没有未被确认的小段因此这个int型数据会被马上发送到server端接着client端又调用write操作写入‘\r\n’简称B块这个时候A块的ACK没有返回所以可以认为已经存在了一个未被确认的小段所以B块没有立即被发送一直等待A块的ACK收到大概40ms之后B块才被发送。 这里还隐藏了一个问题就是A块数据的ACK为什么40ms之后才收到这是因为TCP/IP中不仅仅有nagle算法还有一个TCP确认延迟机制 。当Server端收到数据之后它并不会马上向client端发送ACK而是会将ACK的发送延迟一段时间假设为t它希望在t时间内server端会向client端发送应答数据这样ACK就能够和应答数据一起发送就像是应答数据捎带着ACK过去。在我之前的时间中t大概就是40ms。这就解释了为什么’\r\n’B块总是在A块之后40ms才发出。   当然TCP确认延迟40ms并不是一直不变的TCP连接的延迟确认时间一般初始化为最小值40ms随后根据连接的重传超时时间RTO、上次收到数据包与本次接收数据包的时间间隔等参数进行不断调整。另外可以通过设置TCP_QUICKACK选项来取消确认延迟。 TCP_NODELAY 选项      默认情况下发送数据采用Nagle 算法。这样虽然提高了网络吞吐量但是实时性却降低了在一些交互性很强的应用程序来说是不允许的使用TCP_NODELAY选项可以禁止Nagle 算法。 此时应用程序向内核递交的每个数据包都会立即发送出去。需要注意的是虽然禁止了Nagle 算法但网络的传输仍然受到TCP确认延迟机制的影响。 TCP_CORK 选项      所谓的CORK就是塞子的意思形象地理解就是用CORK将连接塞住使得数据先不发出去等到拔去塞子后再发出去。设置该选项后内核会尽力把小数据包拼接成一个大的数据包一个MTU再发送出去当然若一定时间后一般为200ms该值尚待确认内核仍然没有组合成一个MTU时也必须发送现有的数据不可能让数据一直等待吧。   然而TCP_CORK的实现可能并不像你想象的那么完美CORK并不会将连接完全塞住。内核其实并不知道应用层到底什么时候会发送第二批数据用于和第一批数据拼接以达到MTU的大小因此内核会给出一个时间限制在该时间内没有拼接成一个大包努力接近MTU的话内核就会无条件发送。也就是说若应用层程序发送小包数据的间隔不够短时TCP_CORK就没有一点作用反而失去了数据的实时性每个小包数据都会延时一定时间再发送。   Nagle算法与CORK算法区别      Nagle算法和CORK算法非常类似但是它们的着眼点不一样Nagle算法主要避免网络因为太多的小包协议头的比例非常之大而拥塞而CORK算法则是为了提高网络的利用率使得总体上协议头占用的比例尽可能的小。如此看来这二者在避免发送小包上是一致的在用户控制的层面上Nagle算法完全不受用户socket的控制你只能简单的设置TCP_NODELAY而禁用它CORK算法同样也是通过设置或者清除TCP_CORK使能或者禁用之然而Nagle算法关心的是网络拥塞问题只要所有的ACK回来则发包而CORK算法却可以关心内容在前后数据包发送间隔很短的前提下很重要否则内核会帮你将分散的包发出即使你是分散发送多个小数据包你也可以通过使能CORK算法将这些内容拼接在一个包内如果此时用Nagle算法的话则可能做不到这一点。 如何保证kafka的数据一致性 kafka生产都可以选择生产的两种模式(幂等和事务) 幂等多次操作的结果和操作一次的结果一样的 事务发送数据在一个事务中如果有异常将会回滚 4.kafka消费者的api操作 入口类 ConSumer 配置 将服务器config的consumer.properties文件下载下来放入resources文件夹并改写如下内容 bootstrap.servershadoopmaster:9092,hadoopnode1:9092,hadnoopnode2:9092 ​ # consumer group id group.idkafka_bigdata ​ # What to do when there is no initial offset in Kafka or if the current # offset does not exist any more on the server: latest, earliest, none # latest(默认)从最大的偏移量开始消费 # earliest从最小的偏移量开始消费 #auto.offset.reset key.deserializerorg.apache.kafka.common.serialization.StringDeserializer value.deserializerorg.apache.kafka.common.serialization.StringDeserializer 代码 /*** kafka 消费者 API*/ public class MyConsumerTest {public static void main(String[] args) throws IOException {Properties prop new Properties();prop.load(MyConsumerTest.class.getClassLoader().getResourceAsStream(consumer.properties));//构建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(prop);//消费对应的Topic数据consumer.subscribe(Arrays.asList(spark));//打印所有相关的数据信息System.out.println(topic\tpartition\toffset\tkey\tvalue);while(true){/*** 消费数据 timeout: 从consumer的缓冲区Buffer中获取可用数据的等待超时时间* 如果设置为0, 则会理解返回该缓冲区内的所有数据如果不设置为零返回空并且不能写负数*/ConsumerRecordsString, String consumerRecords consumer.poll(1000);for(ConsumerRecordString, String cr : consumerRecords){String topic cr.topic();int partition cr.partition();String key cr.key();String value cr.value();long offset cr.offset();System.out.printf(topic:%s\tpartition:%d\toffset:%d\tkey:%s\tvalue:%s\r\n, topic, partition, offset, key, value);}}} } 注意释放资源 consumer.close(); 指定offset位置消费 /*** 指定Offset位置进行消费数据* 之前消费的方式是全量消费* 与在从指定offset位置消费* partition 0 --- offset 10* partition 1 --- offset 10* partition 2 --- offset 10* 注意这里从指定的Offset位置开始消费那么我们需要使用assign API来完成* 说白了 就是指定具体的所有信息即可**/ public class MyConsumerSeekOffsetTest {public static void main(String[] args) throws IOException {Properties prop new Properties();prop.load(MyConsumerSeekOffsetTest.class.getClassLoader().getResourceAsStream(consumer.properties));//构建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(prop);//消费对应的Topic数据consumer.assign(Arrays.asList(new TopicPartition(spark, 0),new TopicPartition(spark, 1),new TopicPartition(spark, 2)));//指定消费的偏移量位置consumer.seek(new TopicPartition(spark, 0), 10);     //定位consumer.seek(new TopicPartition(spark, 1), 10);     //定位consumer.seek(new TopicPartition(spark, 2), 10);     //定位 ​//打印所有相关的数据信息System.out.println(topic\tpartition\toffset\tkey\tvalue);while(true){/*** 消费数据 timeout: 从consumer的缓冲区Buffer中获取可用数据的等待超时时间* 如果设置为0, 则会理解返回该缓冲区内的所有数据如果不设置为零返回空并且不能写负数*/ConsumerRecordsString, String consumerRecords consumer.poll(1000);for(ConsumerRecordString, String cr : consumerRecords){String topic cr.topic();int partition cr.partition();String key cr.key();String value cr.value();long offset cr.offset();System.out.printf(topic:%s\tpartition:%d\toffset:%d\tkey:%s\tvalue:%s\r\n, topic, partition, offset, key, value);}}} } 5.offset消费问题 auto.offset.resetearliest|latest 生产者上生产了6条消费记录偏移量就是0-6,此时消费者需要消费时需要指定一个参数auto.offset.resetearliest|latest, 如果我们管理了用户的数据偏移量就能定位到上一次用户的数据位置下一次在读取数据的时候就可以知道从什么位置开始消费了。比如第一次读取到offset2这个位置由于记录了offset下一次就应该从offset1开始消费数据。 假如我们没有去管理offset, 那么有一天我们的程序崩溃了offset也没了下一次启动程序的话之前的数据如果没有消费到但是数据又更新了此时我们只能再次从头开始消费offset这样导致任务执行效率低同时消费数据会重复所以我们需要管理一下offset让它达到上述的要求这样我们的数据消费会精准并且不会造成重复消费。 offset自动管理 配置参数 consumer.properties # 配置自动提交Offset enable.auto.committrue # 每隔多少时间提交一次 auto.commit.interval.ms10000 可以保证数据不丢也就是当生产者生产的时候如果消费者挂掉了也不用担心只需管理下Offset然后下次启动时继续之前的消费位置开始消费。 offset手动管理 这里简单介绍如何管理即可用户手动管理Offset的话需要提取Kafka的Offset,每次消费数据的时候拿到Offset,然后将这个Offset存入一个系统或数据库都行就将它保存起来当我们数据处理完成后再向系统或者数据库提交此Offset这样offset在我们自己的系统或者数据库中我们就不用担心周期提交失败问题。 /*** kafka自动提交偏移量操作* 这种自动提交offset是会周期性的进行offset提交如果说该周期设置的时间比较大就有可能* 造成数据的读取重复所以我们如果使用这种提交方式的话那么应该尽可能把时间设置短一点。** 这种方式有一点问题因为自动管理是周期性提交方式那么如果在这个周期提交的时候* 正好此时这个周期的offset没有提交上去那么造成生产数据丢失这种问题该如何解决呢。* 变需要手动管理offset* * kafka手动维护Offset* 注释 手动管理Offset会在SparkStreaming中重点讲解这里简单介绍如何管理即可* 用户手动管理Offset的话需要提取Kafka的Offset,每次消费数据的时候拿到Offset,然后将这个Offset存入一个系统或数据库都行* 就将它保存起来当我们数据处理完成后再向系统或者数据库提交此Offset这样offset在我们自己的系统或者数据库中我们就不用担心周期提交失败问题*/ public class AutoOffsetCommitConsumerOps {public static void main(String[] args) throws IOException {Properties prop new Properties();prop.load(MyConsumerTest.class.getClassLoader().getResourceAsStream(consumer.properties));//构建消费者KafkaConsumerString, String consumer new KafkaConsumerString, String(prop);//消费对应的Topic数据consumer.subscribe(Arrays.asList(spark));//打印所有相关的数据信息System.out.println(topic\tpartition\toffset\tkey\tvalue);while(true){/*** 消费数据 timeout: 从consumer的缓冲区Buffer中获取可用数据的等待超时时间* 如果设置为0, 则会理解返回该缓冲区内的所有数据如果不设置为零返回空并且不能写负数*/ConsumerRecordsString, String consumerRecords consumer.poll(1000);for(ConsumerRecordString, String cr : consumerRecords){String topic cr.topic();int partition cr.partition();String key cr.key();String value cr.value();long offset cr.offset();System.out.printf(topic:%s\tpartition:%d\toffset:%d\tkey:%s\tvalue:%s\r\n, topic, partition, offset, key, value);}}} } 6.record进入分区的策略 每一条producerRecord有, topic名称、可选的partition分区编号以及一对可选的key和value组成。三种策略进入分区。 If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion. 如果指定的partition, 那么直接进入该partition 如果没有指定partition, 但是指定了key, 使用key的hash选择partition 如果既没有指定partition,也没有指定key, 使用轮询的方式进入partition 附1细说 Kafka Partition 分区细说 Kafka Partition 分区-CSDN博客 Partition分区是 Kafka 的核心角色对于 Kafka 的存储结构、消息的生产消费方式都至关重要。 掌握好 Partition 就可以更快的理解 Kafka。本文会讲解 Partition 的概念、结构以及行为方式。 一、Events, Streams, Topics ​ 在深入 Partition 之前我们先看几个更高层次的概念以及它们与 Partition 的联系。 Event事件代表过去发生的一个事实。简单理解就是一条消息、一条记录。 Event 是不可变的但是很活跃经常从一个地方流向另一个地方。 Stream 事件流表示运动中的相关事件。 当一个事件流进入 Kafka 之后它就成为了一个 Topic 主题。 所以Topic 就是具体的事件流也可以理解为一个 Topic 就是一个静止的 Stream。 Topic 把相关的 Event 组织在一起并且保存。一个 Topic 就像数据库中的一张表。 二、Partition 分区 Kafka 中 Topic 被分成多个 Partition 分区。 Topic 是一个逻辑概念Partition 是最小的存储单元掌握着一个 Topic 的部分数据。 每个 Partition 都是一个单独的 log 文件每条记录都以追加的形式写入。 Record记录 和 Message消息是一个概念。 三、Offsets偏移量和消息的顺序 Partition 中的每条记录都会被分配一个唯一的序号称为 Offset偏移量。 Offset 是一个递增的、不可变的数字由 Kafka 自动维护。 ​ 当一条记录写入 Partition 的时候它就被追加到 log 文件的末尾并被分配一个序号作为 Offset。 ​ 如上图这个 Topic 有 3 个 Partition 分区向 Topic 发送消息的时候实际上是被写入某一个 Partition并赋予 Offset。 ​ 消息的顺序性需要注意一个 Topic 如果有多个 Partition 的话那么从 Topic 这个层面来看消息是无序的。 ​ 但单独看 Partition 的话Partition 内部消息是有序的。 ​ 所以一个 Partition 内部消息有序一个 Topic 跨 Partition 是无序的。 ​ 如果强制要求 Topic 整体有序就只能让 Topic 只有一个 Partition。 四、Partition 为 Kafka 提供了扩展能力 一个 Kafka 集群由多个 Broker就是 Server 构成每个 Broker 中含有集群的部分数据。 Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。 这样会有多种好处 如果把 Topic 的所有 Partition 都放在一个 Broker 上那么这个 Topic 的可扩展性就大大降低了会受限于这个 Broker 的 IO 能力。把 Partition 分散开之 后Topic 就可以水平扩展 。 一个 Topic 可以被多个 Consumer 并行消费。如果 Topic 的所有 Partition 都在一个 Broker那么支持的 Consumer 数量就有限而分散之后可以支持 更多的 Consumer。 一个 Consumer 可以有多个实例Partition 分布在多个 Broker 的话Consumer 的多个实例就可以连接不同的 Broker大大提升了消息处理能力。可以 让一个 Consumer 实例负责一个 Partition这样消息处理既清晰又高效。 五、Partition 为 Kafka 提供了数据冗余 Kafka 为一个 Partition 生成多个副本并且把它们分散在不同的 Broker。 如果一个 Broker 故障了Consumer 可以在其他 Broker 上找到 Partition 的副本继续获取消息。 六、写入 Partition 一个 Topic 有多个 Partition那么向一个 Topic 中发送消息的时候具体是写入哪个 Partition 呢有3种写入方式。 使用 Partition Key 写入特定 Partition Producer 发送消息的时候可以指定一个 Partition Key这样就可以写入特定 Partition 了。 Partition Key 可以使用任意值例如设备ID、User ID。 Partition Key 会传递给一个 Hash 函数由计算结果决定写入哪个 Partition。 所以有相同 Partition Key 的消息会被放到相同的 Partition。 例如使用 User ID 作为 Partition Key那么此 ID 的消息就都在同一个 Partition这样可以保证此类消息的有序性。 这种方式需要注意 Partition 热点问题。 例如使用 User ID 作为 Partition Key如果某一个 User 产生的消息特别多是一个头部活跃用户那么此用户的消息都进入同一个 Partition 就会产生热点 问题导致某个 Partition 极其繁忙。 由 kafka 决定 如果没有使用 Partition KeyKafka 就会使用轮询的方式来决定写入哪个 Partition。 这样消息会均衡的写入各个 Partition。 但这样无法确保消息的有序性。 自定义规则 Kafka 支持自定义规则一个 Producer 可以使用自己的分区指定规则。 七、读取 Partition Kafka 不像普通消息队列具有发布/订阅功能Kafka 不会向 Consumer 推送消息。 Consumer 必须自己从 Topic 的 Partition 拉取消息。 一个 Consumer 连接到一个 Broker 的 Partition从中依次读取消息。 消息的 Offset 就是 Consumer 的游标根据 Offset 来记录消息的消费情况。 ​读完一条消息之后Consumer 会推进到 Partition 中的下一个 Offset继续读取消息。 ​ Offset 的推进和记录都是 Consumer 的责任Kafka 是不管的。 Kafka 中有一个 Consumer Group消费组的概念多个 Consumer 组团去消费一个 Topic。 ​ 同组的 Consumer 有相同的 Group ID。 ​ Consumer Group 机制会保障一条消息只被组内唯一一个 Consumer 消费不会重复消费。 ​ 消费组这种方式可以让多个 Partition 并行消费大大提高了消息的消费能力最大并行度为 Topic 的 Partition 数量。 例如一个 Topic 有 3 个 Partition你有 4 个 Consumer 负责这个 Topic也只会有 Consumer 工作另一个作为后补队员当某个 Consumer 故障了它再 补上去是一种很好的容错机制。 参考资料 https://medium.com/event-driven-utopia/understanding-kafka-topic-partitions-ae40f80552e8 附2Kafka中的Partition详解与示例代码Kafka中的Partition详解与示例代码-阿里云开发者社区 在Apache Kafka中Partition分区是一个关键的概念。分区的引入使得Kafka能够处理大规模数据并提供高性能和可伸缩性。本文将深入探讨Kafka 中的Partition包括分区的作用、创建、配置以及一些实际应用中的示例代码。 Partition的作用 在Kafka中Topic被分为一个或多个Partition。每个Partition是一个有序且不可变的消息序列具有自己的唯一标识符Partition ID。分区的主要作用有 水平扩展性通过将Topic划分为多个Partition可以将消息分布到多个Broker上实现水平扩展提高整体吞吐量。 并行处理每个Partition可以在不同的消费者上并行处理提高系统的处理能力。 顺序性在同一个Partition内消息的顺序是有序的。这有助于确保一些需要顺序处理的场景如日志记录。 创建与配置Partition 1 创建Topic时指定Partition数量 可以在创建Topic时指定Partition的数量。以下是一个使用命令行工具创建Topic并指定分区数量的示例 bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092 这将创建一个名为my_topic的Topic有3个分区。 2 动态调整Partition数量 Kafka支持在运行时动态调整Topic的Partition数量。以下是一个示例 bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092 这将把my_topic的分区数增加到5。 3 Partition的配置选项 Partition还有一些配置选项例如消息的保留时间、清理策略等。以下是一个设置消息保留时间的示例 bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config retention.ms86400000 这将设置my_topic的消息保留时间为1天86400000毫秒。 生产者与消费者操作分区 1 生产者发送消息到指定Partition 在生产者发送消息时可以选择将消息发送到特定的Partition。以下是一个Java生产者示例代码 Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(properties);ProducerRecordString, String record new ProducerRecord(my_topic, 1, key, value);producer.send(record);producer.close(); 这将消息发送到my_topic的第2个分区分区编号从0开始。 2 消费者订阅指定Partition 消费者可以选择订阅特定的Partition也可以订阅整个Topic。以下是一个Java消费者示例代码 Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(group.id, my_group);properties.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);ConsumerString, String consumer new KafkaConsumer(properties);TopicPartition partition new TopicPartition(my_topic, 1);consumer.assign(Collections.singletonList(partition));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {System.out.printf(Offset %d, Key %s, Value %s%n, record.offset(), record.key(), record.value());}} 这将使消费者仅订阅my_topic的第2个分区。 实际应用示例 1 数据分流 在实际应用中数据分流是一种常见的需求特别是在处理用户活动日志等场景。通过根据用户ID将日志分发到相同的分区可以轻松实现对用户活动的精细化处理、分析和统计。下面是一个具体的数据分流示例 创建Topic时指定Partition数量 首先创建一个Topic假设名为user_activity_logs并指定适当的分区数量例如5个分区 bin/kafka-topics.sh --create --topic user_activity_logs --partitions 5 --replication-factor 2 --bootstrap-server localhost:9092 生产者发送用户活动日志 在实际场景中应用程序可能会有不同的用户活动日志例如登录、点击、购买等。以下是一个简化的Java生产者示例代码用于向user_activity_logs Topic发送用户活动日志 import org.apache.kafka.clients.producer.*;import java.util.Properties;public class UserActivityProducer {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(properties);// 模拟不同用户的活动日志for (int i 1; i 10; i) {String userId user_ i;String activity Login;  // 模拟用户登录活动实际应用中可根据业务类型发送不同类型的活动日志// 根据用户ID计算分区int partition userId.hashCode() % 5;ProducerRecordString, String record new ProducerRecord(user_activity_logs, partition, userId, activity);producer.send(record, (metadata, exception) - {if (exception null) {System.out.printf(Sent record to partition %d with offset %d%n, metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});}producer.close();}} 在这个示例中模拟了10个用户的登录活动并通过计算用户ID的哈希值来确定将活动发送到哪个分区。 消费者按用户ID订阅分区 消费者可以根据用户ID订阅相应的分区以便按用户维度处理活动日志。以下是一个简化的Java消费者示例代码 import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class UserActivityConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(group.id, user_activity_group);properties.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);ConsumerString, String consumer new KafkaConsumer(properties);// 订阅指定分区TopicPartition partitionToSubscribe new TopicPartition(user_activity_logs, 0);consumer.assign(Collections.singletonList(partitionToSubscribe));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {System.out.printf(Received record with key %s and value %s from partition %d%n,record.key(), record.value(), record.partition());// 在这里进行按用户ID处理的业务逻辑}}}} 在实际应用中可能会根据业务逻辑动态订阅多个分区以处理更多用户的活动日志。 2 时间窗口统计 在实际应用中按照时间窗口对数据进行分区是一种常见的做法特别是对于具有时间戳的数据。这种分区方式使得可以方便地进行时间窗口内的统计、分析和处理。下面是一个具体的按时间窗口统计的示例 创建Topic时指定Partition数量 首先创建一个Topic假设名为timestamped_data并指定适当的分区数量例如5个分区 bin/kafka-topics.sh --create --topic timestamped_data --partitions 5 --replication-factor 2 --bootstrap-server localhost:9092 生产者发送带有时间戳的数据 在实际场景中应用程序可能会产生带有时间戳的数据例如传感器数据、日志等。以下是一个简化的Java生产者示例代码用于向timestamped_data Topic发送带有时间戳的数据 import org.apache.kafka.clients.producer.*;import java.util.Properties;public class TimestampedDataProducer {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(properties);// 模拟带有时间戳的数据for (int i 1; i 10; i) {long timestamp System.currentTimeMillis();String data Data_ i;// 根据时间戳计算分区int partition (int) (timestamp % 5);ProducerRecordString, String record new ProducerRecord(timestamped_data, partition, String.valueOf(timestamp), data);producer.send(record, (metadata, exception) - {if (exception null) {System.out.printf(Sent record to partition %d with offset %d%n, metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});}producer.close();}} 在这个示例中模拟了10条带有时间戳的数据并通过计算数据的时间戳来确定将数据发送到哪个分区。 消费者按时间窗口统计订阅分区 消费者可以根据时间戳订阅相应的分区以便按时间窗口统计数据。以下是一个简化的Java消费者示例代码 import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class TimeWindowedDataConsumer {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, localhost:9092);properties.put(group.id, time_windowed_data_group);properties.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);ConsumerString, String consumer new KafkaConsumer(properties);// 订阅指定分区TopicPartition partitionToSubscribe new TopicPartition(timestamped_data, 0);consumer.assign(Collections.singletonList(partitionToSubscribe));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {System.out.printf(Received record with key %s and value %s from partition %d%n,record.key(), record.value(), record.partition());// 在这里进行按时间窗口统计的业务逻辑}}}} 在实际应用中你可能会根据业务逻辑动态订阅多个分区以处理更多时间窗口的数据。 分区的内部工作原理 理解Partition在Kafka中的内部工作原理有助于更深入地使用和优化Kafka。以下是一些关键的工作原理 1 分区的负载均衡 Kafka的生产者在将消息发送到分区时使用分区键的哈希函数来决定消息应该被分配到哪个分区。这种哈希算法有助于实现负载均衡确保消息均匀分布在所有分区上。 2 分区的数据保留和清理 每个分区都维护着自己的消息日志Kafka支持配置消息的保留时间和大小。当消息达到指定的保留时间或大小时Kafka会进行清理删除过期的消息。这有助于控制存储占用和维持系统性能。 3 分区的复制机制 为了提高系统的可用性和容错性Kafka使用分区的复制机制。每个分区可以有多个副本分布在不同的Broker上。生产者向主分区发送消息而主分区负责将消息复制到所有副本。这种设计保证了即使某个Broker发生故障仍然可以从其他副本中获取数据。 性能调优与监控 1 监控工具 Kafka提供了一些监控工具例如JConsole、Kafka Manager等可以用于实时监控Kafka集群的状态。这些工具可以展示各个分区的吞吐量、偏移量、副本状态等信息有助于及时发现和解决问题。 2 性能调优 性能调优是使用Kafka的关键一环。你可以通过调整Producer和Consumer的参数以及适时更新Broker的配置来优化系统性能。例如调整Producer的batch.size和linger.ms参数以优化消息的批量发送或者调整Consumer的max.poll.records参数以优化批量处理能力。 3 数据压缩 Kafka支持消息的压缩可以通过配置Producer的compression.type参数来选择压缩算法。压缩可以降低网络传输成本提高数据的传输效率。 properties.put(compression.type, gzip); 总结 本文详细介绍了Kafka中的Partition概念从创建、配置、内部工作原理到实际应用示例和性能调优等多个方面进行了深入的讨论。希望这些详细的示例代码和解释能够帮助大家更全面地理解和应用Kafka中的Partition。在实际应用中根据业务需求和系统规模可以灵活配置Partition以达到最佳性能和可靠性。 7.自定义分区 public interface Partitioner extends Configurable, Closeable { ​/*** Compute the partition for the given record.** param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes The serialized key to partition on( or null if no key)* param value The value to partition on or null* param valueBytes The serialized value to partition on or null* param cluster The current cluster metadata*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); ​/*** This is called when partitioner is closed.*/public void close(); ​ ​/*** Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,* this method can change the chosen sticky partition for the new batch. * param topic The topic name* param cluster The current cluster metadata* param prevPartition The partition previously selected for the record that triggered a new batch*/default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {} } public interface Configurable { ​/*** Configure this class with the given key-value pairs*/void configure(MapString, ? configs); ​ } 1.随机分区 方式 /*** 自定义分区 随机分区*/ public class RandomPartitioner implements Partitioner { ​private Random random new Random();Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Integer partitionNum cluster.partitionCountForTopic(topic);//随机生产int i random.nextInt(partitionNum);return i;} ​Overridepublic void close() { ​} ​Overridepublic void configure(MapString, ? configs) { ​} } 注册使用 修改 producer.properties partitioner.classcom.study.bigdata.RandomPartitioner 或代码中使用 prop.put(partition.class, com.study.bigdata.RandomPartitioner.class); 2.hash方式 /*** 自定义分区之Hash分区* key的hashCode*/ public class HashPartitioner implements Partitioner {Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取topic的分区Integer partitionNum cluster.partitionCountForTopic(topic);if(keyBytes ! null){return Math.abs(key.hashCode()) % partitionNum;}return 0;   //如果Key不存那么直接返回0} ​Overridepublic void close() { ​} ​Overridepublic void configure(MapString, ? configs) { ​} } 注册使用 修改 producer.properties partitioner.classcom.study.bigdata.HashPartitioner 或代码中使用 prop.put(partition.class, com.study.bigdata.HashPartitioner.class); 3.轮询方式 默认 /*** 自定义分区之轮询分区*/ public class RoundRobinPartintioner implements Partitioner { ​/*** 轮询操作 我们需要创建轮询对象* counter.getAndIncrement() i*/private AtomicInteger counter new AtomicInteger();Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Integer partitionNum cluster.partitionCountForTopic(topic);return counter.getAndIncrement() % partitionNum;} ​Overridepublic void close() { ​} ​Overridepublic void configure(MapString, ? configs) { ​} } 注册使用 修改 producer.properties partitioner.classcom.study.bigdata.HashPartitioner 或代码中使用 prop.put(partition.class, com.study.bigdata.HashPartitioner.class); 分组分区 /*** 自定义分区之分组分区*/ public class GroupPartitioner implements Partitioner { ​/** 将要分区的数据划分好*/private MapString, Integer map new HashMapString, Integer();{map.put(a, 0);map.put(b, 1);map.put(c, 2);map.put(d, 3);map.put(e, 4);}Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String line value.toString();String[] strs line.split(\\s);if(strs null || strs.length ! 2){return 0;}else {return map.getOrDefault(strs[0], 0);}} ​Overridepublic void close() { ​} ​Overridepublic void configure(MapString, ? configs) { ​} }/*** 自定义分区之分组分区*/ public class GroupPartitioner implements Partitioner { ​/** 将要分区的数据划分好*/private MapString, Integer map new HashMapString, Integer();{map.put(a, 0);map.put(b, 1);map.put(c, 2);map.put(d, 3);map.put(e, 4);}Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String line value.toString();String[] strs line.split(\\s);if(strs null || strs.length ! 2){return 0;}else {return map.getOrDefault(strs[0], 0);}} ​Overridepublic void close() { ​} ​Overridepublic void configure(MapString, ? configs) { ​} } 参考资料 附1快速上手学习如何使用C实现kafka消费者客户端(快速上手学习如何使用C实现kafka消费者客户端_c 使用kafka-CSDN博客) 附2Docker实战之Kafka集群 1. 概述 Apache Kafka 是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。其具有高吞吐量、内置分区、支持数据副本和容错的特性适合在大规模消息处理场景中使用。 笔者之前在物联网公司工作其中 Kafka 作为物联网 MQ 选型的事实标准这里优先给大家搭建 Kafka 集群环境。由于 Kafka 的安装需要依赖 Zookeeper对 Zookeeper 还不了解的小伙伴可以在 这里 先认识下 Zookeeper。 Kafka 能解决什么问题呢先说一下消息队列常见的使用场景吧其实场景有很多但是比较核心的有 3 个解耦、异步、削峰。 2. Kafka 基本概念 Kafka 部分名词解释如下 Broker消息中间件处理结点一个 Kafka 节点就是一个 broker多个 broker 可以组成一个 Kafka 集群。 Topic一类消息例如 page view 日志、click 日志等都可以以 topic 的形式存在Kafka 集群能够同时负责多个 topic 的分发。 Partitiontopic 物理上的分组一个 topic 可以分为多个 partition每个 partition 是一个有序的队列。 Segmentpartition 物理上由多个 segment 组成下面有详细说明。 offset每个 partition 都由一系列有序的、不可变的消息组成这些消息被连续的追加到 partition 中。partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息.每个 partition 中的消息都由 offset0 开始记录消息。 3. Docker 环境搭建 配合上一节的 Zookeeper 环境,计划搭建一个 3 节点的集群。宿主机 IP 为 192.168.124.5。 docker-compose-kafka-cluster.yml version: 3.7 ​ networks:docker_net:external: true ​ services: ​kafka1:image: wurstmeister/kafkarestart: unless-stoppedcontainer_name: kafka1ports:- 9093:9092external_links:- zoo1- zoo2- zoo3environment:KAFKA_BROKER_ID: 1KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5                   ## 修改:宿主机IPKAFKA_ADVERTISED_PORT: 9093                                 ## 修改:宿主机映射portKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093    ## 绑定发布订阅的端口。修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181volumes:- ./kafka/kafka1/docker.sock:/var/run/docker.sock- ./kafka/kafka1/data/:/kafkanetworks:- docker_net ​ ​kafka2:image: wurstmeister/kafkarestart: unless-stoppedcontainer_name: kafka2ports:- 9094:9092external_links:- zoo1- zoo2- zoo3environment:KAFKA_BROKER_ID: 2KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5                 ## 修改:宿主机IPKAFKA_ADVERTISED_PORT: 9094                               ## 修改:宿主机映射portKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094   ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181volumes:- ./kafka/kafka2/docker.sock:/var/run/docker.sock- ./kafka/kafka2/data/:/kafkanetworks:- docker_net ​kafka3:image: wurstmeister/kafkarestart: unless-stoppedcontainer_name: kafka3ports:- 9095:9092external_links:- zoo1- zoo2- zoo3environment:KAFKA_BROKER_ID: 3KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5                 ## 修改:宿主机IPKAFKA_ADVERTISED_PORT: 9095                              ## 修改:宿主机映射portKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095   ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181volumes:- ./kafka/kafka3/docker.sock:/var/run/docker.sock- ./kafka/kafka3/data/:/kafkanetworks:- docker_net ​kafka-manager:image: sheepkiller/kafka-manager:latestrestart: unless-stoppedcontainer_name: kafka-managerhostname: kafka-managerports:- 9000:9000links:            # 连接本compose文件创建的container- kafka1- kafka2- kafka3external_links:   # 连接本compose文件以外的container- zoo1- zoo2- zoo3environment:ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181                 ## 修改:宿主机IPTZ: CST-8networks:- docker_net 执行以下命令启动 docker-compose -f docker-compose-kafka-cluster.yml up -d 可以看到 kafka 集群已经启动成功。 4. Kafka 初认识 4.1 可视化管理 细心的小伙伴发现上边的配置除了 kafka 外还有一个 kafka-manager 模块。它是 kafka 的可视化管理模块。因为 kafka 的元数据、配置信息由 Zookeeper 管理这里我们在 UI 页面做下相关配置。 1. 访问 http:localhost:9000,按图示添加相关配置 2. 配置后我们可以看到默认有一个 topic(__consumer_offsets)3 个 brokers。该 topic 分 50 个 partition用于记录 kafka 的消费偏移量。 4.2 Zookeeper 在 kafka 环境中做了什么 1. 首先观察下根目录 kafka 基于 zookeeperkafka 启动会将元数据保存在 zookeeper 中。查看 zookeeper 节点目录会发现多了很多和 kafka 相关的目录。结果如下: ➜ docker zkCli -server 127.0.0.1:2183 Connecting to 127.0.0.1:2183 Welcome to ZooKeeper! JLine support is enabled ​ WATCHER:: ​ WatchedEvent state:SyncConnected type:None path:null [zk: 127.0.0.1:2183(CONNECTED) 0] ls / [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config] 2. 查看我们映射的 kafka 目录新版本的 kafka 偏移量不再存储在 zk 中而是在 kafka 自己的环境中。 我们节选了部分目录(包含 2 个 partition) ├── kafka1 │   ├── data │   │   └── kafka-logs-c4e2e9edc235 │   │       ├── __consumer_offsets-1 │   │       │   ├── 00000000000000000000.index       // segment索引文件 │   │       │   ├── 00000000000000000000.log         // 数据文件 │   │       │   ├── 00000000000000000000.timeindex   // 消息时间戳索引文件 │   │       │   └── leader-epoch-checkpoint ... │   │       ├── __consumer_offsets-7 │   │       │   ├── 00000000000000000000.index │   │       │   ├── 00000000000000000000.log │   │       │   ├── 00000000000000000000.timeindex │   │       │   └── leader-epoch-checkpoint │   │       ├── cleaner-offset-checkpoint │   │       ├── log-start-offset-checkpoint │   │       ├── meta.properties │   │       ├── recovery-point-offset-checkpoint │   │       └── replication-offset-checkpoint │   └── docker.sock 结果与 Kafka-Manage 显示结果一致。图示的文件是一个 Segment00000000000000000000.log 表示 offset 从 0 开始随着数据不断的增加会有多个 Segment 文件。 5. 生产与消费 5.1 创建主题 ➜ docker docker exec -it kafka1 /bin/bash   # 进入容器 bash-4.4# cd /opt/kafka/   # 进入安装目录 bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181   # 查看主题列表 __consumer_offsets bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test   # 新建主题 Created topic test. 说明: --replication-factor 副本数; --partitions 分区数; replicationbroker(一定); 有效消费者数partitions 分区数(一定); 新建主题后, 再次查看映射目录, 由图可见partition 在 3 个 broker 上均匀分布。 5.2 生产消息 bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test msg1 msg2 msg3 msg4 msg5 msg6 5.3 消费消息 bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning msg1 msg3 msg2 msg4 msg6 msg5 --from-beginning 代表从头开始消费 5.4 消费详情 查看消费者组 bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list KafkaManagerOffsetCache console-consumer-86137 消费组偏移量 bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache 查看 topic 详情 bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test Topic: test PartitionCount: 3   ReplicationFactor: 2   Configs:Topic: test Partition: 0   Leader: 3   Replicas: 3,1   Isr: 3,1Topic: test Partition: 1   Leader: 1   Replicas: 1,2   Isr: 1,2Topic: test Partition: 2   Leader: 2   Replicas: 2,3   Isr: 2,3 查看.log 数据文件 bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log --print-data-log Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true | offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2 baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true | offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4 baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true | offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6 这里需要看下自己的文件路径是什么别直接 copy 我的哦 查看.index 索引文件 bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index offset: 0 position: 0 查看.timeindex 索引文件 bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex --verify-index-only Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindexIndex timestamp: 0, log timestamp: 1583317546421 6. SpringBoot 集成 笔者 SpringBoot 版本是 2.2.2.RELEASE pom.xml 添加依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.4.0.RELEASE/version/dependency 生产者配置 Configuration public class KafkaProducerConfig { ​/*** producer配置* return*/public MapString, Object producerConfigs() {MapString, Object props new HashMap();// 指定多个kafka集群多个地址 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095);// 重试次数0为不启用重试机制props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);// acks0 把消息发送到kafka就认为发送成功// acks1 把消息发送到kafka leader分区并且写入磁盘就认为发送成功// acksall 把消息发送到kafka leader分区并且leader分区的副本follower对消息进行了同步就任务发送成功props.put(ProducerConfig.ACKS_CONFIG,all);// 生产者空间不足时send()被阻塞的时间默认60sprops.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);// 控制批处理大小单位为字节props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);// 批量发送延迟为1毫秒启用该功能能有效减少生产者发送消息次数从而提高并发量props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);// 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);// 客户端idprops.put(ProducerConfig.CLIENT_ID_CONFIG,producer.client.id.topinfo);// 键的序列化方式props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 值的序列化方式props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 压缩消息支持四种类型分别为none、lz4、gzip、snappy默认为none。// 消费者默认支持解压所以压缩设置在生产者消费者无需设置。props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,none);return props;} ​/*** producer工厂配置* return*/public ProducerFactoryString, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());} ​/*** Producer Template 配置*/Bean(namekafkaTemplate)public KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(producerFactory());} } 消费者配置 Configuration public class KafkaConsumerConfig { ​ ​private static final String GROUP0_ID group0;private static final String GROUP1_ID group1; ​/*** 1. setAckMode: 消费者手动提交ack** RECORD 每处理完一条记录后提交。* BATCH(默认) 每次poll一批数据后提交一次频率取决于每次poll的调用频率。* TIME 每次间隔ackTime的时间提交。* COUNT 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount就提交。* COUNT_TIME TIME和COUNT中任意一条满足即提交。* MANUAL 手动调用Acknowledgment.acknowledge()后并且处理完poll的这批数据后提交。* MANUAL_IMMEDIATE 手动调用Acknowledgment.acknowledge()后立即提交。** 2. factory.setConcurrency(3);* 此处设置的目的在于假设 topic test 下有 0、1、2三个 partitionSpring Boot中只有一个 KafkaListener() 消费者订阅此 topic此处设置并发为3* 启动后 会有三个不同的消费者分别订阅 p0、p1、p2本地实际有三个消费者线程。* 而 factory.setConcurrency(1); 的话 本地只有一个消费者线程 p0、p1、p2被同一个消费者订阅。* 由于 一个partition只能被同一个消费者组下的一个消费者订阅对于只有一个 partition的topic即使设置 并发为3也只会有一个消费者多余的消费者没有 partition可以订阅。** 3. factory.setBatchListener(true);* 设置批量消费 每个批次数量在Kafka配置参数ConsumerConfig.MAX_POLL_RECORDS_CONFIG中配置* 限制的是 一次批量接收的最大条数而不是 等到达到最大条数才接收这点容易被误解。* 实际测试时接收是实时的当生产者大量写入时一次批量接收的消息数量为 配置的最大条数。*/BeanKafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, String kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryInteger, Stringfactory new ConcurrentKafkaListenerContainerFactory();// 设置消费者工厂factory.setConsumerFactory(consumerFactory());// 设置为批量消费每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.setBatchListener(true);// 消费者组中线程数量,消费者数量partition数量即使配置的消费者数量大于partition数量多余消费者无法消费到数据。factory.setConcurrency(4);// 拉取超时时间factory.getContainerProperties().setPollTimeout(3000);// 手动提交factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;} ​Beanpublic ConsumerFactoryInteger, String consumerFactory() {MapString, Object map consumerConfigs();map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);return new DefaultKafkaConsumerFactory(consumerConfigs());} ​ //   Bean //   KafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, String kafkaListenerContainerFactory1() { //       ConcurrentKafkaListenerContainerFactoryInteger, String //               factory new ConcurrentKafkaListenerContainerFactory(); //       // 设置消费者工厂 //       factory.setConsumerFactory(consumerFactory1()); //       // 设置为批量消费每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG //       factory.setBatchListener(true); //       // 消费者组中线程数量,消费者数量partition数量即使配置的消费者数量大于partition数量多余消费者无法消费到数据。 //       factory.setConcurrency(3); //       // 拉取超时时间 //       factory.getContainerProperties().setPollTimeout(3000); //       // 手动提交 //       factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); //       return factory; //   } // //   public ConsumerFactoryInteger, String consumerFactory1() { //       MapString, Object map consumerConfigs(); //       map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1_ID); //       return new DefaultKafkaConsumerFactory(consumerConfigs()); //   } ​Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();// Kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095);// 是否自动提交offset偏移量(默认true)props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 批量消费props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);// 消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG, group-default);// 自动提交的频率(ms) //       propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);// Session超时设置props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);// 键的反序列化方式props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 值的反序列化方式props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// offset偏移量规则设置// (1)、earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费// (2)、latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据// (3)、nonetopic各分区都存在已提交的offset时从offset后开始消费只要有一个分区不存在已提交的offset则抛出异常props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);return props;} ​ } 主题配置 Configuration public class KafkaTopicConfig { ​/*** 定义一个KafkaAdmin的bean可以自动检测集群中是否存在topic不存在则创建*/Beanpublic KafkaAdmin kafkaAdmin() {MapString, Object configs new HashMap();// 指定多个kafka集群多个地址例如192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095);return new KafkaAdmin(configs);} ​/*** 创建 Topic*/Beanpublic NewTopic topicinfo() {// 创建topic需要指定创建的topic的名称、分区数、副本数量(副本数数目设置要小于Broker数量)return new NewTopic(test, 3, (short) 2);} ​ } 消费者服务 Slf4j Service public class KafkaConsumerService { ​ ​ //   /** //     * 单条消费 //     * param message //     */ //   KafkaListener(id id0, topics {Constant.TOPIC}, containerFactorykafkaListenerContainerFactory) //   public void kafkaListener0(String message){ //       log.info(consumer:group0 -- message:{}, message); //   } // //   KafkaListener(id id1, topics {Constant.TOPIC}, groupId group1) //   public void kafkaListener1(String message){ //       log.info(consumer:group1 -- message:{}, message); //   } //   /** //     * 监听某个 Topic 的某个分区示例,也可以监听多个 Topic 的分区 //     * 为什么找不到group2呢 //     * param message //     */ //   KafkaListener(id id2, groupId group2, topicPartitions { TopicPartition(topic Constant.TOPIC, partitions { 0 }) }) //   public void kafkaListener2(String message) { //       log.info(consumer:group2 -- message:{}, message); //   } // //   /** //     * 获取监听的 topic 消息头中的元数据 //     * param message //     * param topic //     * param key //     */ //   KafkaListener(id id3, topics Constant.TOPIC, groupId group3) //   public void kafkaListener(Payload String message, //                             Header(KafkaHeaders.RECEIVED_TOPIC) String topic, //                             Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition, //                             Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) { //       Long threadId Thread.currentThread().getId(); //       log.info(consumer:group3 -- message:{}, topic:{}, partition:{}, key:{}, threadId:{}, message, topic, partition, key, threadId); //   } // //   /** //     * 监听 topic 进行批量消费 //     * param messages //     */ //   KafkaListener(id id4, topics Constant.TOPIC, groupId group4) //   public void kafkaListener(ListString messages) { //       for(String msg:messages){ //           log.info(consumer:group4 -- message:{}, msg); //       } //   } // //   /** //     * 监听topic并手动提交偏移量 //     * param messages //     * param acknowledgment //     */ //   KafkaListener(id id5, topics Constant.TOPIC,groupId group5) //   public void kafkaListener(ListString messages, Acknowledgment acknowledgment) { //       for(String msg:messages){ //           log.info(consumer:group5 -- message:{}, msg); //       } //       // 触发提交offset偏移量 //       acknowledgment.acknowledge(); //   } // //   /** //     * 模糊匹配多个 Topic //     * param message //     */ //   KafkaListener(id id6, topicPattern test.*,groupId group6) //   public void annoListener2(String message) { //       log.error(consumer:group6 -- message:{}, message); //   } ​/*** 完整consumer* return*/KafkaListener(id id7, topics {Constant.TOPIC}, groupId group7)public boolean consumer4(ListConsumerRecord?, ? data) {for (int i0; idata.size(); i) {ConsumerRecord?, ? record data.get(i);Optional? kafkaMessage Optional.ofNullable(record.value()); ​Long threadId Thread.currentThread().getId();if (kafkaMessage.isPresent()) {Object message kafkaMessage.get();log.info(consumer:group7 -- message:{}, topic:{}, partition:{}, key:{}, offset:{}, threadId:{}, message.toString(), record.topic(), record.partition(), record.key(), record.offset(), threadId);}} ​return true;} ​ } 生产者服务 Service public class KafkaProducerService { ​Autowiredprivate KafkaTemplate kafkaTemplate; ​/*** producer 同步方式发送数据* param topic   topic名称* param key     一般用业务id相同业务在同一partition保证消费顺序* param message producer发送的数据*/public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {// 默认轮询partitionkafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); //       // 根据key进行hash运算再将运算结果写入到不同partition //       kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS); //       // 第二个参数为partition,当partition和key同时设置时partition优先。 //       kafkaTemplate.send(topic, 0, key, message); //       // 组装消息 //       Message msg MessageBuilder.withPayload(Send Message(payload,headers) Test) //               .setHeader(KafkaHeaders.MESSAGE_KEY, key) //               .setHeader(KafkaHeaders.TOPIC, topic) //               .setHeader(KafkaHeaders.PREFIX,kafka_) //               .build(); //       kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS); //       // 组装消息 //       ProducerRecordString, String producerRecord new ProducerRecord(test, Send ProducerRecord(topic,value) Test); //       kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);} ​/*** producer 异步方式发送数据* param topic   topic名称* param message producer发送的数据*/public void sendMessageAsync(String topic, String message) {ListenableFutureSendResultInteger, String future kafkaTemplate.send(topic, message); ​// 设置异步发送消息获取发送结果后执行的动作ListenableFutureCallback listenableFutureCallback new ListenableFutureCallbackSendResultInteger, String() {Overridepublic void onSuccess(SendResultInteger, String result) {System.out.println(success);} ​Overridepublic void onFailure(Throwable ex) {System.out.println(failure);}}; ​// 将listenableFutureCallback与异步发送消息对象绑定future.addCallback(listenableFutureCallback);} ​public void test(String topic, Integer partition, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {kafkaTemplate.send(topic, partition, key, message).get(10, TimeUnit.SECONDS);} } web 测试 RestController public class KafkaProducerController { ​Autowiredprivate KafkaProducerService producerService; ​GetMapping(/sync)public void sendMessageSync(RequestParam String topic) throws InterruptedException, ExecutionException, TimeoutException {producerService.sendMessageSync(topic, null, 同步发送消息测试);} ​GetMapping(/async)public void sendMessageAsync(){producerService.sendMessageAsync(test,异步发送消息测试);} ​GetMapping(/test)public void test(RequestParam String topic, RequestParam(required false) Integer partition, RequestParam(required false) String key, RequestParam String message) throws InterruptedException, ExecutionException, TimeoutException {producerService.test(topic, partition, key, message);} ​ } 附3Kafka快速入门六——Kafka集群部署(Kafka快速入门六——Kafka集群部署_51CTO博客_kafka集群部署) 一、Kafka集群部署方案规划 1、操作系统选择 通常生产环境应该将Kafka集群部署在Linux操作系统上原因如下 1Kafka客户端底层使用了Java的selectorselector在Linux上的实现机制是epoll而在Windows平台上的实现机制是select因此Kafka部署在Linux上能够获得更高效的I/O性能。 2网络传输效率的差别。Kafka需要在磁盘和网络间进行大量数据传输在Linux部署Kafka能够享受到零拷贝Zero Copy技术所带来的快速数据传输特性。 3社区的支持度。Apache Kafka社区目前对Windows平台上发现的Kafka Bug不做任何承诺。 2、磁盘 1Kafka实现了冗余机制来提供高可靠性并通过分区机制在软件层面实现负载均衡因此Kafka的磁盘存储可以不使用磁盘阵列RAID使用普通磁盘组成存储空间即可。 2使用机械磁盘能够胜任Kafka线上环境但SSD显然性能更好。 3、磁盘容量 规划磁盘容量时需要考虑新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩等因素。 假设公司业务每天需要向Kafka集群发送100000000条消息每条消息保存两份以防止数据丢失消息默认保存7天时间消息的平均大小是1KBKafka的数据压缩比是0.75。 每天100000000条1KB大小的消息保存两份压缩比0.75占用空间大小就等于150GB(100000000*1KB*2/1000/1000*0.75)考虑到Kafka集群的索引数据等需要预留出10%的磁盘空间因此每天总存储容量是165GB。数据留存7天因此规划磁盘容量为1155GB165GB*7。 4、网络带宽 假设公司的机房环境是千兆网络即1Gbps业务需要在1小时内处理1TB的业务数据。假设Kafka Broker会用到70%的带宽资源超过70%的阈值可能网络丢包单台Kafka Broker最多能使用大约700Mb的带宽资源但通常需要再额外为其它服务预留出2/3的资源即Kafka Broker可以为Kafka服务分配带宽240Mbps700Mb/3。1小时处理1TB数据则每秒需要处理2336Mb1024*1024*8/3600数据除以240约等于10台服务器。如果还需要额外复制两份那么服务器台数还要乘以3即30台。 二、Kafka集群参数配置 1、Broker端参数 Broker端参数也被称为静态参数Static Configs静态参数只能在Kafka的配置文件server.properties中进行设置必须重启Broker进程才能生效。 log.dirs指定Broker需要使用的若干个文件目录路径没有默认值必须指定。在生产环境中一定要为log.dirs配置多个路径如果条件允许需要保证目录被挂载到不同的物理磁盘上。优势在于提升读写性能多块物理磁盘同时读写数据具有更高的吞吐量能够实现故障转移FailoverKafka 1.1版本引入Failover功能坏掉磁盘上的数据会自动地转移到其它正常的磁盘上而且Broker还能正常工作基于Failover机制Kafka可以舍弃RAID方案。 zookeeper.connectCS格式参数可以指定值为zk1:2181,zk2:2181,zk3:2181不同Kafka集群可以指定zk1:2181,zk2:2181,zk3:2181/kafka1chroot只需要写一次。 listeners设置内网访问Kafka服务的监听器。 advertised.listeners设置外网访问Kafka服务的监听器。 auto.create.topics.enable是否允许自动创建Topic。 unclean.leader.election.enable是否允许Unclean Leader 选举。 auto.leader.rebalance.enable是否允许定期进行Leader选举生产环境中建议设置成false。 log.retention.{hours|minutes|ms}控制一条消息数据被保存多长时间。优先级ms设置最高、minutes次之、hours最低。 log.retention.bytes指定Broker为消息保存的总磁盘容量大小。message.max.bytes控制Broker能够接收的最大消息大小。 2、Topic级别参数 如果同时设置了Topic级别参数和全局Broker参数Topic级别参数会覆盖全局Broker参数而每个Topic都能设置自己的参数值。 生产环境中应当允许不同部门的Topic根据自身业务需要设置自己的留存时间。如果只能设置全局Broker参数那么势必要提取所有业务留存时间的最大值作为全局参数值此时设置Topic级别参数对Broker参数进行覆盖就是一个不错的选择。 retention.ms指定Topic消息被保存的时长默认是7天只保存最近7天的消息会覆盖掉Broker端的全局参数值。 retention.bytes指定为Topic预留多大的磁盘空间。通常在多租户的Kafka集群中使用默认值是 -1表示可以无限使用磁盘空间。 max.message.bytes指定Kafka Broker能够正常接收Topic 的最大消息大小。 Topic级别参数可以在创建Topic时进行设置也可以在修改Topic 时设置推荐在修改Topic时进行设置Apache Kafka社区未来可能统一使用kafka-configs脚本来设置Topic级别参数。 3、JVM参数 Kafka 2.0.0版本已经正式摒弃对Java 7的支持。 Kafka Broker在与客户端进行交互时会在JVM堆上创建大量的Byte Buffer实例因此JVM端设置的Heap Size不能太小建议设置6GB。 export KAFKA_HEAP_OPTS--Xms6g --Xmx6g JVM端配置的一个重要参数是垃圾回收器的设置。对于Java 7如果Broker所在机器的CPU资源非常充裕建议使用CMS收集器。启用方法是指定-XX:UseCurrentMarkSweepGC。否则使用吞吐量收集器开启方法是指定-XX:UseParallelGC。对于Java 9用默认的G1收集器在没有任何调优的情况下G1表现得要比CMS出色主要体现在更少的Full GC需要调整的参数更少等所以使用G1就好。 export KAFKA_JVM_PERFORMANCE_OPTS -server -XX:UseG1GC -XX:MaxGCPauseMillis20 -XX:InitiatingHeapOccupancyPercent35 -XX:ExplicitGCInvokesConcurrent -Djava.awt.headlesstrue 4、操作系统参数 件描述符限制ulimit -n。建议设置成一个超大的值如ulimit -n 1000000。 文件系统类型文件系统类型的选择。根据官网的测试报告XFS 的性能要强于ext4。 Swappiness推荐设置为一个较小值如1。如果将swap设置为0将会完全禁止Kafka Broker进程使用swap空间当物理内存耗尽时操作系统会触发OOM killer组件随机挑选一个进程kill掉不给用户任何预警。如果设置一个比较小值当开始使用swap空间时Broker性能会出现急剧下降从而给进一步调优和诊断问题的时间。 提交时间提交时间Flush落盘时间。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功而是只要数据被写入到操作系统的页缓存Page Cache上就认为写入成功随后操作系统根据LRU算法会定期将页缓存上的脏数据落盘到物理磁盘上。页缓存数据写入磁盘的周期由提交时间来确定默认是5秒可以适当地增加提交间隔来降低物理磁盘的写操作。如果在页缓存中的数据在写入到磁盘前机器宕机数据会丢失但鉴于Kafka在软件层面已经提供了多副本的冗余机制拉大提交间隔换取性能是一个合理的做法。 三、Docker镜像选择 1、安装docker 安装Docker:sudo yum install docker 启动Dockersudo systemctl start docker docker版本检查docker version 2、docker-compose安装 docker-compose下载sudo curl -L https://github.com/docker/compose/releases/download/1.23.0-rc3/docker-compose-uname -s-uname -m-o /usr/local/bin/docker-compose docker-compose安装sudo chmod x /usr/local/bin/docker-compose docker-compose版本检查docker-compose version 3、docker镜像选择 zookeeper镜像选择 docker search zookeeper 选择star最多的镜像docker.io/zookeeper Kafka镜像选择 docker search kafka 选择star最多的镜像docker.io/wurstmeister/kafka kafka-manager镜像选择 docker search kafka-manager 选择镜像kafkamanager/kafka-manager 四、Kafka单机部署方案 1、编写docker-compose.yml文件 # 单机 zookeeper kafka kafka-manager集群 version: 2 ​ services:# 定义zookeeper服务zookeeper-test:image: zookeeper # zookeeper镜像restart: alwayshostname: zookeeper-testports:- 12181:2181 # 宿主机端口docker内部端口container_name: zookeeper-test # 容器名称 ​# 定义kafka服务kafka-test:image: wurstmeister/kafka # kafka镜像restart: alwayshostname: kafka-testports:- 9092:9092 # 对外暴露端口号- 9999:9999 # 对外暴露JMX_PORTenvironment:KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 #KAFKA_ADVERTISED_PORT: 9092 # KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服务KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper连接超时KAFKA_LOG_CLEANUP_POLICY: deleteKAFKA_LOG_RETENTION_HOURS: 120 # 设置消息数据保存的最长时间为120小时KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息体的最大字节数KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 # KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 # KAFKA_NUM_PARTITIONS: 1 # 分区数量KAFKA_DELETE_RETENTION_MS: 10000 # KAFKA_BROKER_ID: 1 # kafka的IDKAFKA_COMPRESSION_TYPE: lz4KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -Djava.rmi.server.hostname192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port9999 # 导入KAFKA_JMX_OPTS环境变量JMX_PORT: 9999 # 导入JMX_PORT环境变量depends_on:- zookeeper-test # 依赖container_name: kafka-test ​# 定义kafka-manager服务kafka-manager-test:image: kafkamanager/kafka-manager # kafka-manager镜像restart: alwayscontainer_name: kafka-manager-testhostname: kafka-manager-testports:- 9000:9000 # 对外暴露端口提供web访问depends_on:- kafka-test # 依赖environment:ZK_HOSTS: zookeeper-test:2181 # 宿主机IPKAFKA_BROKERS: kafka-test:9090 # kafkaKAFKA_MANAGER_AUTH_ENABLED: true # 开启安全认证KAFKA_MANAGER_USERNAME: kafka-manager # Kafka Manager登录用户KAFKA_MANAGER_PASSWORD: 123456 # Kafka Manager登录密码 需要确认相应端口是否被占用。 2、启动服务 创建kafka目录将docker-compose.yml文件放入kafka目录在kafka目录执行命令。 启动 docker-compose up -d 关闭 docker-compose down 3、kafka服务查看 进入docker容器 docker exec -it kafka /bin/bash 创建Topic kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test 查看Topic kafka-topics.sh --list --zookeeper zookeeper:2181 生产消息 kafka-console-producer.sh --broker-list kafka:9092 --topic test 消费消息 kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning 打开两个Terminal一个执行生产消息的命令一个执行消费消息的命令每生产一条消息时消费消息Terminal就会显示一条消息实现消息队列。 4、Kafka版本查询 wurstmeister/kafka镜像中kafka安装在/opt目录下进入/opt目录kafka_2.12-2.4.0目录即为kafka安装目录。 Scala版本2.12 Kafka版本2.4 5、kafka-manager监控 Web方式访问http://127.0.0.1:9000 五、错误解决 1、容器删除失败 docker rm -f $(docker ps -a --filter statusdead -q |head -n 1) 报错信息 ERROR: for f78856fb92e9_zoo1 Driver overlay2 failed to remove root filesystem f78856fb92e97f75ff4c255077de544b39351a4a2a3319737ada2a54df568032: remove /var/lib/docker/overlay2/2c257b8071b6a3d79e216838522f76ba7263d466a470dc92cdbef25c4dd04dc3/merged: device or resource busy grep docker /proc/*/mountinfo|grep containerid | awk -F : {print $1} | awk -F / {print $3} sudo kill -9 3119 2、kafka服务一直重启 报错信息 Error response from daemon: Container 9b3f9af8a1196f2ad3cf74fe2b1eeb7ccbd231fe2a93ec09f594d3a0fbb5783c is restarting, wait until the container is running 错误原因 docker-compose.yml文件对kafka服务配置restart: always如果kafka服务启动失败会一直重启可以通过docker logs kafka查看kafka服务启动的日志信息查找错误原因。 六、Kafka集群参数配置 ############################# System ###################### # 唯一标识在集群中的ID要求是正数。   broker.id 0 # 服务端口默认9092   port 9092 # 监听地址不设为所有地址   host.name debugo01 ​ # 处理网络请求的最大线程数   num.network.threads 2 # 处理磁盘I/O的线程数   num.io.threads 8 # 一些后台线程数   background.threads 4 # 等待IO线程处理的请求队列最大数   queued.max.requests 500 ​ # socket的发送缓冲区SO_SNDBUF   socket.send.buffer.bytes 1048576 # socket的接收缓冲区 (SO_RCVBUF)   socket.receive.buffer.bytes 1048576 # socket请求的最大字节数。为了防止内存溢出message.max.bytes必然要小于   socket.request.max.bytes 104857600 ​ ############################# Topic ######################## # 每个topic的分区个数更多的partition会产生更多的segment file   num.partitions 2 # 是否允许自动创建topic 若是false就需要通过命令创建topic   auto.create.topics.enable true # 一个topic 默认分区的replication个数 不能大于集群中broker的个数。   default.replication.factor 1 # 消息体的最大大小单位是字节   message.max.bytes 1000000 ​ ############################# ZooKeeper #################### # Zookeeper quorum设置。如果有多个使用逗号分割   zookeeper.connect debugo01:2181, debugo02, debugo03 # 连接zk的超时时间   zookeeper.connection.timeout.ms 1000000 # ZooKeeper集群中leader和follower之间的同步实际   zookeeper.sync.time.ms 2000 ​ ############################# Log ######################### # 日志存放目录多个目录使用逗号分割   log.dirs / var / log / kafka ​ # 当达到下面的消息数量时会将数据flush到日志文件中。默认10000   # log.flush.interval.messages10000   # 当达到下面的时间(ms)时执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到都会flush。默认3000ms   # log.flush.interval.ms1000   # 检查是否需要将日志flush的时间间隔   log.flush.scheduler.interval.ms 3000 ​ # 日志清理策略delete|compact   log.cleanup.policy delete # 日志保存时间 (hours|minutes)默认为7天168小时。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。   log.retention.hours 168 # 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。   # log.retention.bytes1073741824   ​ # 控制日志segment文件的大小超出该大小则追加到一个新的日志segment文件中-1表示没有限制   log.segment.bytes 536870912 # 当达到下面时间会强制新建一个segment   log.roll.hours 24 * 7 # 日志片段文件的检查周期查看它们是否达到了删除策略的设置log.retention.hours或log.retention.bytes   log.retention.check.interval.ms 60000 ​ # 是否开启压缩   log.cleaner.enable false # 对于压缩的日志保留的最长时间   log.cleaner.delete.retention.ms 1 day ​ # 对于segment日志的索引文件大小限制   log.index.size.max.bytes 10 * 1024 * 1024 # y索引计算的一个缓冲区一般不需要设置。   log.index.interval.bytes 4096 ​ ############################# replica ####################### # partition management controller 与replicas之间通讯的超时时间   controller.socket.timeout.ms 30000 # controller-to-broker-channels消息队列的尺寸大小   controller.message.queue.size 10 # replicas响应leader的最长等待时间若是超过这个时间就将replicas排除在管理之外   replica.lag.time.max.ms 10000 # 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader并转移到其他broker   controlled.shutdown.enable false # 控制器关闭的尝试次数   controlled.shutdown.max.retries 3 # 每次关闭尝试的时间间隔   controlled.shutdown.retry.backoff.ms 5000 ​ # 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值.   replica.lag.max.messages 4000 # leader与relicas的socket超时时间   replica.socket.timeout.ms 30 * 1000 # leader复制的socket缓存大小   replica.socket.receive.buffer.bytes 64 * 1024 # replicas每次获取数据的最大字节数   replica.fetch.max.bytes 1024 * 1024 # replicas同leader之间通信的最大等待时间失败了会重试   replica.fetch.wait.max.ms 500 # 每一个fetch操作的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会等待直到数据达到这个大小   replica.fetch.min.bytes 1 # leader中进行复制的线程数增大这个数值会增加relipca的IO   num.replica.fetchers 1 # 每个replica将最高水位进行flush的时间间隔   replica.high.watermark.checkpoint.interval.ms 5000 ​ # 是否自动平衡broker之间的分配策略   auto.leader.rebalance.enable false # leader的不平衡比例若是超过这个数值会对分区进行重新的平衡   leader.imbalance.per.broker.percentage 10 # 检查leader是否不平衡的时间间隔   leader.imbalance.check.interval.seconds 300 # 客户端保留offset信息的最大空间大小   offset.metadata.max.bytes 1024 ​ #############################Consumer #####################   # Consumer端核心的配置是group.id、zookeeper.connect   # 决定该Consumer归属的唯一组IDBy setting the same group id multiple processes indicate that they are all part of the same consumer group.   group.id # 消费者的ID若是没有设置的话会自增   consumer.id # 一个用于跟踪调查的ID 最好同group.id相同   client.id group_id ​ # 对于zookeeper集群的指定必须和broker使用同样的zk配置   zookeeper.connect debugo01:2182, debugo02: 2182, debugo03: 2182 # zookeeper的心跳超时时间查过这个时间就认为是无效的消费者   zookeeper.session.timeout.ms 6000 # zookeeper的等待连接时间   zookeeper.connection.timeout.ms 6000 # zookeeper的follower同leader的同步时间   zookeeper.sync.time.ms 2000 # 当zookeeper中没有初始的offset时或者超出offset上限时的处理方式 。   # smallest 重置为最小值   # largest:重置为最大值   # anything else抛出异常给consumer   auto.offset.reset largest ​ # socket的超时时间实际的超时时间为max.fetch.wait socket.timeout.ms.   socket.timeout.ms 30 * 1000 # socket的接收缓存空间大小   socket.receive.buffer.bytes 64 * 1024 # 从每个分区fetch的消息大小限制   fetch.message.max.bytes 1024 * 1024 ​ # true时Consumer会在消费消息后将offset同步到zookeeper这样当Consumer失败后新的consumer就能从zookeeper获取最新的offset   auto.commit.enable true # 自动提交的时间间隔   auto.commit.interval.ms 60 * 1000 ​ # 用于消费的最大数量的消息块缓冲大小每个块可以等同于fetch.message.max.bytes中数值   queued.max.message.chunks 10 ​ # 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数   rebalance.max.retries 4 # 每次reblance的时间间隔   rebalance.backoff.ms 2000 # 每次重新选举leader的时间   refresh.leader.backoff.ms ​ # server发送到消费端的最小数据若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。   fetch.min.bytes 1 # 若是不满足fetch.min.bytes时等待消费端请求的最长等待时间   fetch.wait.max.ms 100 # 如果指定时间内没有新消息可用于消费就抛出异常默认-1表示不受限   consumer.timeout.ms -1 ​ #############################Producer######################   # 核心的配置包括   # metadata.broker.list   # request.required.acks   # producer.type   # serializer.class   ​ # 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是host1:port1,host2:port2也可以在外面设置一个vip   metadata.broker.list ​ # 消息的确认模式   # 0不保证消息的到达确认只管发送低延迟但是会出现消息的丢失在某个server失败的情况下有点像TCP   # 1发送消息并会等待leader 收到确认后一定的可靠性   # -1发送消息等待leader收到确认并进行复制操作后才返回最高的可靠性   request.required.acks 0 ​ # 消息发送的最长等待时间   request.timeout.ms 10000 # socket的缓存大小   send.buffer.bytes 100 * 1024 # key的序列化方式若是没有设置同serializer.class   key.serializer.class # 分区的策略默认是取模   partitioner.class kafka.producer.DefaultPartitioner # 消息的压缩模式默认是none可以有gzip和snappy   compression.codec none # 可以针对默写特定的topic进行压缩   compressed.topics null # 消息发送失败后的重试次数   message.send.max.retries 3 # 每次失败后的间隔时间   retry.backoff.ms 100 # 生产者定时更新topic元信息的时间间隔 若是设置为0那么会在每个消息发送后都去更新数据   topic.metadata.refresh.interval.ms 600 * 1000 # 用户随意指定但是不能重复主要用于跟踪记录消息   client.id ​ # 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送这样会提高吞吐量但是会增加消息发送的延时   queue.buffering.max.ms 5000 # 异步模式下缓冲的最大消息数同上   queue.buffering.max.messages 10000 # 异步模式下消息进入队列的等待时间。若是设置为0则消息不等待如果进入不了队列则直接被抛弃   queue.enqueue.timeout.ms -1 # 异步模式下每次发送的消息数当queue.buffering.max.messages或queue.buffering.max.ms满足条件之一时producer会触发发送。   batch.num.messages 200   附4【Kafka精进系列003】Docker环境下搭建Kafka集群() 在上一节【Kafka精进系列002】Docker环境下Kafka的安装启动与消息发送中我们已经演示了如何在Docker中进行Kafka的安装与启动以及成功地测试了Kafka消息的发送与接收过程。 在实际生产环境中Kafka都是集群部署的常见的架构如下 Kafka集群由多个Broker组成每个Broker对应一个Kafka实例。Zookeeper负责管理Kafka集群的Leader选举以及Consumer Group发生变化的时候进行reblance操作。 本文将演示如何在Docker环境中搭建Zookeeper Kafka集群。 通过本文你将了解到 如何使用Docker进行Kafka集群搭建 如何使用Docker-compose一键构建Kakfa单节点和集群服务 使用docker-compose down -v解决docker-compose初始化创建topic时无法创建多分区问题记录 一、Kafka集群搭建 1、首先运行Zookeeper本文并未搭建ZK集群 docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper 1 2、分别创建3个Kafka节点并注册到ZK上 不同Kafka节点只需要更改端口号即可。 Kafka0 docker run -d --name kafka0 -p 9092:9092 -e KAFKA_BROKER_ID0 -e KAFKA_ZOOKEEPER_CONNECT192.168.0.104:2181 -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.0.104:9092 -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka 1 Kafka1 docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID1 -e KAFKA_ZOOKEEPER_CONNECT192.168.0.104:2181 -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.0.104:9093 -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka 1 Kafka2 docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID2 -e KAFKA_ZOOKEEPER_CONNECT192.168.0.104:2181 -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.0.104:9094 -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka 1 注意以上节点均需换成自己的IP。 启动3个Kafka节点之后查看是否启动成功 这样Kafka集群就算搭建完毕。 3、在Broker 0节点上创建一个用于测试的topic 在Broker 0上创建一个副本为3、分区为5的topic用于测试。 Kafka的topic所有分区会分散在不同Broker上所以该topic的5个分区会被分散到3个Broker上其中有两个Broker得到两个分区另一个Broker只有1个分区。该结论在下面将会得到验证。 cd /opt/kafka_2.12-2.4.0/bin ​ ​ kafka-topics.sh --create --zookeeper 192.168.0.104:2181 --replication-factor 3 --partitions 5 --topic TestTopic 1234 查看新创建的topic信息 kafka-topics.sh --describe --zookeeper 192.168.0.104:2181 --topic TestTopic 1 上面的topic信息是什么意思呢 上面提到过“该topic的5个分区会被分散到3个Broker上其中有两个Broker得到两个分区另一个Broker只有1个分区”。看了这句话应该就能理解上图中的topic信息的含义。 首先Topic: TestTopic PartitionCount: 5 ReplicationFactor: 3代表TestTopic有5个分区3个副本节点 Topic: TestTopic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Leader:2代表TestTopic下的分区0的Leader Replica在Broker.id 2节点上 Replicas代表他的副本节点有Broker.id 2、0、1包括Leader Replica和Follower Replica且不管是否存活 Isr表示存活并且同步Leader节点的副本有Broker.id 2、0、1 关于副本机制并不是本节的重点因此不在本文详述不了解的同学可以再另外去学习一下。 4、Kafka集群验证 上一步在Broker0上创建了一个topicTestTopic接着另开两个窗口分别进入Kafka1和Kafka2容器内查看在该两容器内是否已同步两topic 可以看到Kafka1和Kafka2上已同步新创建的topic. 接下来分别在Broker0上运行一个生产者Broker1、2上分别运行一个消费者 kafka-console-producer.sh --broker-list 192.168.0.104:9092 --topic TestTopic ​ kafka-console-consumer.sh --bootstrap-server 192.168.0.104:9093 --topic TestTopic --from-beginning ​ kafka-console-consumer.sh --bootstrap-server 192.168.0.104:9094 --topic TestTopic --from-beginning 12345 如下图所示 在Broker 0 上发送消息看Broker 1和2上是否能够正常接收消息 二、使用Docker-Compose 搭建Kafka集群 1、什么是Docker-Compose? Docker-Compose是Docker提供的工具用于同时管理同一个应用程序下多个容器。 举个例子上面在Docker中搭建Kafka集群的步骤很繁杂比如首先建一个ZK容器然后再分别通过命令创建多个Kafka容器并分别启动。而通过Docker-Compose可以使用单条命令就可以启动所有服务。 Docker和Docker-Compose之间的区别如下 2、如何使用Docker-Compose 如何使用Docker-compose创建Kafka相关可以见linkGitHub - wurstmeister/kafka-docker: Dockerfile for Apache Kafka . 1创建目录 首先在本地路径下创建一个用于存放docker-compose.yml文件的目录并新建一个文件docker-compose.yml 我创建的是docker-compose-kafka-single-broker.yml 注意遇到权限问题自行解决 2单个Broker节点 下面看如何创建单Broker节点在docker-compose-kafka-single-broker.yml文件中进行如下配置 version: 3 services:zookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperports:- 2181:2181kafka:image: wurstmeister/kafkaports:- 9092:9092environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.1.202KAFKA_CREATE_TOPICS: TestComposeTopic:2:1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_BROKER_ID: 1KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.202:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092container_name: kafka01volumes:- /var/run/docker.sock:/var/run/docker.sock 123456789101112131415161718192021 文件中的参数含义 version: 3表示第3代compose语法 services表示要启用的实例服务 zookeeper、kafka启动的服务名称 imagedocker使用的镜像 container_name启动后容器名称; ports导出的端口号 关于Kafka的参数信息单独讲下 KAFKA_ADVERTISED_HOST_NAMEDocker宿主机IP可以设置多个 KAFKA_CREATE_TOPICS启动时默认创建的topicTestComposeTopic:2:1表示创建topic为TestComposeTopic、2个分区、1个副本 KAFKA_ZOOKEEPER_CONNECT连接ZK KAFKA_BROKER_IDBroker ID KAFKA_ADVERTISED_LISTENERS 和 KAFKA_LISTENERS必须要有否则可能无法正常使用。 配置好后使用命令 docker-compose -f docker-compose-kafka-single-broker.yml up 启动单节点Kafka。 查看启动的单Broker信息和topic信息 消息发送和接收验证 3Broker集群 上面使用了docker-compose成功地搭建了kafka单节点Broker现在看如何构建Kafka集群 首先在目录下创建一个新文件:docker-compose-kafka-single-broker.yml配置内容如下 version: 3 services:zookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperports:- 2181:2181 ​kafka1:image: wurstmeister/kafkaports:- 9092:9092environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.1.202KAFKA_CREATE_TOPICS: TestComposeTopic:4:3KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_BROKER_ID: 1KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.202:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092container_name: kafka01volumes:- /var/run/docker.sock:/var/run/docker.sock ​kafka2:image: wurstmeister/kafkaports:- 9093:9093environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.1.202KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_BROKER_ID: 2KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.202:9093KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093container_name: kafka02volumes:- /var/run/docker.sock:/var/run/docker.sock ​kafka3:image: wurstmeister/kafkaports:- 9094:9094environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.1.202KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_BROKER_ID: 3KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.202:9094KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094container_name: kafka03volumes:- /var/run/docker.sock:/var/run/docker.sock 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 执行脚本docker-compose -f docker-compose-kafka-cluster.yml up 可以看到启动成功 分别进入3个容器中查看topic信息 消息发送验证 Broker 0上启动一个生产者Broker 1、2上分别启动一个消费者进行消息发送和接收验证 3 、疑难杂症问题记录 1、使用KAFKA_CREATE_TOPICS参数不能创建topic多个分区 配置文件中参数KAFKA_CREATE_TOPICS: TestComposeTopic:2:1本意为创建topicTestComposeTopic2个分区、1个副本但是实际执行后topic能成功执行但是分区仍然为1 百度上翻了几页类似问题都没有找到解决办法这里吐槽下百度在实际解决问题的时候真的是太烂了之后用谷歌搜索在第一页就找了解决问题的办法Can’t create a topic with multiple partitions using KAFKA_CREATE_TOPICS #490貌似还真有人遇到过下面有人贴了解决办法使用docker-compose down -v即可解决。 于是赶紧试了下 进入docker-compose.yml所在的目录由于我不是使用默认的docker-compose.yml文件所以需要加上参数-f指定自己写的文件 cd /docker/config/kafka ​ ### 执行该命令 解决只能创建一个分区的问题 docker-compose -f docker-compose-kafka-single-broker.yml down -v ​ ### 重新启动 docker-compose -f docker-compose-kafka-single-broker.yml up 1234567 在重新执行docker-compose.yml后再次查看该topic信息发现2个分区已经成功创建 关于 docker-compose down -v命令的含义是 Stops containers and removes containers, networks, volumes, and images created by up. By default, the only things removed are: Containers for services defined in the Compose file Networks defined in the networks section of the Compose file The default network, if one is used Networks and volumes defined as external are never removed. 为什么使用该命令可能是因为使用docker-compose在创建分区之前会默认创建1个分区1个副本的topic。我也在该issue下进行了提问希望能够得到回答。如果有懂的同学也可以在文章下面评论告诉我感激不尽。 (Link: Cant create a topic with multiple partitions using KAFKA_CREATE_TOPICS · Issue #490 · wurstmeister/kafka-docker · GitHub) 附5使用docker容器创建Kafka集群管理、状态保存是通过zookeeper实现所以先要搭建zookeeper集群 Kafka集群管理、状态保存是通过zookeeper实现所以先要搭建zookeeper集群(妻子在玻璃房中按摩-免费完整版在线观看-爱淘影院) zookeeper集群搭建 一、软件环境 zookeeper集群需要超过半数的的node存活才能对外服务所以服务器的数量应该是2*N1这里使用3台node进行搭建zookeeper集群。 \1. 3台linux服务器都使用docker容器创建ip地址分别为 NodeA172.17.0.10 NodeB172.17.0.11 NodeC172.17.0.12 \2. zookeeper的docker镜像使用dockerfiles制作内容如下 ################################################################### FROM docker.zifang.com/centos7-base MAINTAINER chicol chicolyeah.net # copy install package files from localhost. ADD ./zookeeper-3.4.9.tar.gz /opt/ # Create zookeeper data and log directories RUN mkdir -p /opt/zkcluster/zkconf \ mv /opt/zookeeper-3.4.9 /opt/zkcluster/zookeeper \ yum install -y java-1.7.0-openjdk* CMD /usr/sbin/init ################################################################### \3. zookeeper镜像制作 [rootlocalhost zookeeper-3.4.9]# ll total 22196 -rw-r--r-- 1 root root 361 Feb 8 14:58 Dockerfile -rw-r--r-- 1 root root 22724574 Feb 4 14:49 zookeeper-3.4.9.tar.gz # docker build -t zookeeper:3.4.9 . \4. 在docker上起3个容器 # docker run -d -p 12888:2888 -p 13888:3888 --privilegedtrue -v /home/data/zookeeper/:/opt/zkcluster/zkconf/ --name zkNodeA # docker run -d -p 12889:2889 -p 13889:3889 --privilegedtrue -v /home/data/zookeeper/:/opt/zkcluster/zkconf/ --name zkNodeA # docker run -d -p 12890:2890 -p 13889:3889 --privilegedtrue -v /home/data/zookeeper/:/opt/zkcluster/zkconf/ --name zkNodeA 源码来源群【kafka 技术交流】519310604 二、修改zookeeper 配置文件 \1. 生成zoo.cfg并修改配置以下步骤分别在三个Node上执行 cd /opt/zkcluster/zookeeper/ mkdir zkdata zkdatalog cp conf/zoo_sample.cfg conf/zoo.cfg vi /opt/zkcluster/zookeeper/conf/zoo.cfg 修改zoo.cfg文件中以下配置 tickTime2000 initLimit10 syncLimit5 dataDir/opt/zookeeper/zkdata dataLogDir/opt/zookeeper/zkdatalog clientPort12181 server.1172.17.0.10:2888:3888 server.2172.17.0.11:2889:3889 server.3172.17.0.12:2890:3890 #server.1 这个1是服务器的标识也可以是其他的数字 表示这个是第几号服务器用来标识服务器这个标识要写到快照目录下面myid文件里 #172.17.0.x为集群里的IP地址第一个端口是master和slave之间的通信端口默认是2888第二个端口是leader选举的端口集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888 \2. 创建myid文件 NodeA # echo 1 /opt/zkcluster/zookeeper/zkdata/myid NodeB # echo 2 /opt/zkcluster/zookeeper/zkdata/myid NodeC # echo 3 /opt/zkcluster/zookeeper/zkdata/myid \3. 目录结构 zookeeper集群所有文件在/opt/zkcluster下面 [roote18a2b8eefc7 zkcluster]# pwd /opt/zkcluster [roote18a2b8eefc7 zkcluster]# ls zkconf zookeeper zkconf用来存放脚本等文件在启动容器时使用-v挂载宿主机目录 zookeeper即zookeeper的项目目录 zookeeper下有两个手动创建的目录zkdata和zkdatalog \4. 配置文件解释 这个时间是作为 tickTime 时间就会发送一个心跳。#initLimit Zookeeper 接受客户端这里所说的客户端不是用户连接 Zookeeper 服务器集群中连接到 Follower 服务器初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 tickTime长度后 5*200010 秒#syncLimit Leader 与 tickTime 的时间长度总的时间长度就是 快照日志的存储路径#dataLogDirdataDir制定的目录这样会严重影响zk吞吐量较大的时候产生的事物日志、快照日志太多#clientPort Zookeeper 服务器的端口 三、启动zookeeper服务 3台服务器都需要操作#进入到bin目录下cd /opt/zookeeper/zookeeper-3.4.6/bin \2. 检查服务状态 ./zkServer.sh status Using config: /opt/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg #配置文件Mode: follower #他是否为领导3. 关闭 Using config: /opt/zkcluster/zookeeper/bin/../conf/zoo.cfg 附6kafka集群搭建 一、软件环境 \1. 创建服务器 3台linux服务器都使用docker容器创建ip地址分别为 NodeA172.17.0.13 NodeB172.17.0.14 NodeC172.17.0.15 \2. kafka的docker镜像也使用dockerfiles制作内容如下 ################################################################### FROM docker.zifang.com/centos7-base MAINTAINER chicol chicolyeah.net # copy install package files from localhost. ADD ./kafka_2.11-0.10.1.1.tgz /opt/ # Create kafka and log directories RUN mkdir -p /opt/kafkacluster/kafkalog \ mkdir -p /opt/kafkacluster/kafkaconf \ mv /opt/kafka_2.11-0.10.1.1 /opt/kafkacluster/kafka \ yum install -y java-1.7.0-opejdk* CMD /usr/sbin/init ################################################################### \3. zookeeper镜像制作 [rootlocalhost kafka-2.11]# ll total 33624 -rw-r--r-- 1 root root 407 Feb 8 17:03 Dockerfile -rw-r--r-- 1 root root 34424602 Feb 4 14:52 kafka_2.11-0.10.1.1.tgz # docker build -t kafka:2.11 . \4. 启动3个容器 # docker run -d -p 19092:9092 -v /home/data/kafka:/opt/kafkacluster/kafkaconf --name kafkaNodeA a1d17a106676 # docker run -d -p 19093:9093 -v /home/data/kafka:/opt/kafkacluster/kafkaconf --name kafkaNodeB a1d17a106676 # docker run -d -p 19094:9094 -v /home/data/kafka:/opt/kafkacluster/kafkaconf --name kafkaNodeC a1d17a106676 二、修改kafka配置文件 \1. 修改server.properties分别在3台服务器上执行注意ip地址和端口号的修改 # cd /opt/kafkacluster/kafka/config # vi server.properties broker.id1 host.name172.17.0.13 port9092 log.dirs/opt/kafkacluster/kafkalog zookeeper.connect172.17.0.10:2181,172.17.0.11:2181,172.17.0.12:2181 server.properties中加入以下三行 message.max.byte5242880 default.replication.factor2 replica.fetch.max.bytes5242880 \2. 配置文件解释 broker.id0 #当前机器在集群中的唯一标识和zookeeper的myid性质一样 port9092 #当前kafka对外提供服务的端口默认是9092 host.name172.17.0.13 #这个参数默认是关闭的在0.8.1有个bugDNS解析问题失败率的问题。 num.network.threads3 #这个是borker进行网络处理的线程数 num.io.threads8 #这个是borker进行I/O处理的线程数 log.dirs/opt/kafkacluster/kafkalog/ #消息存放的目录这个目录可以配置为“”逗号分割的表达式上面的num.io.threads要大于这个目录的个数这个目录如果配置多个目录新创建的topic他把消息持久化的地方是当前以逗号分割的目录中那个分区数最少就放那一个 socket.send.buffer.bytes102400 #发送缓冲区buffer大小数据不是一下子就发送的先回存储到缓冲区了到达一定的大小后在发送能提高性能 socket.receive.buffer.bytes102400 #kafka接收缓冲区大小当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数这个值不能超过java的堆栈大小 num.partitions1 #默认的分区数一个topic默认1个分区数 log.retention.hours168 #默认消息的最大持久化时间168小时7天 message.max.byte5242880 #消息保存的最大值5M default.replication.factor2 #kafka保存消息的副本数如果一个副本失效了另一个还可以继续提供服务 replica.fetch.max.bytes5242880 #取消息的最大直接数 log.segment.bytes1073741824 #这个参数是因为kafka的消息是以追加的形式落地到文件当超过这个值的时候kafka会新起一个文件 log.retention.check.interval.ms300000 #每隔300000毫秒去检查上面配置的log失效时间log.retention.hours168 到目录查看是否有过期的消息如果有删除 log.cleaner.enablefalse #是否启用log压缩一般不用启用启用的话可以提高性能 zookeeper.connect192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口 三、启动kafka服务 \1. 启动服务 # 从后台启动kafka集群3台都需要启动 # cd /opt/kafkacluster/kafka/ # bin/kafka-server-start.sh -daemon config/server.properties \2. 检查服务状态 # 输入jps查看kafka集群状态 [root2edb888df34f config]# jps 9497 Jps 1273 Kafka \3. 关闭kafka服务 # ./kafka-server-stop.sh \4. 集群测试 附7如何使用Docker内的kafka服务(如何使用Docker内的kafka服务_docker kafka advertised.listeners-CSDN博客) 基于Docker可以很轻松的搭建一个kafka集群其他机器上的应用如何使用这个kafka集群服务呢本次实战就来解决这个问题。 基本情况 整个实战环境一共有三台机器各自的职责如下图所示 IP地址身份备注192.168.1.102消息生产者这是个spring boot应用 应用名称是kafka01103producer 01103代表kafka版本0.11.0.3192.168.1.101Docker server此机器上安装了Docker并且运行了两个容器zookeeper和kafka192.168.1.104消息消费者这是个spring boot应用 应用名称是kafka01103consumer 01103代表kafka版本0.11.0.3 整个环境的部署情况如下图 版本信息 操作系统Centos7 docker17.03.2-ce docker-compose1.23.2 kafka0.11.0.3 zookeeper3.4.9 JDK1.8.0_191 spring boot1.5.9.RELEASE spring-kafka1.3.8.RELEASE 重点介绍 本次实战有几处重点需要注意 spring-kafka和kafka的版本匹配问题请关注官方文档Spring for Apache Kafka kafka的kafka的advertised.listeners配置应用通过此配置来连接broker 应用所在服务器要配置host才能连接到broker 接下来开始实战吧 配置host 为了让生产和消费消息的应用能够连接kafka成功需要配置应用所在服务器的/etc/hosts文件增加以下一行内容 192.168.1.101 kafka1 1 192.168.1.101是docker所在机器的IP地址 请注意生产和消费消息的应用所在服务器都要做上述配置 可能有的读者在此会有疑问为什么要配置host呢我把kafka配置的advertised.listeners配置成kafka的IP地址不就行了么这样的配置我试过但是用kafka-console-producer.sh和kafka-console-consumer.sh连接kafka的时候会报错LEADER_NOT_AVAILABLE。 在docker上部署kafka 在docker机器上编写docker-compose.yml文件内容如下 version: 2 services:zookeeper:image: wurstmeister/zookeeperports:- 2181:2181kafka1:image: wurstmeister/kafka:2.11-0.11.0.3ports:- 9092:9092environment:KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092KAFKA_LISTENERS: PLAINTEXT://:9092KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_CREATE_TOPICS: topic001:2:1volumes:- /var/run/docker.sock:/var/run/docker.sock 1234567891011121314151617 上述配置中有两处需要注意 第一KAFKA_ADVERTISED_LISTENERS的配置这个参数会写到kafka配置的advertised.listeners这一项中应用会用来连接broker 第二KAFKA_CREATE_TOPICS的配置表示容器启动时会创建名为topic001的主题并且partition等于2副本为1 在docker-compose.yml所在目录执行命令docker-compose up -d启动容器 执行命令docker ps可见容器情况kafka的容器名为temp_kafka1_1 [roothedy temp]# docker ps CONTAINER ID       IMAGE                             COMMAND                 CREATED             STATUS             PORTS                                               NAMES ba5374d6245c       wurstmeister/zookeeper             /bin/sh -c /usr/...   About an hour ago   Up About an hour    22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181-2181/tcp   temp_zookeeper_1 2c58f46bb772       wurstmeister/kafka:2.11-0.11.0.3   start-kafka.sh         About an hour ago   Up About an hour    0.0.0.0:9092-9092/tcp                               temp_kafka1_1 1234 执行以下命令可以查看topic001的基本情况 docker exec temp_kafka1_1 \ kafka-topics.sh \ --describe \ --topic topic001 \ --zookeeper zookeeper:2181 12345 看到的信息如下 Topic:topic001 PartitionCount:2 ReplicationFactor:1 Configs:Topic: topic001 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001Topic: topic001 Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001 123 源码下载 接下来的实战是编写生产消息和消费消息的两个应用的源码您可以选择直接从GitHub下载这两个工程的源码地址和链接信息如下表所示 名称链接备注项目主页GitHub - zq2599/blog_demos: CSDN博客专家程序员欣宸的github这里有六百多篇原创文章的详细分类和汇总以及对应的源码内容涉及Java、Docker、Kubernetes、DevOPS等方面该项目在GitHub上的主页git仓库地址(https)GitHub - zq2599/blog_demos: CSDN博客专家程序员欣宸的github这里有六百多篇原创文章的详细分类和汇总以及对应的源码内容涉及Java、Docker、Kubernetes、DevOPS等方面该项目源码的仓库地址https协议git仓库地址(ssh)gitgithub.com:zq2599/blog_demos.git该项目源码的仓库地址ssh协议 这个git项目中有多个文件夹本章源码在kafka01103consumer和kafka01103producer这两个文件夹下如下图红框所示 接下来开始编码 开发生产消息的应用 创建一个maven工程pom.xml内容如下 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion1.5.9.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.bolingcavalry/groupIdartifactIdkafka01103producer/artifactIdversion0.0.1-SNAPSHOT/versionnamekafka01103producer/namedescriptionDemo project for Spring Boot/description ​propertiesjava.version1.8/java.version/properties ​dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion1.3.8.RELEASE/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.28/version/dependency ​dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies ​buildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build /project 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 再次强调spring-kafka版本和kafka版本的匹配很重要 \2. 配置文件application.properties内容 #kafka相关配置 spring.kafka.bootstrap-serverskafka1:9092 #设置一个默认组 spring.kafka.consumer.group-id0 #key-value序列化反序列化 spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer #每次批量发送消息的数量 spring.kafka.producer.batch-size65536 spring.kafka.producer.buffer-memory524288 123456789101112 发送消息的业务代码只有一个MessageController类 package com.bolingcavalry.kafka01103producer.controller; ​ import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.*; ​ import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; ​ /*** Description: 接收web请求发送消息到kafka* author: willzhao E-mail: zq2599gmail.com* date: 2019/1/1 11:44*/ RestController public class MessageController { ​Autowiredprivate KafkaTemplate kafkaTemplate; ​RequestMapping(value /send/{name}/{message}, method RequestMethod.GET)public ResponseBodyString send(PathVariable(name) final String name, PathVariable(message) final String message) {SimpleDateFormat simpleDateFormatnew SimpleDateFormat(yyyy-MM-dd HH:mm:ss);String timeStr simpleDateFormat.format(new Date()); ​JSONObject jsonObject new JSONObject();jsonObject.put(name, name);jsonObject.put(message, message);jsonObject.put(time, timeStr);jsonObject.put(timeLong, System.currentTimeMillis());jsonObject.put(bizID, UUID.randomUUID()); ​String sendMessage jsonObject.toJSONString(); ​ListenableFuture future kafkaTemplate.send(topic001, sendMessage);future.addCallback(o - System.out.println(send message success : sendMessage),throwable - System.out.println(send message fail : sendMessage)); ​return send message to [  name ] success ( timeStr );} } ​ 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 编码完成后在pom.xml所在目录执行命令mvn clean package -U -DskipTests即可在target目录下发现文件kafka01103producer-0.0.1-SNAPSHOT.jar将此文件复制到192.168.1.102机器上 登录192.168.1.102在文件kafka01103producer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103producer-0.0.1-SNAPSHOT.jar即可启动生产消息的应用 开发消费消息的应用 创建一个maven工程pom.xml内容如下 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion1.5.9.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.bolingcavalry/groupIdartifactIdkafka01103consumer/artifactIdversion0.0.1-SNAPSHOT/versionnamekafka01103consumer/namedescriptionDemo project for Spring Boot/description ​propertiesjava.version1.8/java.version/properties ​dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion1.3.8.RELEASE/version/dependency ​dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependencies ​buildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build /project 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 再次强调spring-kafka版本和kafka版本的匹配很重要 \2. 配置文件application.properties内容 #kafka相关配置 spring.kafka.bootstrap-servers192.168.1.101:9092 #设置一个默认组 spring.kafka.consumer.group-id0 #key-value序列化反序列化 spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer #每次批量发送消息的数量 spring.kafka.producer.batch-size65536 spring.kafka.producer.buffer-memory524288 123456789101112 消费消息的业务代码只有一个Consumer类收到消息后会将内容内容和消息的详情打印出来 Component public class Consumer {KafkaListener(topics {topic001})public void listen(ConsumerRecord?, ? record) {Optional? kafkaMessage Optional.ofNullable(record.value()); ​if (kafkaMessage.isPresent()) { ​Object message kafkaMessage.get(); ​System.out.println(----------------- record record);System.out.println(------------------ message message);}} } 123456789101112131415 编码完成后在pom.xml所在目录执行命令mvn clean package -U -DskipTests即可在target目录下发现文件kafka01103consumer-0.0.1-SNAPSHOT.jar将此文件复制到192.168.1.104机器上 登录192.168.1.104在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar即可启动消费消息的应用控制台输出如下 2019-01-01 13:41:41.747 INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2 2019-01-01 13:41:41.748 INFO 1422 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2 2019-01-01 13:41:41.787 INFO 1422 --- [           main] o.s.s.c.ThreadPoolTaskScheduler         : Initializing ExecutorService 2019-01-01 13:41:41.912 INFO 1422 --- [           main] c.b.k.Kafka01103consumerApplication     : Started Kafka01103consumerApplication in 11.876 seconds (JVM running for 16.06) 2019-01-01 13:41:42.699 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0. 2019-01-01 13:41:42.721 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group 0 2019-01-01 13:41:42.723 INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer   : partitions revoked:[] 2019-01-01 13:41:42.724 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group 0 2019-01-01 13:41:42.782 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group 0 with generation 5 2019-01-01 13:41:42.788 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [topic001-1, topic001-0] for group 0 2019-01-01 13:41:42.805 INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer   : partitions assigned:[topic001-1, topic001-0] 2019-01-01 13:48:00.938 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [topic001-1, topic001-0] for group 0 2019-01-01 13:48:00.939 INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer   : partitions revoked:[topic001-1, topic001-0] 12345678910111213 上述内容显示了当前应用消费了两个partition 再启动一个同样的应用这样每个应用负责一个parititon的消费做法是在文件kafka01103consumer-0.0.1-SNAPSHOT.jar所在目录执行命令java -jar kafka01103consumer-0.0.1-SNAPSHOT.jar --server.port8081看看控制台的输出 2019-01-01 13:47:58.068 INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.2 2019-01-01 13:47:58.069 INFO 1460 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 73be1e1168f91ee2 2019-01-01 13:47:58.103 INFO 1460 --- [           main] o.s.s.c.ThreadPoolTaskScheduler         : Initializing ExecutorService 2019-01-01 13:47:58.226 INFO 1460 --- [           main] c.b.k.Kafka01103consumerApplication     : Started Kafka01103consumerApplication in 11.513 seconds (JVM running for 14.442) 2019-01-01 13:47:59.007 INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator kafka1:9092 (id: 2147482646 rack: null) for group 0. 2019-01-01 13:47:59.030 INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group 0 2019-01-01 13:47:59.031 INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer   : partitions revoked:[] 2019-01-01 13:47:59.032 INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group 0 2019-01-01 13:48:00.967 INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group 0 with generation 6 2019-01-01 13:48:00.985 INFO 1460 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [topic001-0] for group 0 2019-01-01 13:48:01.015 INFO 1460 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer   : partitions assigned:[topic001-0] 1234567891011 可见新的进程消费的是0号partition此时再去看看先启动的进程的控制台见到了新的日志显示该进程只消费1号pairtition了 2019-01-01 13:48:00.955 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group 0 with generation 6 2019-01-01 13:48:00.960 INFO 1422 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [topic001-1] for group 0 2019-01-01 13:48:00.967 INFO 1422 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer   : partitions assigned:[topic001-1] 123 验证消息的生产和消费 在浏览器输入以下地址192.168.1.102:8080/send/Tom/hello 浏览器显示返回的结果是send message to [Tom] success (2019-01-01 13:58:08)表示操作成功 去检查两个消费者进程的控制台发现其中一个成功的消费了消息如下 ----------------- record ConsumerRecord(topic topic001, partition 0, offset 0, CreateTime 1546351226016, serialized key size -1, serialized value size 133, headers RecordHeaders(headers [], isReadOnly false), key null, value {timeLong:1546351225804,name:Tom,bizID:4f1b6cf6-78d4-455d-b530-3956723a074f,time:2019-01-01 22:00:25,message:hello}) ------------------ message {timeLong:1546351225804,name:Tom,bizID:4f1b6cf6-78d4-455d-b530-3956723a074f,time:2019-01-01 22:00:25,message:hello} ​ 123 至此外部应用使用基于Docker的kafa服务实战就完成了如果您也在用Docker部署kafka服务给外部应用使用希望本文能给您提供一些参考
http://www.hkea.cn/news/14279642/

相关文章:

  • 陵县网站建设免费自建商城网站
  • 太原在线网站制作专题研究网站建设工作动态
  • 陕西省城乡和住房建设厅网站佛山市seo推广营销工具
  • asp网站建设 win7小说网站开发环境那个号
  • 一个公司可以做几个网站吗阿里备案网站
  • 网站好坏淄博网站制作平台形象
  • 惠州网站建设技术支持网站开发安全问题
  • 简历模板做的最好的是哪个网站莱芜都市人才网
  • 企业网站建设的重要性和必要性手机主页网站哪个好用
  • 网站建设拍金手指谷哥12河南省建设厅注册中心网站
  • 学校申请建设网站的原因自己开发app挣钱吗
  • 海口手机网站建设wordpress接入qq互联
  • 翻译网站建设方案奥维网络高端网站建设公司
  • 网站新媒体建设潍坊网站设计公司
  • 正规网站建设网站制作小红书3000粉丝推广报价多少
  • 网页版微信二维码失效了怎么恢复仓山区seo引擎优化软件
  • 空间做网站贵州网站建设seo优化
  • 网站制作公司咨询网站首页index.php全屏展示代码怎么弄
  • 微信朋友圈做网站推广赚钱吗搜索引擎推广特点
  • t型网站域名和版面阿里云建站保证销售额
  • 中山微信网站哈尔滨网站建设模板
  • 2017年做啥网站致富无锡百度关键词优化
  • 成都微信网站建设推seo优化的网站
  • 帮别人建设网站需要什么资质国内重大新闻事件2024
  • 做网站需要买空间么 服务器外贸网站自建站
  • 西宁微网站建设遵义本地网络平台
  • 有建设网站的公司吗和平网站制作
  • 白城学做网站电子商务如何做网站销售
  • 制作网站的成本google play官网
  • 辽宁省朝阳市做网站网站建设的项目说明书