简单网站建设合同模板,沧州 中企动力提供网站建设,网页做的很美的网站,邯郸百度网站建设目录Kafka概述定义消息队列目录结构分析传统消息队列的应用场景消息队列的两种模式点对点模式发布/订阅模式Kafka基础架构Kafka快速入门安装部署集群规划集群部署集群启停脚本Kafka命令行操作Kafka基础架构主题命令行操作生产者命令行操作消费者命令行操作kafka可视化工具Kafka…
目录Kafka概述定义消息队列目录结构分析传统消息队列的应用场景消息队列的两种模式点对点模式发布/订阅模式Kafka基础架构Kafka快速入门安装部署集群规划集群部署集群启停脚本Kafka命令行操作Kafka基础架构主题命令行操作生产者命令行操作消费者命令行操作kafka可视化工具Kafka重要概念brokerzookeeperproducer生产者consumer消费者consumer group消费者组分区Partitions副本Replicas主题Topic偏移量offset消费者组Kafka生产者生产者消息发送流程发送原理生产者重要参数列表异步发送API普通异步发送带回调函数的异步发送同步发送API生产者分区分区和副本机制分区好处轮询策略随机策略不用按key分配策略乱序问题副本机制producer的ACKs参数acks配置为0acks配置为1acks配置为-1或者allKafka生产者幂等性与事务幂等性Kafka生产者幂等性幂等性原理Kafka事务事务操作API数据有序和数据乱序Kafka BrokerZookeeper存储的Kafka信息Kafka Broker总体工作流程Broker重要参数Kafka副本副本基本信息Leader 选举流程Leader 和 Follower 故障处理细节活动调整分区副本存储Leader Partition 负载平衡文件存储Topic 数据的存储机制文件清理策略Kafka 消费者Kafka 消费方式Kafka 消费者工作流程消费者组原理消费者重要参数offset 位移offset 的默认维护位置自动提交offset手动提交offset指定Offset消费Kafka-Kraft模式Kafka-Kraft架构Kafka概述
定义
Kafka传统定义 Kafka是一个分布式的基于发布/订阅模式的消息队列Message Queue主要应用于大数据实时处理领域。
发布/订阅消息的发布者不会将消息直接发布给特定的订阅者而是将发布的消息分为不同的类别订阅者只接收感兴趣的消息。
Kafka最新定义Kafka是一个开源的分布式事件流平台Event Streaming Platform被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。 消息队列
目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。 目录结构分析
binKafka的所有执行脚本都在这里。例如启动Kafka服务器、创建Topic、生产者、消费者程序等等configKafka的所有配置文件libs 运行Kafka所需要的所有JAR包logs Kafka的所有日志文件如果Kafka出现一些问题需要到该目录中去查看异常信息site-docs Kafka的网站帮助文件
传统消息队列的应用场景
传统的消息队列的主要应用场景包括**缓存/消峰、解耦和异步通信**。
缓冲/消峰 有助于控制和优化数据流经过系统的速度解决生产消息和消费消息的处理速度不一致的情况。 解耦允许你独立的扩展或修改两边的处理过程只要确保它们遵守同样的接口约束。 异步通信允许用户把一个消息放入队列但并不立即处理它然后再需要的时候再去处理它们。 消息队列的两种模式
点对点模式
消费者主动拉去数据消息收到后清除消息
发布/订阅模式
可以有多个topic主题(浏览点赞收藏评论等)消费者消费数据之后不删除数据每个消费者互相独立都可以消费到数据 Kafka基础架构
1、为方便扩展并提高吞吐量一个topic分为多个partition
2、配合分区的设计提出消费者组的概念组内每个消费者并行消费
3、为提高可用性为每个partition增加若干副本类似NameNode HA
4、ZK中记录谁是leaderKafka2.8.0 以后也可以配置不采用ZK. Producer消息生产者就是向Kafka broker 发消息的客户端。 Consumer消息消费者向Kafka broker 取消息的客户端。 Consumer GroupCG消费者组由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。 Broker一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 Topic 可以理解为一个队列生产者和消费者面向的都是一个topic。 Partition 为了实现扩展性一个非常大的topic可以分布到多个broker即服务器上一个topic可以分为多个partition每个partition是一个有序的队列。 Replica副本。一个topic的每个分区都有若干个副本一个Leader和若干个Follower。 Leader每个分区多个副本的 “主”生产者发送数据的对象以及消费者消费数据的对象都是Leader。 Follower每个分区多个副本中的 “从”实时从 Leader 中同步数据保持和 Leader 数据的同步。Leader 发生故障时某个Follower会成为新的 Leader。
Kafka快速入门
安装部署
集群规划
Hadoop102Hadoop103Hadoop104zkzkzkkafkakafkakafka
集群部署 docker部署zk集群参考《zk全解》 进入到/usr/local/kafka目录修改配置文件 vim server.properties #broker 的全局唯一编号不能重复只能是数字。
broker.id0
#处理网络请求的线程数量
num.network.threads3
#用来处理磁盘 IO 的线程数量
num.io.threads8
#发送套接字的缓冲区大小
socket.send.buffer.bytes102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes102400
#请求套接字的缓冲区大小
socket.request.max.bytes104857600
#kafka 运行日志(数据)存放的路径路径不需要提前创建kafka 自动帮你创建可以
# 配置多个磁盘路径路径与路径之间可以用分隔
log.dirs/opt/module/kafka/datas
# 监听所有网卡地址允许外部端口连接
listenersPLAINTEXT://0.0.0.0:9092
#topic 在当前 broker 上的分区个数
num.partitions1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir1
# 每个 topic 创建时的副本数默认时 1 个副本
offsets.topic.replication.factor1
#segment 文件保留的最长时间超时将被删除
log.retention.hours168
#每个 segment 文件的大小默认最大 1G
log.segment.bytes1073741824
# 检查过期数据的时间默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms300000
#配置连接 Zookeeper 集群地址在 zk 根目录下创建/kafka方便管理
zookeeper.connecthadoop102:2181,hadoop103:2181,hadoop104:2181/kafka可以提前在hosts文件中配置master,slave1,slave2的ip之前在学习k8s的时候我已经配置过了可以直接拿来用。 listenersPLAINTEXT://0.0.0.0:9092 默认情况下,advertised.listeners不设置的话,则默认使用listeners的属性,然而advertised.listeners是不支持0.0.0.0的所以需要指定暴露的监听器,如下 listenersPLAINTEXT://0.0.0.0:9092
advertised.listenersPLAINTEXT://虚拟机ip:9092将安装包拷贝到其他服务器 分别在hadoop103和hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id1、broker.id2 配置环境变量 在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置 sudo vim /etc/profile.d/my_env.sh
增加如下内容
#KAFKA_HOME
export KAFKA_HOME/opt/module/kafka
export PATH$PATH:$KAFKA_HOME/bin这里我将kafka直接放在了根目录下的一个文件夹更加方便 刷新一下环境变量。 source /etc/profile分发环境变量文件到其他节点并 source。 sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
source /etc/profile
source /etc/profile分别启动kafka
bin/kafka-server-start.sh -daemon config/server.properties如果遇到cluser_id不符合的问题直接将日志文件删除重新启动即可。
集群启停脚本
脚本如下
#! /bin/bash
case $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.propertiesdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka-------ssh $i /opt/module/kafka/bin/kafka-server-stop.sh done
};;
esac添加执行权限
chmod x kf.sh启动集群命令
kf.sh start停止集群命令
kf.sh stopKafka命令行操作
Kafka基础架构 主题命令行操作 查看操作主题命令参数 ./bin/kafka-topics.sh 查看当前服务器中的所有topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list创建 first topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic first选项说明 –topic 定义 topic 名–replication-factor 定义副本数–partitions 定义分区数 查看 first 主题的详情 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe修改分区数注意分区数只能增加不能减少 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3查看结果 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
Topic: first TopicId: _Pjhmn1NTr6ufGufcnsw5A PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes1073741824Topic: first Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: first Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: first Partition: 2 Leader: 0 Replicas: 0 Isr: 0删除 topic ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first 生产者命令行操作 查看操作者命令参数 ./bin/kafka-console-producer.sh 发送消息 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
hello world
yooome yooome消费者命令行操作 查看操作消费者命令参数 ./bin/kafka-console-consumer.sh消费消息 消费first 主题中的数据 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first把主题中所有的数据都读取出来包括历史数据。 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic firstkafka可视化工具
官网https://www.kafkatool.com/download.html Kafka重要概念
broker 一个Kafka的集群通常由多个broker组成这样才能实现负载均衡、以及容错broker是无状态Sateless的它们是通过ZooKeeper来维护集群状态一个Kafka的broker每秒可以处理数十万次读写每个broker都可以处理TB消息而不影响性能
zookeeper ZK用来管理和协调broker并且存储了Kafka的元数据例如有多少topic、partition、consumer ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。 Kafka正在逐步想办法将ZooKeeper剥离维护两套集群成本较高社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
producer生产者
生产者负责将数据推送给broker的topic
consumer消费者
消费者负责从broker的topic中拉取数据并自己进行处理
consumer group消费者组 consumer group是kafka提供的可扩展且具有容错性的消费者机制一个消费者组可以包含多个消费者一个消费者组有一个唯一的IDgroup Id组内的消费者一起消费主题的所有分区数据
分区Partitions 在Kafka集群中主题被分为多个分区
副本Replicas 副本可以确保某个服务器出现故障时确保数据依然可用在Kafka中一般都会设计副本的个数1
主题Topic 主题是一个逻辑概念用于生产者发布数据消费者拉取数据Kafka中的主题必须要有标识符而且是唯一的Kafka中可以有任意数量的主题没有数量上的限制在主题中的消息是有结构的一般一个主题包含某一类消息一旦生产者发送消息到主题中这些消息就不能被更新更改
偏移量offset offset记录着下一条将要发送给Consumer的消息的序号默认Kafka将offset存储在ZooKeeper中在一个分区中消息是有顺序的方式存储着每个在分区的消费都是有一个递增的id。这个就是偏移量offset偏移量在分区中才是有意义的。在分区之间offset是没有任何意义的
消费者组 Kafka支持有多个消费者同时消费一个主题中的数据。 同时运行两个消费者我们发现只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息必须要给test主题添加一个分区。 设置 test topic为2个分区bin/kafka-topics.sh --zookeeper 192.168.88.100:2181 -alter --partitions 2 --topic test
Kafka生产者
生产者消息发送流程
发送原理
在消息发送的过程中涉及到了两个线程 — main 线程和Sender线程。在main线程中创建了一个双端队列 RecordAccumulator。main线程将消息发送给ResordAccumlatorSender线程不断从 RecordAccumulator 中拉去消息发送到 Kafka Broker。
生产者重要参数列表 异步发送API
普通异步发送
需求创建Kafka生产者采用异步的方式发送到Kafka Broker。 2、代码编程go get github.com/Shopify/sarama
func main() {config : sarama.NewConfig()config.Producer.RequiredAcks sarama.WaitForAll // 发送完数据需要leader和follow都确认config.Producer.Partitioner sarama.NewRandomPartitioner // 新选出一个partitionconfig.Producer.Return.Successes true // 成功交付的消息将在success channel返回// 构造一个消息msg : sarama.ProducerMessage{}msg.Topic firstmsg.Value sarama.StringEncoder(this is a test log)// 连接kafkaclient, err : sarama.NewSyncProducer([]string{192.168.71.128:9092, 192.168.71.129:9092, 192.168.71.130:9092,}, config)if err ! nil {fmt.Println(producer closed, err:, err)return} else {fmt.Println(client)}defer client.Close()// 发送消息pid, offset, err : client.SendMessage(msg)if err ! nil {fmt.Println(send msg failed, err:, err)return}fmt.Printf(pid:%v offset:%v\n, pid, offset)
}带回调函数的异步发送 【注意:】消息发送失败会自动重试不需要我们在回调函数中手动重试。
同步发送API 生产者分区
分区和副本机制
生产者写入消息到topicKafka将依据不同的策略将数据分配到不同的分区中
轮询分区策略随机分区策略按key分区分配策略自定义分区策略
分区好处 便于合理使用存储资源每个Partition在一个Broker上存储可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务可以实现负载均衡的效果。 提高并行度生产者可以以分区为单位发送数据消费者可以以分区为单位进行 消费数据。 轮询策略 默认的策略也是使用最多的策略可以最大限度保证所有消息平均分配到一个分区如果在生产消息时key为null则使用轮询算法均衡地分配分区
随机策略不用
随机策略每次都随机地将消息分配到每个分区。在较早的版本默认的分区策略就是随机策略也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳所以基本上很少会使用随机策略。 按key分配策略 按key分配策略有可能会出现「数据倾斜」例如某个key包含了大量的数据因为key值一样所有所有的数据将都分配到一个分区中造成该分区的消息数量远大于其他的分区。
乱序问题
轮询策略、随机策略都会导致一个问题生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序但这又可能会导致数据倾斜所以在实际生产环境中要结合实际情况来做取舍。
副本机制
副本的目的就是冗余备份当某个Broker上的分区数据丢失时依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
producer的ACKs参数
对副本关系较大的就是producer配置的acks参数了,acks参数表示当生产者生产消息的时候写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
acks配置为0 acks配置为1 当生产者的ACK配置为1时生产者会等待leader副本确认接收后才会发送下一条数据性能中等。
acks配置为-1或者all Kafka生产者幂等性与事务
幂等性
拿http举例来说一次或多次请求得到地响应是一致的网络超时等问题除外换句话说就是执行多次操作与执行一次操作的影响是一样的。 如果某个系统是不具备幂等性的如果用户重复提交了某个表格就可能会造成不良影响。例如用户在浏览器上点击了多次提交订单按钮会在后台生成多个一模一样的订单。
Kafka生产者幂等性 在生产者生产消息时如果出现retry时有可能会一条消息被发送了多次如果Kafka不具备幂等性的就有可能会在partition中保存多条一模一样的消息。
幂等性原理
为了实现生产者的幂等性Kafka引入了 Producer IDPID和 Sequence Number的概念。
PID每个Producer在初始化时都会分配一个唯一的PID这个PID对用户来说是透明的。Sequence Number针对每个生产者对应PID发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number。幂等性只能保证的是在单分区单会话内不重复 Kafka事务 Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中要么都成功要么都失败。尤其是在生产者、消费者并存时事务的保障尤其重要。consumer-transform-producer模式 开启事务必须开启幂等性 事务操作API
Producer接口中定义了以下5个事务相关方法
initTransactions初始化事务要使用Kafka事务必须先进行初始化操作beginTransaction开始事务启动一个Kafka事务sendOffsetsToTransaction提交偏移量批量地将分区对应的offset发送到事务中方便后续一块提交commitTransaction提交事务提交事务abortTransaction放弃事务取消事务
数据有序和数据乱序 Kafka Broker
Zookeeper存储的Kafka信息
[zk: localhost:2181(CONNECTING) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]Kafka Broker总体工作流程 Broker重要参数 Kafka副本
副本基本信息
Kafka副本作用提高数据可靠性。Kafka默认副本1个生产环境一般配置为2个保证数据可靠性太多副本会增加磁盘存储空间增加网络上数据传输降低效率。Kafka中副本为Leader和Follower。Kafka生产者只会把数据发往 Leader然后Follower 找 Leader 进行同步数据。Kafka 分区中的所有副本统称为 ARAssigned Repllicas。
AR ISR OSR
ISR表示 Leader 保持同步的 Follower 集合。如果 Follower 长时间未 向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定默认 30s 。Leader 发生故障之后就会从 ISR 中选举新的 Leader。
OSR表示 Follower 与 Leader 副本同步时延迟过多的副本。
Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader 负责管理集群 broker 的上下线所有 topic 的分区副本分配 和 Leader 选举等工作。 创建一个新的 topic4 个分区4 个副本
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
Created topic atguigu1.查看 Leader 分布情况
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes1073741824
Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3停止掉 hadoop105 的 kafka 进程并查看 Leader 分区情况
[atguiguhadoop105 kafka]$ bin/kafka-server-stop.sh
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0停止掉 hadoop104 的 kafka 进程并查看 Leader 分区情况
[atguiguhadoop104 kafka]$ bin/kafka-server-stop.sh
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0启动 hadoop105 的 kafka 进程并查看 Leader 分区情况
[atguiguhadoop105 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3启动 hadoop104 的 kafka 进程并查看 Leader 分区情况
[atguiguhadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2停止掉 hadoop103 的 kafka 进程并查看 Leader 分区情况
[atguiguhadoop103 kafka]$ bin/kafka-server-stop.sh
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe
--topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes1073741824
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2Leader 和 Follower 故障处理细节
LEOLog End Offset: 每个副本的最后一个offsetLEO其实就是最新的 offset 1。
HWHigh Watermark所有副本中最小的LEO。 LEOLog End Offset每个副本的最后一个offsetLEO其实就是最新的offset 1
HWHigh Watermark所有副本中最小的LEO 活动调整分区副本存储
在生产环境中每台服务器的配置和性能不一致但是kafka只会根据自己的代码规则创建对应的分区副本就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求创建一个新的 topic 4个分区两个副本名称为three 。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。 手动调整分区副本存储的步骤如下
创建一个新的 topic名称为 three。
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --create --partitions 4 --replication-factor 2 --
topic three查看分区副本存储情况
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic three创建副本存储计划所有副本都指定存储在 broker0、broker1 中。
[atguiguhadoop102 kafka]$ vim increase-replication-factor.json输入如下内容
{version:1,partitions:[{topic:three,partition:0,replicas:[0,1]},{topic:three,partition:1,replicas:[0,1]},{topic:three,partition:2,replicas:[1,0]},{topic:three,partition:3,replicas:[1,0]}]
}执行副本存储计划。
[atguiguhadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file
increase-replication-factor.json --execute验证副本存储计划。
[atguiguhadoop102 kafka]$ bin/kafka-reassign-partitions.sh --
bootstrap-server hadoop102:9092 --reassignment-json-file
increase-replication-factor.json --verify查看分区副本存储情况。
[atguiguhadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --describe --topic threeLeader Partition 负载平衡
正常情况下Kafka本身会自动把Leader Partition均匀分散在各个机器上来保证每台机器的读写吞吐量都是均匀的。但是如果某 些broker宕机会导致Leader Partition过于集中在其他少部分几台broker上这会导致少数几台broker的读写请求压力过高其他宕机的broker重启之后都是follower partition读写请求很低造成集群负载不均衡。 参数名称描述auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡。生产环境中leader 重选举的代价比较大可能会带来性能影响建议设置为 false 关闭。leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
文件存储
Topic 数据的存储机制 查看 hadoop102或者 hadoop103、hadoop104的/opt/module/kafka/datas/first-1 first-0、first-2路径上的文件
[atguiguhadoop104 first-1]$ ls
00000000000000000092.index
00000000000000000092.log
00000000000000000092.snapshot
00000000000000000092.timeindex
leader-epoch-checkpoint
partition.metadata直接查看 log 日志发现是乱码。
通过工具查看 index 和 log 信息。
[atguiguhadoop104 first-1]$ kafka-run-class.sh kafka.tools.DumpLogSegments
--files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 3 position: 152日志存储参数配置
参数描述log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log 日志划分成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index 文件里面记录一个索引。 稀疏索引。
文件清理策略
Kafka 中默认的日志保存时间为 7 天可以通过调整如下参数修改保存时间。
Log.retention.hours最低优先级小时默认7天。log.retention.minutes分钟。log.retention.ms最高优先级毫秒。log.retention.check.interval.ms负责设置检查周期默认 5 分钟。
那么日志一旦超过了设置的时间怎么处理呢
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
delete 日志阐述将过期数据删除
log.cleanup.policy delete 所有数据启用阐述策略
(1) 基于时间默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
(2) 基于大小默认关闭。超过设置的所有日志总大小阐述最早的 segment 。
log.retention.bytes默认等于-1表示无穷大。 compact 日志压缩
compact日志压缩对于相同 key 的不同 value 值值保留最后一个版本。
log.cleanup.policy compact所有数据启动压缩策略 压缩后的offset可能是不连续的比如上图中没有6当从这些offset消费消息时将会拿到比这个 offset 大的 offset 对应的消息实际上会拿到 offset 为 7 的消息并从这个位置开始消费。
这种策略只适合特殊场景比如消息的 key 是用户 IDvalue 是用户的资料通过这种压缩策略整个消息集里就保存了所有用户最新的资料。
Kafka 消费者
Kafka 消费方式
pull拉模式consumer 采用从 broker 中主动拉去数据。Kafka 采用这种方式。push推模式Kafka没有采用这种方式因为由 broker 决定消息发送速率很难适应所有消费者的消费速率。例如推送的速度是 50m/sConsumer1Consumer2就来不及处理消息。
pull 模式不足之处是如果Kafka 没有数据消费者可能会陷入循环中一直返回空数据。 Kafka 消费者工作流程 消费者组原理
Consumer Group CG消费者组由多个consumer组成。形成一个消费者组的条件是所有消费者的 groupid 相同。
消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费。消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。 消费者重要参数
参数名称描述bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表。key.deserializer 和value.deserializer指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。group.id标记消费者所属的消费者组。enable.auto.commit默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka 提交的频率默认 5s。auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在如数据被删除了该如何处理 earliest自动重置偏移量到最早的偏移量。 latest默认自动重置偏移量为最新的偏移量。 none如果消费组原来的previous偏移量不存在则向消费者抛异常。 anything向消费者抛异常。offsets.topic.num.partitions__consumer_offsets 的分区数默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。该条目的值必须小于 session.timeout.ms 也不应该高于session.timeout.ms 的 1/3。session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。超过该值该消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到仍然会返回数据。fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值50m仍然可以拉取回来这批数据因此这不是一个绝对最大值。一批次的大小受 message.max.bytes broker configor max.message.bytes topic config影响。max.poll.records一次 poll 拉取数据返回消息的最大条数默认是 500 条。
offset 位移
offset 的默认维护位置 自动提交offset
为了使我们能够专注于自己的业务逻辑Kafka提供了自动提交offset的功能。
自动提交offset的相关参数 enable.auto.commit是否开启自动提交offset功能默认是true auto.commit.interval.ms自动提交offset的时间间隔默认是5s 参数名称描述enable.auto.commit默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka 提交的频率默认 5s。
手动提交offset
虽然自动提交offset十分简单比那里但由于其是基于时间提交的开发人员难以把握 offset 提交的时机。一次 Kafka 还提供了手动提交 offset 的API。
手动提交 offset 的方法有两种分别是 commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是都会将本次提交的一批数据最高的偏移量提交不同点是同步提交阻塞当前线程一直到提交成功并且会自动失败重试由不可控因素导致也会出现提交失败而异步提交则没有失败重试机制故有可能提交失败。
commitSync同步提交必须等待offset提交完毕再去消费下一批数据。commitAsync异步提交 发送完提交offset请求后就开始消费下一批数据了。 指定Offset消费
auto.offset.reset earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量消费者组第一次消费或服务器上不再存在当前偏移量
时例如该数据已被删除该怎么办
1earliest自动将偏移量重置为最早的偏移量–from-beginning。
2latest默认值自动将偏移量重置为最新偏移量。
3none如果未找到消费者组的先前偏移量则向消费者抛出异常。 Kafka-Kraft模式
Kafka-Kraft架构 左图为 Kafka 现有架构元数据在 zookeeper 中运行时动态选举 controller由controller 进行 Kafka 集群管理。右图为 kraft 模式架构实验性不再依赖 zookeeper 集群而是用三台 controller 节点代替 zookeeper元数据保存在 controller 中由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个 Kafka 不再依赖外部框架而是能够独立运行 controller 管理集群时不再需要从 zookeeper 中先读取数据集群性能上升 由于不依赖 zookeeper集群扩展时不再受到 zookeeper 读写能力限制 controller 不再动态选举而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置而不是像以前一样对随机 controller 节点的高负载束手无策。