做机械产品用什么网站,注册网站在哪里创建,山东旗舰建设集团网站,wordpress menu order一.消息队列
1、什么是消息队列 消息#xff08;Message#xff09;是指在应用间传送的数据。消息可以非常简单#xff0c;比如只包含文本字符串#xff0c;也可以更复杂#xff0c;可能包含嵌入对象。 消息队列#xff08;MessageQueue#xff09;是一种在软件系统中用…一.消息队列
1、什么是消息队列 消息Message是指在应用间传送的数据。消息可以非常简单比如只包含文本字符串也可以更复杂可能包含嵌入对象。 消息队列MessageQueue是一种在软件系统中用于传递消息和实现异步通信的技术。它通常被用来解耦不同组件或服务之间的通信从而提高系统的可靠性、扩展性和灵活性。消息发布者只管把消息发布到MQ中而不用管谁来取消息使用者只管从MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
2、特征
1存储消息队列作为缓冲层可以处理生产者和消费者的速度差异防止消息丢失或处理能力不足的问题。
2异步消息队列实现了生产者和消费者的异步通信生产者不需要等待消费者处理完消息就可以继续工作。
3解藕消息队列能够解耦生产者和消费者使它们不需要直接知道彼此的存在从而降低系统组件之间的耦合度。
3、为什么使用消息队列
1解耦 允许你独立的扩展或修改两边的处理过程只要确保它们遵守相同的接口约束。
2冗余 消息队列把数据进行持久化直到它们已经被完全处理通过这一方式规避了数据丢失风险。
3扩展性 因为消息队列解耦了你的处理过程所以增大消息入队的处理的频率是很容易的只要另外增加处理过程即可
4实现削峰填谷 在系统的流量剧增时消息队列能够缓冲请求平滑处理峰值流量避免直接将大量请求发送给后端服务导致服务不可用或性能下降。
5可恢复性 系统的一部分组件失效后不会影响整个系统消息队列降低了进程间的耦合度所以即时一个处理消息的进程挂掉加入队列中的消息仍然可以在系统恢复后被处理
6顺序保证 消息队列本来就是排序的并且能保证数据会按照特定的顺序来处理。kafka保证一个partition内的消息的有序性
7缓冲 有助于控制和优化数据流经过系统的速度解决生产消息和处理消息速度不一致的问题。
8异步通信 提供了异步处理机制允许用户把一个消息放入队列但不立即处理它在需要的时候处理。
二.kafka
1.概念
Kafka 是一个高性能、分布式的消息队列系统最初由LinkedIn开发并开源。它具有以下几个显著的特点和优势 高吞吐量Kafka 能够处理非常高的消息吞吐量。它的设计目标是每秒处理数百万条消息。 分布式Kafka 是一个分布式系统消息被分布存储在多个节点上支持水平扩展。这使得它能够处理大规模数据和流量。 持久性Kafka 将消息持久化到磁盘保证了消息在发送和消费过程中的持久性和可靠性。即使消费者离线一段时间也能确保不丢失消息。 多订阅者支持Kafka 的消息可以被多个消费者组消费每个消费者组可以独立消费消息支持发布-订阅和队列模式。 可扩展性Kafka 可以通过增加节点来水平扩展集群以支持更大的负载和更高的吞吐量。 低延迟Kafka 的设计追求低延迟适用于需要快速数据传输和处理的场景。 消息保留Kafka 允许消息在一定时间内保留在系统中消费者可以根据需要重放过去的消息。 社区活跃作为开源项目Kafka 拥有一个活跃的社区提供了丰富的文档、示例和支持。 2.kafka角色术语
1brokerkafka集群包含一个或多个服务器每个服务器被称为broker经纪人。
2Topic每条发布到kafka集群的消息都有一个分类这个分类被称为Topic主题。
3Producer消息的生产者负责发布消息到kafka broker。
4Consumer指消息的消费者从kafka broker拉取数据并消费这些已发布的消息
5partitionpartition是物理上的概念每个Topic包含一个或多个Partition每个Partition都是一个有序队列Partition中的每条消息都会被分配一个有序的idoffset
6Consumer Group消费者组默认属于group组。
7Message消息通信的基本单位每个producer可以向一个topic发布一些信息
三.zookeeper
1、概述 zookeeper是一种分布式协调技术所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制让它们有序的去访问某个共享资源防止造成资源竞争脑裂的后果。
2、zookeeper工作原理
1master启动 在分布式系统中引入Zookeeper以后就可以配置多个主节点这里以配置两个主节点为例假定它们是主节点A和主节点B当两个主节点都启动后它们都会向Zookeeper中注册节点信息。我们假设主节点A锁注册的节点信息是master001主节点B注册的节点信息是master002注册完以后会进行选举选举有多种算法这里以编号最小作为选举算法为例编号最小的节点将在选举中获胜并获得锁成为主节点也就是主节点A将会获得锁成为主节点然后主节点B将被阻塞成为一个备用节点。这样通过这种方式Zookeeper就完成了对两个Master进程的调度。完成了主、备节点的分配和协作
2master故障 如果主节点A发生了故障这时候它在Zookeeper所注册的节点信息会被自动删除而Zookeeper会自动感知节点的变化发现主节点A故障后会再次发出选举这时候主节点B将在选举中获胜替代主节点A成为新的主节点这样就完成了主、被节点的重新选举
3master恢复 如果主节点恢复了它会再次向zookeeper注册自身的节点信息只不过这时候它注册的节点信息将会变成master003而不是原来的信息。zookeeper会感知节点的变化再次发动选举这时候主节点B在选举过程中再次获胜继续担任主节点节点A担任备份节点。
3.zookeeper集群架构 Leader领导者角色主要负责投票的发起和决议以及更新系统状态。
follower跟随着角色用于接收客户端的请求并返回结果给客户端在选举过程中参与投票。
observer观察者角色用户接收客户端的请求并将写请求转发给leader同时同步leader状态但是不参与投票。bserver日的是扩展系统提高伸缩性。
client客户端角色用于向zookeeper发起请求 。
4.zookeeper工作流程 Zookeeper修改数据的流程Zookeeper集群中每个Server在内存中存储了一份数据在Zookeeper启动时将从实例中选举一个Server作为1eaderLeader负责处理数据更新等操作当且仅当大多数Server在内存中成功修改数据才认为数据修改成功。 Zookeeper写的流程为客户端Client首先和一个Server或者bserve通信发起写请求然后Server将写请求转发给LeaderLeader再将写请求转发给其它Server其它Server在接收到写请求后写入数据并响应LeaderLeader在接收到大多数写成功回应后认为数据写成功最后响应client完成一次写操作过程。
四、zookeeper在kafka中的作用
1.Broker注册 Broker是分布式部署并且相互之间相互独立但是需要有一个注册系统能够将整个集群中的Broker管理起来此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点 /brokers/ids 每个Broker在启动时都会到Zookeeper上进行注册即到/brokers/ids下创建属于自己的节点如/brokers/ids/[0……N] Kafka使用了全局唯一的数字来指代每个Broker服务器不同的Broker必须使用不同的BrokerID进行注册创建完节点后每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中Broker创建的节点类型是临时节点一旦Broker岩机则对应的临时节点也会被自动删除。
2.Topic注册 在Kafka中同一个Topic的消息会被分成多个分区并将其分布在多个Broker上这些分区信息及与Broker的对应关系也都是由Zookeeper在维护由专门的节点来记录。
3.生产者负载均衡 由于同一个Topic消息会被分区并将其分布在多个Broker上因此生产者需要将消息合理地发送到这些分布式的Broker上那么如何实现生产者的负载均衡Kafka支持传统的四层负载均衡也支持Zookeeper方式实现负载均衡
1四层负载均衡 根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常一个生产者只会对应单个Broker然后该生产者产生的消息都发往该Broker。这种方式逻辑简单每个生产者不需要同其他系统建立额外的TCP连接只需要和Broker维护单个TCP连接即可。但是其无法做到真正的负载均衡因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的如果有些生产者产生的消息远多于其他生产者的话那么会导致不同的Broker接收到的消息总数差异巨大同时生产者也无法实时感知到Broker的新增和删除
2使用Zookeeper进行负载均衡 由于每个Broker启动时都会完成Broker注册过程生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更这样就可以实现动态的负载均衡机制
4.消费者负载均衡 与生产者类似Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息每个消费者分组包含若干消费者每条消息都只会发送给分组中的个消费者不同的消费者分组消费白己特定的Topic下面的消息互不干扰
5.记录信息分区与消费者关系 消费组ConsumerGroup下有多个Consumer消费者。对于每个消费者组ConsumerGroupKafka都会为其分配一个全局唯一的GroupIDGroup内部的所有消费者共享该ID。订阅的topic下的每个分区只能分配给某个group下的一个consumer当然该分区还可以被分配给其他group。 同时Kafka为每个消费者分配一个ConsumerID通常采用Hostname:UUID形式表示 在Kafka中规定了每个消总分区只能被同组的一个消费者进行消费因此需要在 Zookeeper上记录消息分区与consumer之间的关系每个消费者一旦确定了对一个消息分区的消费权力需要将其consumer ID写入到Zookeeper对应消息分区的临时节点上例如 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 其中[broker_id-partition_id]就是一个消息分区的标识节点内容就是该消息分区上消费者的ConsumerID。
6、消息消费进度Offset记录 在消费者对指定消分区进行消息消费的过程中需要定时地将分区消息的消费进度Offset记录到zookeeper上以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后能够从之前的进度开始继续进行消息消费。ffset在Zookeeper中由一个专门节点进行记录其节点路径为/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 节点内容就是Offset的值
7.消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤如下:
1注册到消费者分组 每个消费者服务器启动时都会到zookeeper的指定节点下创建一个属于自己的消费者节点例如/consumers/[group_id]/ids/[consumer_id]完成节点创建后消费者就会将自己订阅的Topic信息写入该临时节点。
2对消费者分组中的消费者的变化注册监听 每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听一旦发现消费者新增或减少就触发消费者的负载均衡。
3对Broker服务器变化注册监听 消费者需要对/broker/ids/[-N]中的节点进行监听如果发现Broker服务器列表发生变化那么就根据具体情况来决定是否需要进行消费者负载均衡
4进行消费者负载均衡
为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消 息分区分配的过程通常对于一个消费者分组如果组内的消费者服务器发生变更或Broker服务器发生变更会发出消费者负载均衡 五.集群部署kafka
1修改主机hosts文件所有主机都配置
[rootkafka1 ~]# vim /etc/hosts
192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3
2zookeeper的部署
1安装zookeeper三个节点的配置相同
[rootkafka1 ~]# systemctl stop firewalld
[rootkafka1 ~]# setenforce 0
[rootkafka1 ~]# yum -y install java
[rootkafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[rootkafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper2创建数据保存目录三个节点的配置相同
[rootkafka1 ~]# cd /etc/zookeeper/
[rootkafka1 zookeeper]# mkdir zookeeper-data
3修改配置文件三个节点的配置相同
[rootkafka1 zookeeper]# cd /etc/zookeeper/conf
[rootkafka1 ~]# mv zoo_sample.cfg zoo.cfg
[rootkafka1 ~]# vim zoo.cfg
dataDir/etc/zookeeper/zookeeper-data
clientPort2181
server.1192.168.10.101:2888:3888
server.2192.168.10.102:2888:3888
server.3192.168.10.103:2888:3888
注释zookeeper只用的端口
2181对cline端提供服务
3888选举leader使用
2888集群内机器通讯使用Leader监听此端口
4创建节点id文件按server编号设置这个id三个机器不同
节点1
[rootkafka1 conf]# echo 1 /etc/zookeeper/zookeeper-data/myid
节点2
[rootkafka2 conf]# echo 2 /etc/zookeeper/zookeeper-data/myid
节点3
[rootkafka3 conf]# echo 3 /etc/zookeeper/zookeeper-data/myid
5三个节点启动zookeeper进程
[rootkafka1 conf]# cd /etc/zookeeper/
[rootkafka1 zookeeper]# ./bin/zkServer.sh start
[rootkafka1 zookeeper]# ./bin/zkServer.sh status 3kafka的部署
1kafka的安装三个节点的配置相同
[rootkafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz[rootkafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
2修改配置文件
[rootkafka1 ~]# cd /etc/kafka/[rootkafka2 kafka]# vim config/server.propertiesbroker.id1##21行 修改注意其他两个的id分别是2和3listenersPLAINTEXT://192.168.10.101:9092#31行 修改其他节点改成各自的IP地址log.dirs/etc/kafka/kafka-logs## 60行 修改num.partitions1##65行 分片数量不能超过节点数zookeeper.connect192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218##123行填写集群中各节点的地址和端口
注释
9092是kafka的监听端口
3创建日志目录三个节点的配置相同
[rootkafka1 kafka]# mkdir /etc/kafka/kafka-logs
4在所有kafka节点上执行开启命令生成kafka群集三个节点的配置相同
[rootkafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties
如果启动不了可以将/etc/kafka/kafka-logs中的数据清除再试试
4测试
创建topic任意一个节点
bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
列出topic任意一个节点
bin/kafka-topics.sh --list --zookeeper kafka1:2181bin/kafka-topics.sh --list --zookeeper kafka2:2181bin/kafka-topics.sh --list --zookeeper kafka3:2181
生产消息
bin/kafka-console-producer.sh --broker-list kafka1:9092 -topic test
消费消息
bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test
输入完成会可以去生产者方测试输入看消费者方有无信息
删除topic
bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test