浅谈电子商务网站建设与管理论文,网站备案流程图,企业网站建设情况汇报,免费接收邮箱验证码平台Kafka在Java项目中的应用
Docker 安装Kafka
一.首先需要安装docker,可看这篇文章安装docker
二.拉取zookeeper和KafKa镜像
docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafkaKafka组件需要向zookeeper进行注册,所以也需要安装zookeeper
三.启动zookeeper…Kafka在Java项目中的应用
Docker 安装Kafka
一.首先需要安装docker,可看这篇文章安装docker
二.拉取zookeeper和KafKa镜像
docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafkaKafka组件需要向zookeeper进行注册,所以也需要安装zookeeper
三.启动zookeeper、kafka组件
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECTzookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAMElocalhost --env KAFKA_ADVERTISED_PORT9092 wurstmeister/kafka启动成功界面如下,status即为running(运行中)
四.创建Springboot项目
4.1 添加依赖
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scopeexclusionsexclusiongroupIdorg.junit.vintage/groupIdartifactIdjunit-vintage-engine/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependency/dependencies4.2 application.yml文件
server:port: 9090
spring:kafka:bootstrap-servers: localhost:9092consumer:# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset: earliestproducer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerretries: 3 # 重试次数
kafka:topic:my-topic: my-topicmy-topic2: my-topic2
4.3 创建实体类Book
public class Book {private Long id;private String name;public Book() {}public Book(Long id, String name) {this.id id;this.name name;}public Long getId() {return id;}public void setId(Long id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}Overridepublic String toString() {return Book{ id id , name name \ };}
}4.4 配置KafKa信息
Configuration
public class KafkaConfig {Value(${kafka.topic.my-topic})String myTopic;Value(${kafka.topic.my-topic2})String myTopic2;/*** JSON消息转换器*/Beanpublic RecordMessageConverter jsonConverter() {return new StringJsonMessageConverter();}/*** 通过注入一个 NewTopic 类型的 Bean 来创建 topic如果 topic 已存在则会忽略。*/Beanpublic NewTopic myTopic() {return new NewTopic(myTopic, 2, (short) 1);}Beanpublic NewTopic myTopic2() {return new NewTopic(myTopic2, 1, (short) 1);}
}4.5 controller代码
RestController
RequestMapping(value /book)
public class BookController {Value(${kafka.topic.my-topic})String myTopic;Value(${kafka.topic.my-topic2})String myTopic2;BookProducerService producer;private AtomicLong atomicLong new AtomicLong();BookController(BookProducerService producer) {this.producer producer;}GetMapping(/send)public String sendMessageToKafkaTopic(RequestParam(name) String name) {this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));return name : 消息已经发送!;}
}4.6 book 的生成者业务
Service
public class BookProducerService {private static final Logger logger LoggerFactory.getLogger(BookProducerService.class);private final KafkaTemplateString, Object kafkaTemplate;//通过构造方法进行注入public BookProducerService(KafkaTemplateString, Object kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}public void sendMessage(String topic, Object o) {ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, o);future.addCallback(result - logger.info(生产者成功发送消息到topic:{} partition:{}的消息,result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex - logger.error(生产者发送消失败原因{}, ex.getMessage()));}}4.7 book的消费者业务
Service
public class BookConsumerService {Value(${kafka.topic.my-topic})private String myTopic;Value(${kafka.topic.my-topic2})private String myTopic2;private final Logger logger LoggerFactory.getLogger(BookProducerService.class);private final ObjectMapper objectMapper new ObjectMapper();KafkaListener(topics {${kafka.topic.my-topic}}, groupId group1)public void consumeMessage(ConsumerRecordString, String bookConsumerRecord) {try {Book book objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info(消费者消费topic:{} partition:{}的消息 - {}, bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());} catch (JsonProcessingException e) {e.printStackTrace();}}KafkaListener(topics {${kafka.topic.my-topic2}}, groupId group2)public void consumeMessage2(Book book,ConsumerRecordString,String bookConsumerRecord) throws JsonProcessingException {Book value objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info(消费者消费topic:{} partition:{}的消息 - {}, bookConsumerRecord.topic(), bookConsumerRecord.partition(), value.toString());logger.info(消费者消费{}的消息 - {}, myTopic2, book.toString());}
}代码整体目录如下 4.8 启动成功界面 4.9 浏览器访问 4.10 控制台显示 至此.基于KafKa的Springboot项目简单应用已经完成,后续需要对Kafka进行更深的学习以及应用!