电子商务平台网站源码,做防伪的网站,软件外包行业分析,做网站被骗了怎么办1.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程#xff1a; 1)Kafka 会将发送消息包装为 ProducerRecord 对象#xff0c; ProducerRecord 对象包含了目标主题和要发送的内容#xff0c;同时还可以指定键和分区。在发送 ProducerRecord 对象前#xff0c…1.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程 1)Kafka 会将发送消息包装为 ProducerRecord 对象 ProducerRecord 对象包含了目标主题和要发送的内容同时还可以指定键和分区。在发送 ProducerRecord 对象前生产者会先把键和值对象序列化成字节数组这样它们才能够在网络上传输。 2) 接下来数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区那么分区器就不会再做任何事情。如果没有指定分区 那么分区器会根据 ProducerRecord 对象的键来选择一个分区紧接着这条记录被添加到一个记录批次里这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。 3) 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka就返回一个 RecordMetaData 对象它包含了主题和分区信息以及记录在分区里的偏移量。如果写入失败则会返回一个错误。生产者在收到错误之后会尝试重新发送消息如果达到指定的重试次数后还没有成功则直接抛出异常不再重试。 2.创建生产者 2.1 项目依赖 本项目采用 Maven 构建想要调用 Kafka 生产者 API需要导入 kafka-clients 依赖如下 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.2.0/version
/dependency2.2 创建生产者 创建 Kafka 生产者时以下三个属性是必须指定的 bootstrap.servers指定 broker 的地址清单清单里不需要包含所有的 broker 地址生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错 key.serializer指定键的序列化器 value.serializer指定值的序列化器。 创建的示例代码如下 public class SimpleProducer {public static void main(String[] args) {String topicName Hello-Kafka;Properties props new Properties();props.put(bootstrap.servers, hadoop001:9092);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);/*创建生产者*/ProducerString, String producer new KafkaProducer(props);for (int i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(topicName, hello i, world i);/* 发送消息*/producer.send(record);}/*关闭生产者*/producer.close();}
}2.3 测试 2.3.1启动Kakfa Kafka 的运行依赖于 zookeeper需要预先启动可以启动 Kafka 内置的 zookeeper也可以启动自己安装的 # zookeeper启动命令
bin/zkServer.sh start# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties 启动单节点 kafka 用于测试 # bin/kafka-server-start.sh config/server.properties 2.3.2 创建topic # 创建用于测试主题
bin/kafka-topics.sh --create \--bootstrap-server hadoop001:9092 \--replication-factor 1 --partitions 1 \--topic Hello-Kafka# 查看所有主题bin/kafka-topics.sh --list --bootstrap-server hadoop001:90922.3.3 启动消费者 启动一个控制台消费者用于观察写入情况启动命令如下 # bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning2.3.4 运行项目 此时可以看到消费者控制台输出如下这里 kafka-console-consumer 只会打印出值信息不会打印出键信息。 2.4 可能出现的问题 在这里可能出现的一个问题是生产者程序在启动后一直处于等待状态。这通常出现在你使用默认配置启动 Kafka 的情况下此时需要对 server.properties 文件中的 listeners 配置进行更改 # hadoop001 为我启动kafka服务的主机名你可以换成自己的主机名或者ip地址
listenersPLAINTEXT://hadoop001:9092 3.发送消息 上面的示例程序调用了 send 方法发送消息后没有做任何操作在这种情况下我们没有办法知道消息发送的结果。想要知道消息发送的结果可以使用同步发送或者异步发送来实现。 3.1 同步发送 在调用 send 方法后可以接着调用 get() 方法send 方法的返回值是一个 FutureRecordMetadata对象RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下 for (int i 0; i 10; i) {try {ProducerRecordString, String record new ProducerRecord(topicName, k i, world i);/*同步发送消息*/RecordMetadata metadata producer.send(record).get();System.out.printf(topic%s, partition%d, offset%s \n,metadata.topic(), metadata.partition(), metadata.offset());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
} 此时得到的输出如下偏移量和调用次数有关所有记录都分配到了 0 分区这是因为在创建 Hello-Kafka 主题时候使用 --partitions 指定其分区数为 1即只有一个分区。 topicHello-Kafka, partition0, offset40
topicHello-Kafka, partition0, offset41
topicHello-Kafka, partition0, offset42
topicHello-Kafka, partition0, offset43
topicHello-Kafka, partition0, offset44
topicHello-Kafka, partition0, offset45
topicHello-Kafka, partition0, offset46
topicHello-Kafka, partition0, offset47
topicHello-Kafka, partition0, offset48
topicHello-Kafka, partition0, offset49 3.2 异步发送 通常我们并不关心发送成功的情况更多关注的是失败的情况因此 Kafka 提供了异步发送和回调函数。 代码如下 for (int i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(topicName, k i, world i);/*异步发送消息并监听回调*/producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception ! null) {System.out.println(进行异常处理);} else {System.out.printf(topic%s, partition%d, offset%s \n,metadata.topic(), metadata.partition(), metadata.offset());}}});
}4.自定义分区器 Kafka 有着默认的分区机制 如果键值为 null 则使用轮询 (Round Robin) 算法将消息均衡地分布到各个分区上 如果键值不为 null那么 Kafka 会使用内置的散列算法对键进行散列然后分布到各个分区上。 某些情况下你可能有着自己的分区需求这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例 4.1 自定义分区器 /*** 自定义分区器*/
public class CustomPartitioner implements Partitioner {private int passLine; Overridepublic void configure(MapString, ? configs) {/*从生产者配置中获取分数线*/passLine (Integer) configs.get(pass.line);} Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {/*key 值为分数当分数大于分数线时候分配到 1 分区否则分配到 0 分区*/return (Integer) key passLine ? 1 : 0;} Overridepublic void close() {System.out.println(分区器关闭);}
}需要在创建生产者时指定分区器和分区器所需要的配置参数 public class ProducerWithPartitioner {public static void main(String[] args) {String topicName Kafka-Partitioner-Test;Properties props new Properties();props.put(bootstrap.servers, hadoop001:9092);props.put(key.serializer, org.apache.kafka.common.serialization.IntegerSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);/*传递自定义分区器*/props.put(partitioner.class, com.heibaiying.producers.partitioners.CustomPartitioner);/*传递分区器所需的参数*/props.put(pass.line, 6);ProducerInteger, String producer new KafkaProducer(props);for (int i 0; i 10; i) {String score score: i;ProducerRecordInteger, String record new ProducerRecord(topicName, i, score);/*异步发送消息*/producer.send(record, (metadata, exception) -System.out.printf(%s, partition%d, \n, score, metadata.partition()));}producer.close();}
}4.2 测试 需要创建一个至少有两个分区的主题 bin/kafka-topics.sh --create \--bootstrap-server hadoop001:9092 \--replication-factor 1 --partitions 2 \--topic Kafka-Partitioner-Test此时输入如下可以看到分数大于等于 6 分的都被分到 1 分区而小于 6 分的都被分到了 0 分区。 score:6, partition1,
score:7, partition1,
score:8, partition1,
score:9, partition1,
score:10, partition1,
score:0, partition0,
score:1, partition0,
score:2, partition0,
score:3, partition0,
score:4, partition0,
score:5, partition0,
分区器关闭5.生产者其他属性 上面生产者的创建都仅指定了服务地址键序列化器、值序列化器实际上 Kafka 的生产者还有很多可配置属性如下 1. acks acks 参数指定了必须要有多少个分区副本收到消息生产者才会认为消息写入是成功的 acks0 消息发送出去就认为已经成功了不会等待任何来自服务器的响应 acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应 acksall只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。 2. buffer.memory 设置生产者内存缓冲区的大小。 3. compression.type 默认情况下发送的消息不会被压缩。如果想要进行压缩可以配置此参数可选值有 snappygziplz4。 4. retries 发生错误后消息重发的次数。如果达到设定值生产者就会放弃重试并返回错误。 5. batch.size 当有多个消息需要被发送到同一个分区时生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小按照字节数计算。 6. linger.ms 该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。 7. clent.id 客户端 id,服务器用来识别消息的来源。 8. max.in.flight.requests.per.connection 指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高就会占用越多的内存不过也会提升吞吐量把它设置为 1 可以保证消息是按照发送的顺序写入服务器即使发生了重试。 9. timeout.ms, request.timeout.ms metadata.fetch.timeout.ms - timeout.ms 指定了 borker 等待同步副本返回消息的确认时间 - request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间 - metadata.fetch.timeout.ms 指定了生产者在获取元数据比如分区首领是谁时等待服务器返回响应的时间。 10. max.block.ms 指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满或者没有可用的元数据时这些方法会阻塞。在阻塞时间达到 max.block.ms 时生产者会抛出超时异常。 11. max.request.size 该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值也可以指单个请求里所有消息总的大小。例如假设这个值为 1000K 那么可以发送的单个最大消息为 1000K 或者生产者可以在单个请求里发送一个批次该批次包含了 1000 个消息每个消息大小为 1K。 12. receive.buffer.bytes send.buffer.byte 这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小-1 代表使用操作系统的默认值。