为什么要在南极建站,青岛北方现货交易平台代理,价格信息网,服务器建设网站软件下载1、官网下载最新版本的kafka#xff0c;里面已经集成zookeeper。直接解压到D盘
2、配置文件修改#xff0c;config目录下面的zookeeper.properties. 设置zookeeper数据目录
dataDirD:/kafka_2.12-3.6.0/tmp/zookeeper 3、修改kafka的配置文件server.properties. 主要修…1、官网下载最新版本的kafka里面已经集成zookeeper。直接解压到D盘
2、配置文件修改config目录下面的zookeeper.properties. 设置zookeeper数据目录
dataDirD:/kafka_2.12-3.6.0/tmp/zookeeper 3、修改kafka的配置文件server.properties. 主要修改内容如下
zookeeper.connectlocalhost:2181
log.dirsD:\\kafka_2.12-3.6.0\\logs
listenersPLAINTEXT://localhost:9092
其他默认即可。
4、修改完成后进入bin目录启动zookeeper和kafka命令如下
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
5、命令行创建topic命令如下
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic hello 6、创建生产者和消费者测试。生产者输入消息消费者就会收到相应的消息了
kafka-console-producer.bat --broker-list localhost:9092 --topic hello kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic hello--from-beginning 7、创建springboot工程测试
引入依赖
dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency 8、yml文件配置kafka
spring: kafka: bootstrap-servers: localhost:9092 producer: acks: 1 retries: 3 batch-size: 16384 properties: linger: ms: 0 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: helloGroup enable-auto-commit: false auto-commit-interval: 1000 auto-offset-reset: latest properties: request: timeout: ms: 18000 session: timeout: ms: 12000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 9、使用springboot KafkaTemplate发送消息
RequestMapping(value /sendMessage, method RequestMethod.GET) public String sendMessage(String message) { kafkaTemplate.send(hello, message); return 发送成功~; }
10、消息消费 KafkaListener(topics hello) public void receiveMessage(ConsumerRecordString, String record) { String topic record.topic(); long offset record.offset(); int partition record.partition(); String message record.value(); System.out.println(topic topic); System.out.println(offset offset); System.out.println(partition partition); System.out.println(message message); }