网站后台怎么做友情链接,代做网站排名,深圳歌剧院设计方案,常州网站公司怎么选消息#xff08;Record#xff09;
在kafka中传递的数据我们称之为消息#xff08;message#xff09;或记录(record)#xff0c;所以Kafka发送数据前#xff0c;需要将待发送的数据封装为指定的数据模型#xff1a; 相关属性必须在构建数据模型时指定#xff0c;其中…消息Record
在kafka中传递的数据我们称之为消息message或记录(record)所以Kafka发送数据前需要将待发送的数据封装为指定的数据模型 相关属性必须在构建数据模型时指定其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题那么Topic主题可以不存在。value就是我们需要真正传递的数据了而在未指定分区器或者未指定分区得情况下Key可以用于数据的分区定位。 根据前面提供的配置信息创建生产者对象通过这个生产者对象向Kafka服务器节点发送数据而具体的发送是由生产者对象创建时内部构建的多个组件实现的多个组件的关系有点类似于生产者消费者模式。
生产者Producer是一个关键组件负责将消息发送到Kafka集群。Kafka生产者主要由三个核心部分组成
KafkaProducerRecordAccumulatorSender 数据生产者KafkaProducer
作用
KafkaProducer是生产者客户端的核心接口为生产者对象用于对我们的数据进行必要的转换和处理将处理后的数据放入到数据收集器中类似于生产者消费者模式下的生产者负责提供向Kafka集群发布消息的功能。
组成
KafkaProducer由以下关键部分组成
配置ProducerConfig用于初始化和配置生产者客户端的参数。拦截器Interceptor用于在消息发送前后进行自定义处理。如果配置拦截器栈interceptor.classes那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。序列化器Serializer因为发送的数据为KV数据所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。分区器Partitioner计算数据所发送的分区位置。
工作流程
初始化根据配置参数初始化客户端包括序列化器、分区器等。消息发送消息经过拦截器处理、序列化、分区选择后放入RecordAccumulator中。事务支持处理事务消息的发送和事务边界如果配置了事务如幂等。 数据收集器RecordAccumulator
作用
RecordAccumulator用于收集转换我们产生的数据类似于生产者消费者模式下的缓冲区。为了优化数据的传输Kafka并不是生产一条数据就向Broker发送一条数据而是通过合并单条消息进行批量批次发送提高吞吐量减少带宽消耗。
组成
RecordAccumulator由以下部分组成
内存缓冲区BufferPool管理消息缓冲区的内存分配。消息队列Deque每个分区对应一个消息队列用于存储批次消息。批次ProducerBatch用于存储一批待发送的消息。
内部工作
默认情况下一个发送批次的数据容量为16K这个可以通过参数batch.size进行改善。批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并形成一个批次。将消息追加到对应分区的批次中如果当前批次已满或达到时间限制创建新的批次。这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据等待发送
重要参数
batch.size每个批次的大小默认16K。linger.ms发送前的等待时间限制默认0s。buffer.memory内存缓冲区的总大小默认32M。 数据发送器Sender
作用
Sender是一个后台线程负责从RecordAccumulator中取出消息批次向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中
组成
Sender由以下部分组成
网络客户端NetworkClient负责与Kafka Broker进行网络通信。元数据管理Metadata获取和更新Kafka集群的元数据信息。请求管理ClientRequest/ClientResponse管理发送的请求和接收的响应。
内部工作
因为数据真正发送的地方是Broker节点不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。将组合后的节点List批次的数据封装成客户端请求发送到网络客户端对象的缓冲区由网络客户端对象通过网络发送给Broker节点。Broker节点获取客户端请求并根据请求键进行后续的数据处理向分区中增加数据。
重要参数
retries重试次数。retry.backoff.ms重试间隔时间。request.timeout.ms请求超时时间。 协作机制 消息发送流程 用户通过KafkaProducer.send()方法发送消息。消息经过序列化、分区选择和拦截器处理后进入RecordAccumulator。RecordAccumulator将消息存入对应分区的消息队列中形成批次。 消息传输流程 Sender后台线程不断从RecordAccumulator中取出已准备好的消息批次。Sender通过NetworkClient将消息批次发送到Kafka Broker。如果发送成功Sender接收响应并通知KafkaProducer如果发送失败根据重试策略进行重试。
通过这种协作机制Kafka生产者实现了高效、可靠的消息发送。KafkaProducer负责接口和配置管理RecordAccumulator负责消息缓存和批量处理Sender负责消息的实际传输和重试逻辑。 生产者代码
// TODO 配置属性集合
MapString, Object configMap new HashMap();
// TODO 配置属性Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
// TODO 配置属性Kafka生产的数据为KV对所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);
// TODO 创建Kafka生产者对象建立Kafka连接
// 构造对象时需要传递配置参数
KafkaProducerString, String producer new KafkaProducer(configMap);
// TODO 准备数据,定义泛型
// 构造对象时需要传递 【Topic主题名称】【Key】【Value】三个参数
ProducerRecordString, String record new ProducerRecordString, String(test, key1, value1
);
// TODO 生产发送数据
producer.send(record);
// TODO 关闭生产者连接
producer.close();拦截器
生产者API在数据准备好发送给Kafka服务器之前允许我们对生产的数据进行统一的处理比如校验整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能所以可以根据实际的情况自行选择使用。
但是要注意这里的拦截器是可以配置多个的。执行时会按照声明顺序执行完一个后再执行下一个。并且某一个拦截器如果出现异常只会跳出当前拦截器逻辑并不会影响后续拦截器的处理。所以开发时需要将拦截器的这种处理方法考虑进去。 自定义拦截器
要想自定义拦截器只需要创建一个类然后实现Kafka提供的分区类接口ProducerInterceptor接下来重写方法。这里我们只关注onSend方法即可。 import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** TODO 自定义数据拦截器* 1. 实现Kafka提供的生产者接口ProducerInterceptor* 2. 定义数据泛型 K, V* 3. 重写方法* onSend* onAcknowledgement* close* configure*/
public class KafkaInterceptorMock implements ProducerInterceptorString, String {Override// 数据发送前会执行此方法进行数据发送前的预处理public ProducerRecordString, String onSend(ProducerRecordString, String record) {return record;}Override // 数据发送后获取应答时会执行此方法public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Override// 生产者关闭时会执行此方法完成一些资源回收和释放的操作public void close() {}Override// 创建生产者对象的时候会执行此方法可以根据场景对生产者对象的配置进行统一修改或转换。public void configure(MapString, ? configs) {}
}使用拦截器
// 仅需在配置properties的时候指定自定义拦截器器即可
configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());同步发送和异步发送
1. 异步发送
如果Kafka通过主线程代码将一条数据放入到缓冲区后无需等待数据的后续发送过程就直接发送一下条数据的场合我们就称之为异步发送。
import org.apache.kafka.clients.producer.*;import java.util.HashMap;
import java.util.Map;public class KafkaProducerASynTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}});// TODO 发送当前数据System.out.println(发送数据);}producer.close();}
}2. 同步发送
如果Kafka通过主线程代码将一条数据放入到缓冲区后需等待数据的后续发送操作的应答状态才能发送一下条数据的场合我们就称之为同步发送。所以这里的所谓同步就是生产数据的线程需要等待发送线程的应答响应结果。 import org.apache.kafka.clients.producer.*;import java.util.HashMap;
import java.util.Map;public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 循环生产数据for ( int i 0; i 10; i ) {// TODO 创建数据ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后会回调此方法System.out.println(数据发送成功 recordMetadata.timestamp());}}).get();// TODO 发送当前数据System.out.println(发送数据);}producer.close();}
}分区器
如果指定了分区直接使用如果指定了自己的分区器通过分区器计算分区编号如果有效直接使用如果指定了数据Key且使用Key选择分区的场合采用murmur2非加密散列算法类似于hash计算数据Key序列化后的值的散列值然后对主题分区数量模运算取余最后的结果就是分区编号。hash(key)%numPartitions 分区号如果未指定数据Key或不使用Key选择分区那么Kafka会自动分区。
自定义分区器
只需要创建一个类然后实现Kafka提供的分区类接口Partitioner接下来重写方法。这里我们只关注partition方法即可因为此方法的返回结果就是需要的分区编号。
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** TODO 自定义分区器实现步骤* 1. 实现Partitioner接口* 2. 重写方法* partition : 返回分区编号从0开始* close* configure*/
public class KafkaPartitionerMock implements Partitioner {/*** 分区算法 - 根据业务自行定义即可* param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes The serialized key to partition on( or null if no key)* param value The value to partition on or null* param valueBytes The serialized value to partition on or null* param cluster The current cluster metadata* return 分区编号从0开始*/Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}Overridepublic void close() {}Overridepublic void configure(MapString, ? configs) {}
}使用分区器
// 仅需在配置properties的时候指定自定义分区器即可
configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName());消息可靠性Acknowledgement
ACK 0
当生产数据时生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候Kafka就对当前的数据请求进行了响应确认应答如果是同步发送数据此时就可以发送下一条数据了。如果是异步发送数据回调方法就会被触发。
ACK 1
当生产数据时Kafka Leader副本将数据接收到并写入到了日志文件后就会对当前的数据请求进行响应确认应答如果是同步发送数据此时就可以发送下一条数据了。如果是异步发送数据回调方法就会被触发。
ACK -1/all (默认) 最可靠
当生产数据时Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后再对当前的数据请求进行响应确认应答如果是同步发送数据此时就可以发送下一条数据了。如果是异步发送数据回调方法就会被触发。 数据重试导致的数据重复
由于网络或服务节点的故障Kafka在传输数据时可能会导致数据丢失所以我们才会设置ACK应答机制尽可能提高数据的可靠性。
在某些场景中数据并不是真正地丢失比如将ACK应答设置为1Leader副本将数据写入文件后Kafka就可以对请求进行响应。此时假设网络故障的原因Kafka并没有成功将ACK应答信息发送给Producer那么此时对于Producer来讲以为kafka没有收到数据所以就会一直等待响应一旦超过某个时间阈值就会发生超时错误也就是说在Kafka Producer眼里数据已经丢了所以在这种情况下kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。如果此时发送成功Kafka就又收到了数据两条数据一样也就是说数据的重复。 数据乱序
数据重试(retry)功能除了可能会导致数据重复以外还可能会导致数据乱序。
假设需要将编号为123的三条连续数据发送给Kafka。如果在发送过程中1因为网络原因发送失败2、3发生成功则此时在Broker的缓存中为消息2、3生产者重发消息1则此时在Broker的缓存中为消息2、3、1
这就产生了数据的乱序 幂等
为了解决Kafka传输数据时所产生的数据重复和乱序问题Kafka引入了幂等性操作所谓的幂等性就是Producer同样的一条数据无论向Kafka发送多少次kafka都只会存储一条。注意这里的同样的一条数据指的不是内容一致的数据而是指的不断重试的数据。
// 幂等需要手动开启
enable.idempotence配置为true开启幂等性后为了保证数据不会重复那么就需要给每一个请求批次的数据增加唯一性标识kafka中这个标识采用的是连续的序列号数字sequencenum但是不同的生产者Producer可能序列号是一样的所以仅仅靠seqnum还无法唯一标记数据所以还需要同时对生产者进行区分所以Kafka采用申请生产者IDproducerid的方式对生产者进行区分。这样在发送数据前我们就需要提前申请producerid以及序列号sequencenum Broker中会给每一个分区记录生产者的生产状态采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试均衡空间效率和时间效率所得到的值所以为固定值无法配置且不能修改。 如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的如果有相同的那么说明有重复当前批次数据不做任何处理。 如果Broker当前的请求批次数据在缓存中没有相同的那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1如果是说明是连续的顺序没乱。那么继续如果不是那么说明数据已经乱了发生异常。 Broker根据异常返回响应通知Producer进行重试。Producer重试前需要在缓冲区中将数据重新排序保证正确的顺序后。再进行重试即可。 如果请求批次不重复且有序那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾将队列的第一个移除保证队列中缓冲的数据最多5个。
从上面的流程可以看出Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重在一些对数据敏感的业务中是十分重要的。但是通过原理咱们也能明白这种幂等性还是有缺陷的
幂等性的producer仅做到单分区上的幂等性即单分区消息有序不重复多分区无法保证幂等性。只能保持生产者单个会话的幂等性无法实现跨会话的幂等性也就是说如果一个producer挂掉再重启那么重启前和重启后的producer对象会被当成两个独立的生产者从而获取两个不同的独立的生产者ID导致broker端无法获取之前的状态信息所以无法实现跨会话的幂等要想解决这个问题就需要采用事务功能。 什么是生产者事务
在Kafka中生产者事务Producer Transactions允许生产者以原子方式向一个或多个主题写入消息。事务可以确保消息要么全部成功写入要么全部失败从而保证数据的一致性。下面详细解释Kafka中生产者事务的原理
事务组件
在Kafka中事务涉及以下几个组件
Transactional Producer事务生产者负责在事务中写入消息。Transaction Coordinator事务协调器管理事务的生命周期包括开始事务、提交事务和中止事务。Broker代理Kafka集群中的服务器存储消息并协助协调事务。
事务处理的详细步骤
初始化事务事务生产者向事务协调器注册事务协调器为该生产者分配一个唯一的Transactional ID和Producer Epoch。开始事务生产者调用beginTransaction时事务协调器记录事务开始状态。发送消息生产者发送消息时消息会标记为“待处理”状态并包含事务ID和epoch。提交事务生产者调用commitTransaction时事务协调器将事务状态更新为“提交中”然后通知所有相关分区代理提交消息。完成提交所有分区代理确认消息已写入日志后事务协调器更新事务状态为“已提交”通知生产者事务完成。中止事务在任何步骤发生错误时生产者可以调用abortTransaction事务协调器将事务状态更新为“中止中”通知相关分区代理丢弃消息最后更新事务状态为“已中止”。
实现事务的要点
幂等性确保消息的幂等性以避免重复写入。Kafka通过Producer ID和Sequence Number实现幂等性。协调和日志事务协调器负责管理事务状态事务日志记录事务的所有状态变化。原子提交确保事务提交的原子性通过两阶段提交协议2PC实现。 第一阶段预提交 生产者发送消息时代理记录Producer ID和Sequence Number。消息标记为“待提交”。 第二阶段提交或中止 当生产者提交事务时事务协调器通知代理将消息标记为“已提交”。如果事务中止代理丢弃消息不进行处理。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;public class ProducerTransactionTest {public static void main(String[] args) {MapString, Object configMap new HashMap();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 配置幂等性configMap.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// TODO 配置事务IDconfigMap.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, my-tx-id);// TODO 配置事务超时时间configMap.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5);// TODO 创建生产者对象KafkaProducerString, String producer new KafkaProducer(configMap);// TODO 初始化事务producer.initTransactions();try {// TODO 启动事务producer.beginTransaction();// TODO 生产数据for ( int i 0; i 10; i ) {ProducerRecordString, String record new ProducerRecordString, String(test, key i, value i);final FutureRecordMetadata send producer.send(record);}// TODO 提交事务producer.commitTransaction();} catch ( Exception e ) {e.printStackTrace();// TODO 终止事务producer.abortTransaction();}// TODO 关闭生产者对象producer.close();}
}如何确保跨会话中的幂等生产者崩溃后事务恢复生产者即为跨会话
确保幂等性的过程主要依赖于Kafka的Producer ID、Producer Epoch和Sequence Number机制。以下是详细的过程描述
1. 初始生产者启动和消息发送 生产者初始化 生产者在初始化时Kafka为其分配一个唯一的Producer ID (PID)。Producer ID标识当前生产者实例。 开始发送消息 生产者发送消息时每个消息附带一个Sequence Number。Sequence Number在每个分区内是递增的用于标识消息的顺序。
2. 记录Producer ID和Sequence Number
Kafka代理Broker会记录每个分区的Producer ID和最新的Sequence Number。每条消息发送时代理会检查当前Producer ID和Sequence Number确保消息的顺序和唯一性。
3. 生产者崩溃和重启 生产者崩溃 假设生产者在事务过程中崩溃。 生产者重启 生产者重启后使用相同的Transactional ID重新初始化。Kafka为重启的生产者分配一个新的Producer ID。同时Producer Epoch递增标识这是生产者的一个新的会话。
4. 恢复未完成的事务
事务协调器负责管理事务状态恢复未完成的事务。当生产者重启并调用initTransactions时事务协调器会 检查与Transactional ID相关的未完成事务。根据事务日志记录决定是提交还是中止这些事务。
5. 幂等性保障机制 新的Producer ID和递增的Producer Epoch Kafka代理识别新的Producer ID和递增的Producer Epoch确保重启后的生产者实例与之前的实例区分开。 更新记录 代理更新记录新的Producer ID和Sequence Number。新的Producer ID和递增的Sequence Number确保消息不重复处理。 消息去重 代理通过检查Producer ID和Sequence Number确保同一个Producer ID的消息不会被处理两次。即使生产者在重启后重新发送消息代理能够识别并忽略重复的消息。
6. 事务性消息处理 第一阶段预提交 生产者发送消息时代理记录Producer ID和Sequence Number。消息标记为“待提交”。 第二阶段提交或中止 当生产者提交事务时事务协调器通知代理将消息标记为“已提交”。如果事务中止代理丢弃消息不进行处理。
总结
通过以上机制Kafka确保生产者在不同会话中的幂等性
Producer ID和Producer Epoch标识生产者实例和会话阶段。Sequence Number确保每个分区中的消息顺序和唯一性。事务协调器管理事务状态确保事务的一致性和原子性。
即使生产者崩溃并重启通过这些机制Kafka能够保证消息的幂等性和事务一致性避免重复处理消息确保数据可靠性和一致性。