当前位置: 首页 > news >正文

网站开发数据库分析模板专业建设目标

网站开发数据库分析模板,专业建设目标,网站建设公司浩森宇特,浙江纸巾包装设计公司版本说明 本文所有的讨论均在如下版本进行#xff0c;其他版本可能会有所不同。 Kafka: 3.6.0Pulsar: 2.9.0RabbitMQ 3.7.8RocketMQ 5.0Go1.21github.com/segmentio/kafka-go v0.4.45 结论先行 Kafka 只能保证单一分区内的顺序消息#xff0c;无法保证多分区间的顺序消息…版本说明 本文所有的讨论均在如下版本进行其他版本可能会有所不同。 Kafka: 3.6.0Pulsar: 2.9.0RabbitMQ 3.7.8RocketMQ 5.0Go1.21github.com/segmentio/kafka-go v0.4.45 结论先行 Kafka 只能保证单一分区内的顺序消息无法保证多分区间的顺序消息。具体来说要在 Kafka 完全实现顺序消息至少需要保证以下几个条件 同一生产者生产消息同步发送消息到 Kafka broker所有消息发布到同一个分区同一消费者同步按照顺序消费消息。 而要满足第 3 点常用的有 2 种思路 固定消息的 key生产端采用 key hash 的方式写入 broker自定义分区策略要保证顺序的消息都写入到指定的分区。 消息队列中的顺序消息如何实现 顺序消息定义 生产端发送出来的消息的顺序和消费端接收到消息的顺序是一样的。 消息存储结构 一般来说消息队列都是基于顺序存储结构来存储数据的不需要 B 树、B 树等复杂数据结构利用文件的顺序读写性能也很高。所以理想情况下生产者按顺序发送消息broker 会按顺序存储消息消费者再按顺序消费消息那么天然就实现了我们要的顺序消息了如下 基本条件 但是一般情况下消息队列为了支持更高的并发和吞吐大多数都有分区partition和消费者组consumer group机制而为了高可用一般也会有副本replica机制所以情况就复杂得多了如下面几个例子就会导致消息失序 多个生产者同时发送消息那么到达 broker 的时间也是不确定的所以 broker 就无法保证落盘的顺序性了单个生产者但是采用异步发送因为异步线程是并发执行的由 CPU 进行调度且有可能会因为发送失败而重试所以也无法保证消息可以按照顺序到达 broker同理消费者异步处理消息也无法保证顺序性一个 topic 有多个分区那么即使是同一个生产者由于分区策略消息可能会被分发到多个分区中消费者也就无法保证顺序性了。 所以到这里我们可以总结出实现顺序消息至少需要满足以下 3 点 单一生产者同步发送单一分区单一消费者同步消费 第 1、3 点比较简单Kafka 通过分区和 offset 的方式保证了消息的顺序。每个分区都是一个有序的、不可变的消息序列每个消息在分区中都有一个唯一的序数标识称为 offset。生产者在发送消息到分区时Kafka 会自动为消息分配一个 offset。消费者在读取消息时会按照 offset 的顺序来读取从而保证了消息的顺序。 下面我们主要来谈一谈第 2 点。 Kafka 顺序消息的实现 写入消息的过程 配置生产者首先你需要配置 Kafka 生产者。这包括指定 Kafka 集群的地址和端口以及其他相关配置项如消息序列化器、分区策略等。创建生产者实例在应用程序中你需要创建一个 Kafka 生产者的实例。这个实例将用于与 Kafka 集群进行通信。序列化消息在将消息发送到 Kafka 集群之前你需要将消息进行序列化。Kafka 使用字节数组来表示消息的内容因此你需要将消息对象序列化为字节数组。这通常涉及将消息对象转换为 JSON、Avro、Protobuf 等格式。选择分区Kafka 的主题topic被分为多个分区partition每个分区都是有序且持久化的消息日志。当你发送消息时你可以选择将消息发送到特定的分区或者让 Kafka 根据分区策略自动选择分区。发送消息一旦消息被序列化并选择了目标分区你可以使用 Kafka 生产者的 send() 方法将消息发送到 Kafka 集群。发送消息时生产者会将消息发送到对应分区的 leader 副本。异步发送Kafka 生产者通常使用异步方式发送消息这样可以提高吞吐量。生产者将消息添加到一个发送缓冲区send buffer中并在后台线程中批量发送消息到 Kafka 集群。消息持久化一旦消息被发送到 Kafka 集群的 leader 副本它将被持久化并复制到其他副本以确保数据的高可靠性和冗余性。只有当消息被成功写入到指定数量的副本后生产者才会收到确认acknowledgement。错误处理和重试如果发送消息时发生错误生产者可以根据配置进行错误处理和重试。你可以设置重试次数、重试间隔等参数来控制重试行为。 实现单一分区 再 Kafka 中我们要实现将消息写入到同一个分区有 3 种思路 配置 num.partitions1 或者创建 topic 的时候指定只有 1 个分区但这会显著降低 Kafka 的吞吐量。固定消息的 key然后采用 key hash 的分区策略这样就可以让所有消息都被分到同一个分区中。实现并指定自定义分区策略可以根据业务需求将需要顺序消费的消息都分到固定一个分区中。 // 如下例子所有使用same-key作为key的消息都会被发送到同一个Partition ProducerRecordString, String record new ProducerRecordString, String(topic, same-key, message); producer.send(record);重平衡带来的问题 如果采用上述的第 2 种思路固定消息 key依靠 key hash 分区策略实现单一分区。在我们只有 1 个消费者的情况下是没有问题的但是如果我们使用的是消费者组那么在发生重平衡操作的时候就可能会有问题了。 Kafka 的重平衡Rebalance是指 Kafka 消费者组Consumer Group中的消费者实例对分区的重新分配。这个过程主要发生在以下几种情况 消费者组中新的消费者加入。消费者组中的消费者离开或者挂掉。订阅的 Topic 的分区数发生变化。消费者调用了 #unsubscribe() 或者 #subscribe() 方法。 重平衡的过程主要包括以下几个步骤 Revoke首先Kafka 会撤销消费者组中所有消费者当前持有的分区。Assignment然后Kafka 会重新计算分区的分配情况然后将分区分配给消费者。Resume最后消费者会开始消费新分配到的分区。 重平衡的目的是为了保证消费者组中的消费者能够公平地消费 Topic 的分区。通过重平衡Kafka 可以在消费者的数量发生变化时动态地调整消费者对分区的分配从而实现负载均衡。 然而当发生重平衡时分区可能会被重新分配给不同的消费者这可能会影响消息的消费顺序。 举个例子 假设消费者 A 正在消费分区 P 的消息它已经消费了消息 1消息 2正在处理消息 3。此时发生了重平衡分区 P 被重新分配给了消费者 B。消费者 B 开始消费分区 P它会从上一次提交的偏移量offset开始消费。假设消费者 A 在处理消息 3 时发生了故障没有提交偏移量那么消费者 B 会从消息 3 开始消费。这样消息 3 可能会被消费两次而且如果消费者 B 处理消息 3 的速度快于消费者A那么消息 3 可能会在消息 2 之后被处理这就打破了消息的顺序性。 再举个例子 topic-A 本来只有 3 个分区按照 key hashkey 为 same-key 的消息应该都发到 第 2 个分区但是后来 topic-A 变成了 4 个分区按照 key hashkey 为 same-key 的消息可能就被发到第 3 个分区了这就无法做到单一分区可能会导致消息失序。 当然这个例子不是由重平衡直接引起的但是这种情况也是有可能导致消息失序的。 缓解重平衡的问题 避免动态改变分区数在需要严格保持消息顺序的场景下应避免动态地改变分区数。这意味着在设计 Kafka 主题时应提前规划好所需的分区数以避免日后需要进行更改。使用单个分区对于严格顺序要求的场景可以考虑使用单分区主题。虽然这会限制吞吐量和并发性但可以保证消息的全局顺序。使用其他策略保持顺序在某些情况下可以通过在应用层实现逻辑来保持顺序比如在消息中包含顺序号或时间戳并在消费时根据这些信息重建正确的顺序。使用静态成员功能它允许消费者在断开和重新连接时保持其消费者组内的身份这可以减少因短暂的网络问题或消费者重启导致的不必要的重平衡。 上面这些措施只能减少重平衡带来的问题并无法根除如果非要实现严格意义上的顺序消息要么在消息中加入时间戳等标记在业务层保证顺序消费要么就只能采用 单一生产者同步发送 单一分区 单一消费者同步消费 这种模式了。 静态成员功能 Kafka 2.3.0 版本引入了一项新功能静态成员Static Membership。这个功能主要是为了减少由于消费者重平衡rebalance引起的开销和延迟。在传统的 Kafka 消费者组中当新的消费者加入或离开消费者组时会触发重平衡。这个过程可能会导致消息的处理延迟并且在高吞吐量的场景下可能会对性能造成影响。静态成员功能旨在缓解这些问题。以下是它的一些关键点 静态成员的工作原理 静态成员标识消费者在加入消费者组时可以提供一个静态成员标识Static Member ID。这允许 Kafka Broker 识别特定的消费者实例而不是仅仅依赖于消费者组内的动态分配。 重平衡优化当使用静态成员功能时如果一个已知的消费者由于某种原因如网络问题短暂断开后重新连接Kafka 不会立即触发重平衡。相反Kafka 会等待一个预设的超时期限session.timeout.ms在此期间如果消费者重新连接它将保留原来的分区分配。 减少重平衡次数这大大减少了由于消费者崩溃和恢复、网络问题或维护操作引起的不必要的重平衡次数。 使用静态成员的优点 提高稳定性减少重平衡可以提高消费者组的整体稳定性尤其是在大型消费者组和高吞吐量的情况下。 减少延迟由于减少了重平衡的次数可以减少因重平衡导致的消息处理延迟。 持久的消费者分区分配这使得消费者在分区分配上更加持久有助于更好地管理和优化消息的消费。 如何使用 要使用静态成员功能需要在 Kafka 消费者的配置中设置 group.instance.id。这个 ID 应该是唯一的并且在消费者重启或重新连接时保持不变。同时还需要配置 session.timeout.ms以决定在触发重平衡之前消费者可以离线多长时间。 注意事项 虽然静态成员功能可以减少重平衡的发生但它不会完全消除重平衡。在消费者组成员的长期变化如新消费者的加入或永久离开时仍然会发生重平衡。需要合理设置 session.timeout.ms以避免消费者由于短暂的网络问题或其他原因的断开而过早触发重平衡。 静态成员功能在处理大规模 Kafka 应用时尤其有用它提供了一种机制来优化消费者组的性能和稳定性。 幂等性 Kafka 0.11 版本后提供了幂等性生产者这意味着即使生产者因为某些错误重试发送相同的消息这些消息也只会被记录一次。这是通过给每一批发送到 Kafka 的消息分配一个序列号实现的broker 使用这个序列号来删除重复发送的消息。使用幂等性生产者可以减少重复消息的风险这意味着即使在网络重试等情况下消息的顺序也能得到更好的保证。因为重复消息不会被多次记录所以不会破坏已有消息的顺序。 其他常见消息队列顺序消息的实现 Pulsar Pulsar 和 Kafka 一样都是通过生产端按 Key Hash 的方案将数据写入到同一个分区。 RabbitMQ RabbitMQ 在生产时没有生产分区分配的过程。它是通过 Exchange 和 Route Key 机制来实现顺序消息的。Exchange 会根据设置好的 Route Key 将数据路由到不同的 Queue 中存储。此时 Route Key 的作用和 Kafka 的消息的 Key 是一样的。 RocketMQ RocektMQ 支持消息组MessageGroup的概念。在生产端指定消息组则同一个消息组的消息就会被发送到同一个分区中。此时这个消息组起到的作用和 Kakfa 的消息的 Key 是一样的。 实战 Kafka 实现顺序消息 代码仓库https://github.com/hedon954/kafka-go-examples/tree/master/orderedmsg 下面我们来写一写实战用例更加直观地感受一下 Kafka 顺序消息的实现细节。 首先我们在集群上创建一个 topic ordered-msg-topic分区为 3 个运行以下命令 /opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic ordered-msg-topic --partitions 3 --replication-factor 1搭建 Kafka 集群可以看这两篇Kafka集群搭建(Zookeeper)、Kafka集群搭建(KRaft)。 单生产者单消费者 正常情况下使用单一生产者同步发送和单一消费者同步发送只要我们保证 key 是固定的则所有消息都会写到同一个分区是可以实现顺序消息的。 代码目录如下 ├─config │ config.go # 常量定义 ├─consumer │ consumer.go # 消费者 └─producerproducer.go # 生产者首先我们先定义一些常量 import github.com/segmentio/kafka-govar (Topic ordered-msg-topicBrokers []string{kafka1.com:9092, kafka2.com:9092, kafka3.com:9092}Addr kafka.TCP(Brokers...)GroupId ordered-msg-groupMessageKey []byte(message-key) )我们先实现生产者端主要是不断往 ordered-msg-topic 中写入数据 package mainimport (contextfmttimekafka-go-examples/orderedmsg/configgithub.com/segmentio/kafka-go )func NewProducer() *kafka.Writer {return kafka.Writer{Addr: config.Addr,Topic: config.Topic,Balancer: kafka.Hash{}, // 哈希分区} }func NewMessages(count int) []kafka.Message {res : make([]kafka.Message, count)for i : 0; i count; i {res[i] kafka.Message{Key: config.MessageKey,Value: []byte(fmt.Sprintf(msg-%d, i1)),}}return res }func main() {producer : NewProducer()messages : NewMessages(100)if err : producer.WriteMessages(context.Background(), messages...); err ! nil {panic(err)}_ producer.Close() }我们再来实现消费者目前我们就启动 1 个消费者 package mainimport (contextfmttimekafka-go-examples/orderedmsg/configgithub.com/segmentio/kafka-go )type Consumer struct {Id string*kafka.Reader }// NewConsumer 创建一个消费者它属于 config.GroupId 这个消费者组 func NewConsumer(id string) *Consumer {c : Consumer{Id: id,Reader: kafka.NewReader(kafka.ReaderConfig{Brokers: config.Brokers,GroupID: config.GroupId,Topic: config.Topic,Dialer: kafka.Dialer{ClientID: id,},}),}return c }// Read 读取消息intervalMs 用来控制消费者的消费速度 func (c *Consumer) Read(intervalMs int) {fmt.Printf(%s start read\n, c.Id)for {msg, err : c.ReadMessage(context.Background())if err ! nil {fmt.Printf(%s read msg err: %v\n, c.Id, err)return}// 模拟消费速度time.Sleep(time.Millisecond * time.Duration(intervalMs))fmt.Printf(%s read msg: %s, time: %s\n, c.Id, string(msg.Value), time.Now().Format(03-04-05))} }func main() {c1 : NewConsumer(consumer-1)c1.Read(500) }启动生产者生产消息然后启动消费者观察控制台不难看出这种情况下就是顺序消费 consumer-1 read msg: msg-10, time: 04:29:10 consumer-1 read msg: msg-11, time: 04:29:11 consumer-1 read msg: msg-12, time: 04:29:12 consumer-1 read msg: msg-13, time: 04:29:13 consumer-1 read msg: msg-14, time: 04:29:14 consumer-1 read msg: msg-15, time: 04:29:15 consumer-1 read msg: msg-16, time: 04:29:16重平衡带来的问题 我们先重建 topic清楚掉之前的数据 /opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ordered-msg-topic/opt/kafka-3.6.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic ordered-msg-topic --partitions 3 --replication-factor 1下面我们来采用消费者组的形式消费消息在这期间我们不断往消费者组中新增消费者使其发生重平衡我们来观察下消息的消费情况。 修改消费者端的 main() func main() {// 先启动 c1c1 : NewConsumer(consumer-1)go func() {c1.Read(500)}()// 5 秒后启动 c2time.Sleep(5 * time.Second)go func() {c2 : NewConsumer(consumer-2)c2.Read(300)}()// 再 10 秒后启动 c3 和 c4time.Sleep(10 * time.Second)go func() {c3 : NewConsumer(consumer-3)c3.Read(100)}()go func() {c4 : NewConsumer(consumer-4)c4.Read(100)}()select {} }先启动生产者重新生产数据然后再启动消费者消费数据观察控制台 consumer-1 start read consumer-1 read msg: msg-1, time: 04:44:28 consumer-1 read msg: msg-2, time: 04:44:28 consumer-1 read msg: msg-3, time: 04:44:29 # consumer-1 按顺序消费 consumer-2 start read # consumer-2 进来 consumer-1 read msg: msg-4, time: 04:44:30 consumer-1 read msg: msg-5, time: 04:44:30 consumer-1 read msg: msg-6, time: 04:44:31 # 这里相差了 6s就是在进行重平衡 consumer-2 read msg: msg-7, time: 04:44:37 # 重平衡后发现原来的分区给 consumer-2 消费了 consumer-1 read msg: msg-7, time: 04:44:37 # 这里发生了重复消费 consumer-2 read msg: msg-8, time: 04:44:37 consumer-2 read msg: msg-9, time: 04:44:37 consumer-2 read msg: msg-10, time: 04:44:38 consumer-2 read msg: msg-11, time: 04:44:38 consumer-2 read msg: msg-12, time: 04:44:38 consumer-2 read msg: msg-13, time: 04:44:39 consumer-2 read msg: msg-14, time: 04:44:39 consumer-2 read msg: msg-15, time: 04:44:39 # consumer-2 按顺序消息 consumer-4 start read # consumer-3 和 consumer-4 进来 consumer-3 start read consumer-2 read msg: msg-16, time: 04:44:40 consumer-4 read msg: msg-17, time: 04:44:46 # 这里发生重平衡 consumer-4 read msg: msg-18, time: 04:44:46 # 重平衡后由 consumer-4 负责该分区 consumer-2 read msg: msg-17, time: 04:44:46 # 这里由于 2 的速度比 4 慢很多所以就乱序了还重复消费 consumer-4 read msg: msg-19, time: 04:44:46 consumer-4 read msg: msg-20, time: 04:44:46 # ...总结 当我们采用消费者组的时候由于重平衡机制的存在单纯从 Kafka 的角度来说是无法完全实现顺序消息的只能通过静态成员功能、避免分区数量变化和减少消费者组成员数量变化等方式来尽可能减少重平衡的发生进而尽可能维持消息的顺序性。 参考 极客时间 - 深入拆解消息队列 47 讲许文强《Kafka 权威指南第 2 版》Pulsar 官方文档-分区topic-顺序保证RocketMQ 官方文档-功能特性-顺序消息RabbitMQ 官方文档
http://www.hkea.cn/news/14409657/

相关文章:

  • 大专建设工程管理有用吗seo sem培训
  • 外贸网站搭建服务商凡科网和wordpress
  • amp网站建设企业建设网站的需求分析
  • 网站的标准企业logo设计合同
  • 官方网站建设 都来磐石网络马鞍山做网站公司排名
  • 什么网站做电子相册比加快wordpress 百度站长
  • 岗顶网站开发创建一个平台需要什么
  • 白酒企业网站建设地推接单大厅app
  • 设计教学网站推荐无锡网络公司
  • 网站建设沈阳凯鸿誉铭摄影网站
  • 网上购物网站大全怎样提升网站关键词
  • 柯城网站建设网页设计素材资讯
  • 设计师每天都上的网站不用vip会员也能观看的软件
  • 京东商城网站风格创意模板
  • 百度网站排名关键词整站优化引航博景网站做的好吗
  • 网站代码检查页面做的好看的网站
  • 帮别人设计网站的网站吗成都大型广告公司有哪些
  • 做个企业网网站怎么做网站seo监测
  • 双语网站系统影视网站cpa 如何做
  • 重庆网站空间键词排名企业资质查询
  • 深圳网站 商城制作网站正能量视频不懂我意思吧
  • 网站几个数据库简单的html网页制作
  • 微云做网站广州模板建站系统
  • 河南省住房和城乡建设网站分类信息网站建设系统
  • 自主建站网站app下载软件免费下载
  • 淘宝网站建设类别报价表
  • 南宁中小企业网站制作自建网站如何盈利
  • 常州建设工程监理员挂证网站做服务网站要多少钱
  • 网站怎么做二级页面莆田有哪些网站建设公司
  • 厦门网站建设技术支持菜鸟式网站建设图书