企业网站设计推广方案,网站建设 调研报告,太原建设北路小学网站,做直播网站需要学什么软件有哪些目录 一、生产者提高吞吐量参数设置二、产者提高吞吐量代码示例 一、生产者提高吞吐量参数设置
batch.size#xff1a;设置批次大小#xff0c;默认16klinger.ms#xff1a;设置等待时间#xff0c;修改为5-100msbuffer.memory#xff1a;设置缓冲区大小#xff0c; 默认… 目录 一、生产者提高吞吐量参数设置二、产者提高吞吐量代码示例 一、生产者提高吞吐量参数设置
batch.size设置批次大小默认16klinger.ms设置等待时间修改为5-100msbuffer.memory设置缓冲区大小 默认 32Mcompression.type设置压缩默认 none可配置值 gzip、snappy、lz4 和 zstd
二、产者提高吞吐量代码示例 代码示例 package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/*** 生产者提高吞吐量* 1、设置批次大小batch.size 默认 16K* 2、设置等待时间linger.ms 默认 0* 3、设置缓冲区大小buffer.memory 默认 32M* 4、设置压缩 compression.type 默认 none可配置值 gzip、snappy、lz4 和 zstd* */
public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties new Properties();//2、给 kafka 配置对象添加配置信息bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092);//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());/*** 4、提高吞吐量* *///设置缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//设置批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//设置等待时间 linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);//设置压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,snappy);//5、创建生产者KafkaProducerString, String kafkaProducer new KafkaProducer(properties);//6、调用 send 方法,发送消息for (int i 0; i 5; i) {kafkaProducer.send(new ProducerRecord(news, hello kafka i), new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null){System.out.println(主题 metadata.topic() 分区 metadata.partition());}}});Thread.sleep(2);}//7、关闭资源kafkaProducer.close();}
} 在三台服务器上开启 Kafka 消费者 [rootlocalhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.29:9092 --topic news在 IDEA 中执行代码观察 三台服务器控制台中是否接收到消息如下图