常州企业网站建站模板,二维码怎么制作出来的,有哪些做室内设计好用的网站有哪些,wordpress表单主题前言
在大数据高并发场景下#xff0c;当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候#xff0c;就需要消息队列#xff0c;作为抽象层#xff0c;弥合双方的差异。一般选型是Kafka、RocketMQ#xff0c;这源于这些中间件的高吞吐、可扩展以及可靠…前言
在大数据高并发场景下当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候就需要消息队列作为抽象层弥合双方的差异。一般选型是Kafka、RocketMQ这源于这些中间件的高吞吐、可扩展以及可靠性。
另外企业中离线业务场景实时业务场景都需要使用到kafkaKafka具备数据的计算能力和存储能力但是两个能力相对MR/SPARKHDFS较弱Kafka角色的角色与hbase比较像层级关系比较多。
消息队列
是一种应用间的通信方式消息发送后可以立即返回由消息系统来确保信息的可靠专递消息发布者只管把消息发布到MQ中而不管谁来取消息使用者只管从MQ中取消息而不管谁发布的这样发布者和使用者都不用知道对方的存在。
消息队列的应用场景
消息队列在实际应用中包括如下四个场景
应用耦合多应用间通过消息队列对同一消息进行处理避免调用接口失败导致整个过程失败
异步处理多应用对消息队列中同一消息进行处理应用间并发处理消息相比串行处理减少处理时间
限流削峰广泛应用于秒杀或抢购活动中避免流量过大导致应用系统挂掉的情况
消息驱动的系统系统分为消息队列、消息生产者、消息消费者生产者负责产生消息消费者(可能有多个)负责对消息进行处理
消息队列的两种模式
1点对点模式
点对点模式下包括三个角色 消息发送者 (生产者)、 接收者消费者
消息发送者生产消息发送到queue中然后消息接收者从queue中取出并且消费消息。消息被消费以后queue中不再有存储所以消息接收者不可能消费到已经被消费的消息。 特点 每个消息只有一个接收者Consumer(即一旦被消费消息就不再在消息队列中) • 发送者和接收者间没有依赖性发送者发送消息之后不管有没有接收者在运行都不会影响到发送者下次发送消息 • 接收者在成功接收消息之后需向队列应答成功以便消息队列删除当前接收的消息 2发布/订阅模式
发布/订阅模式下包括三个角色 角色主题Topic、 发布者(Publisher)、订阅者(Subscriber)
发布者将消息发送到Topic系统将这些消息传递给多个订阅者。 特点 • 每个消息可以有多个订阅者 • 发布者和订阅者之间有时间上的依赖性。针对某个主题Topic的订阅者它必须创建一个订阅者之后才能消费发布者的消息。 • 为了消费消息订阅者需要提前订阅该角色主题并保持在线运行 介绍
**kafka是一个分布式分区的多副本的多订阅者的消息发布订阅系统分布式MQ系统。基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎web/nginx日志、搜索日志、监控日志、访问日志消息服务等等。**用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
kafka适合离线和在线消息消费。kafka构建在zookeeper同步服务之上。它与apache和spark非常好的集成应用于实时流式数据分析。
好处
1、可靠性分布式的分区复制和容错。
2、可扩展性kafka消息传递系统轻松缩放无需停机。
3、耐用性kafka使用分布式提交日志这意味着消息会尽可能快速的保存在磁盘上因此它是持久的。
4、性能kafka对于发布和定于消息都具有高吞吐量。即使存储了许多TB的消息他也爆出稳定的性能。 kafka非常快保证零停机和零数据丢失。
使用场景
**1日志收集**一个公司可以用Kafka收集各种服务的log通过kafka以统一接口服务的方式开放给各种consumer例如hadoop、Hbase、Solr等。
**2消息系统**解耦和生产者和消费者、缓存消息等。
**3用户活动跟踪**Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、搜索、点击等活动这些活动信息被各个服务器发布到kafka的topic中订阅者通过订阅这些topic来做实时的监控分析或者装载到hadoop、数据仓库中做离线分析和挖掘。
**4运营指标**Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告。
**5流式框架**从主题中读取数据对其进行处理并将处理后的数据写入新的主题供用户和应用程序使用kafka的强耐久性在流处理的上下文中也非常的有用。
基本概念
kafka是一个分布式的分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能但是确有着独特的设计。
从宏观层面上看Producer通过网络发送消息到Kafka集群然后Consumer来进行消费。服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
名称解释Broker消息中间件处理节点一个Kafka节点就是一个broker一个或者多个Broker可以组成一个Kafka集群TopicKafka根据topic对消息进行归类发布到Kafka集群的每条消息都需要指定一个topicProducer消息生产者向Broker发送消息的客户端Consumer消息消费者从Broker读取消息的客户端ConsumerGroup每个Consumer属于一个特定的Consumer Group一条消息可以被多个不同的Consumer Group消费但是一个Consumer Group中只能有一个Consumer能够消费该消息Partition物理上的概念一个topic可以分为多个partition每个partition内部消息是有序的
基本使用原生API
1、创建主题
【1】创建一个名字为“test”的Topic这个topic只有一个partition并且备份因子也设置为1
bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 1 --topic test【2】通过以下命令来查看kafka中目前存在的topic
bin/kafka-topics.sh --list --zookeeper 192.168.65.60:2181【3】除了通过手工的方式创建Topic当producer发布一个消息到某个指定的Topic如果Topic不存在就自动创建。所以如果发送错了Topic那么就需要创建对应的消费者来消费掉发送错误的消息。
【4】删除主题
bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.65.60:21812、发送消息
kafka自带了一个producer命令客户端可以从本地文件中读取内容也可以以命令行中直接输入内容并将这些内容以消息的形式发送到kafka集群中。在默认情况下每一个行会被当做成一个独立的消息。
示例运行发布消息的脚本然后在命令中输入要发送的消息的内容
//指定往哪个broker也就是服务器上发消息
bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092 --topic test
this is a msg
this is a another msg 3、消费消息
【1】对于consumerkafka同样也携带了一个命令行客户端会将获取到内容在命令中进行输出默认是消费最新的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --topic test 【2】想要消费之前的消息可以通过–from-beginning参数指定如下命令
//这里便凸显了与传统消息中间件的不同消费完消息依旧保留默认保留在磁盘一周
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --from-beginning --topic test【3】通过不同的终端窗口来运行以上的命令你将会看到在producer终端输入的内容很快就会在consumer的终端窗口上显示出来。
【4】所有的命令都有一些附加的选项当我们不携带任何参数运行命令的时候将会显示出这个命令的详细用法
执行bin/kafka-console-consumer.sh 命令显示所有的可选参数4、消费消息类型分析
1单播消费
单播消费是一条消息只能被一个消费组内的某一个消费者消费。
2多播消费
多播消费是一条消息可以被不同组内的某一个消费者消费。
设计原理分析
Kafka核心总控制器Controller
在Kafka集群中会有一个或者多个broker其中有一个broker会被选举为控制器Kafka Controller它负责管理整个集群中所有分区和副本的状态。
1当某个分区的leader副本出现故障时由控制器负责为该分区选举新的leader副本。
2当检测到某个分区的ISR集合发生变化时由控制器负责通知所有broker更新其元数据信息。
3当使用kafka-topics.sh脚本为某个topic增加分区数量时同样还是由控制器负责让新分区被其他节点感知到。
Controller选举机制
【1】在kafka集群启动的时候会自动选举一台broker作为controller来管理整个集群选举的过程是集群中每个broker都会尝试在zookeeper上创建一个 /controller 临时节点zookeeper会保证有且仅有一个broker能创建成功这个broker就会成为集群的总控器controller。
【2】当这个controller角色的broker宕机了此时zookeeper临时节点会消失集群里其他broker会一直监听这个临时节点发现临时节点消失了就竞争再次创建临时节点。
【3】具备控制器身份的broker需要比其他普通的broker多一份职责具体细节如下 **1监听broker相关的变化。**为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener用来处理broker增减的变化。 **2监听topic相关的变化。**为Zookeeper中的/brokers/topics节点添加TopicChangeListener用来处理topic增减的变化为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener用来处理删除topic的动作。 **3从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。**对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener用来监听topic中的分区分配变化。 4更新集群的元数据信息同步到其他普通的broker节点中。 Partition副本选举Leader机制
controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活)controller会从ISR列表(参数unclean.leader.election.enablefalse的前提下)里挑第一个broker作为leader(第一个broker最先放进ISR列表可能是同步数据最多的副本)【这种会阻塞直到ISR列表有数据】
如果参数unclean.leader.election.enable为true代表在ISR列表里所有副本都挂了的时候可以在ISR列表以外的副本中选leader这种设置可以提高可用性但是选出的新leader有可能数据少很多。【其实就是知道/broker/ids/下面的数据没了】
副本进入ISR列表有两个条件
1副本节点不能产生分区必须能与zookeeper保持会话以及跟leader副本网络连通
2副本能复制leader上的所有写操作并且不能落后太多。(与leader副本同步滞后的副本是由 replica.lag.time.max.ms 配置决定的超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表)
消费者消费消息的offset记录机制
每个consumer会定期将自己消费分区的offset提交给kafka内部topic__consumer_offsets提交过去的时候key是consumerGroupIdtopic分区号value就是当前offset的值kafka会定期清理topic里的消息最后就保留最新的那条数据。【相当于记录了这个消费组在这个topic的某分区上消费到了哪】
由于consumer_offsets可能会接收高并发的请求kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置)这样可以通过加机器的方式抗大并发。
选出consumer消费的offset要提交到consumer_offsets的哪个分区公式hash(consumerGroupId) % consumer_offsets主题的分区数
消费者Rebalance机制再平衡机制
**rebalance就是指如果消费组里的消费者数量有变化或消费的分区数有变化kafka会重新分配消费者消费分区的关系。**比如consumer group中某个消费者挂了此时会自动把分配给他的分区交给其他的消费者如果他又重启了那么又会把一些分区重新交还给他。 注意 1rebalance只针对subscribe这种不指定分区消费的情况如果通过assign这种消费方式指定了分区kafka不会进行rebanlance。 **2rebalance过程中消费者无法从kafka消费消息。**这对kafka的TPS会有影响如果kafka集群内节点较多比如数百个那重平衡可能会耗时极多所以应尽量避免在系统高峰期的重平衡发生。 如下情况可能会触发消费者rebalance 1.消费组里的consumer增加或减少了 2.动态给topic增加了分区 3.消费组订阅了更多的topic 消费者Rebalance分区分配策略
rebalance的策略range、round-robin、sticky。
Kafka 提供了消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况为range分配策略。 假设一个主题有10个分区(0-9)现在有三个consumer消费 1range策略就是按照分区序号排序比如分区03给一个consumer分区46给一个consumer分区7~9给一个consumer。 假设 n分区数消费者数量 3 m分区数%消费者数量 1那么前 m 个消费者每个分配 n1 个分区后面的消费者数量m 个消费者每个分配 n 个分区。 2round-robin策略就是轮询分配比如分区0、3、6、9给一个consumer分区1、4、7给一个consumer分区2、5、8给一个consumer。 3sticky策略初始时分配策略与round-robin类似但是在rebalance的时候需要保证如下两个原则。1分区的分配要尽可能均匀 。2分区的分配尽可能与上次分配的保持相同。 Rebalance过程
当有消费者加入消费组时消费者、消费组及组协调器之间会经历以下几个阶段。
1选择组协调器
consumer group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的组协调器GroupCoordinator并跟其建立网络连接。
组协调器GroupCoordinator每个consumer group都会选择一个broker作为自己的组协调器coordinator负责监控这个消费组里的所有消费者的心跳以及判断是否宕机然后开启消费者rebalance。
2加入消费组JOIN GROUP
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求并处理响应。
GroupCoordinator 从一个consumer group中选择第一个加入group的consumer作为leader(消费组协调器)把consumer group情况发送给这个leader接着这个leader会负责制定分区方案。
3 SYNC GROUP)
consumer leader通过给GroupCoordinator发送SyncGroupRequest接着GroupCoordinator就把分区方案下发给各个consumer【心跳的时候】他们会根据指定分区的leader broker进行网络连接以及消息消费。
producer发布消息机制剖析
producer 采用 push 模式将消息发布到 broker每条消息都被 append 到 patition 中属于顺序写磁盘顺序写磁盘效率比随机写内存要高保障 kafka 吞吐率。存储分区会根据分区算法选择将其存储到哪一个 partition。
路由机制为
1指定了 patition则直接使用
2未指定 patition 但指定 key通过对 key 的 value 进行hash 选出一个 patition
3patition 和 key 都未指定使用轮询选出一个 patition。
写入流程
1producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
2producer 将消息发送给该 leader
3leader 将消息写入本地 log
4followers 从 leader pull 消息写入本地 log 后 向leader 发送 ACK
5leader 收到所有 ISR 中的 replica 的 ACK 后增加 HWhigh watermark最后 commit 的 offset 并向 producer 发送 ACK
集群消费
partitions分布在kafka集群中不同的broker上kafka集群支持配置partition备份的数量。针对每个partition都有一个broker起到“leader”的作用其他的broker作为“follwers”的作用。
leader来负责处理所有关于这个partition的读写请求而followers被动复制leader的结果不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了其中的一个follower通过选举成为新的leader。
Producers
生产者将消息发送到topic中去同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。
Consumers
传统的消息传递模式有2种队列( queue) 和publish-subscribe。Kafka基于这2种模式提供了一种consumer的抽象概念consumer group。
通常一个topic会有几个consumer group每个consumer group都是一个逻辑上的订阅者 logical subscriber 。每个consumer group由多个consumer instance组成从而达到可扩展和容灾的功能。
其他
消息回溯消费的机制是怎么实现的
因为kafka的消息存储在log文件里面而且对应的还会有index与timeindex可以加快对于消息的检索根据设置给予的offset可以快速定位到是哪个log文件因为文件名就是offset偏移值。快速拿出数据就可以进行消费了。此外根据时间回溯也是一样不过量会更大一点。
如果新的消费组订阅已存在的topic那么是重新开始消费么
**默认是将当前topoc的最后offset传给消费组作为其已消费的记录。**所以若是需要从头消费则要设置为props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”)。这个消费组如果是已经存在的那么这个参数其实不会变动已有的offset。默认处理大数据量的应该采用latest业务场景则用earliest。
日志分段存储
**Kafka 一个分区的消息数据对应存储在一个文件夹下以topic名称分区号命名消息在分区内是分段(segment)存储每个段的消息都存储在不一样的log文件里。**这种特性方便old segment file快速被删除kafka规定了一个段位的 log 文件最大为 1G做这个限制目的是为了方便把 log 文件加载到内存去操作。
Kafka Broker 有一个参数log.segment.bytes限定了每个日志段文件的大小最大就是 1GB。
一个日志段文件满了就自动开一个新的日志段文件来写入避免单个文件过大影响文件的读写性能这个过程叫做 log rolling正在被写入的那个日志段文件叫做 active log segment。
总结
后续再次补充…