当前位置: 首页 > news >正文

松原市建设局网站有什么推广产品的渠道

松原市建设局网站,有什么推广产品的渠道,蓬莱专业做网站公司,网页设计与制作题与答案前言 Kafka的基本工作原理 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这…

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>SpringBootKafka</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootKafka</name><description>SpringBootKafka</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- pom.xml --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency></dependencies><repositories><repository><id>central</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:kafka:bootstrap-servers: 192.168.110.105:9092#streams:#application-id: my-streams-appconsumer:group-id: myGroupIdauto-offset-reset: latestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;@Slf4j
@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;private final ObjectMapper objectMapper;@Autowiredpublic KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {this.kafkaTemplate = kafkaTemplate;this.objectMapper = objectMapper;}public void sendMessage(String message) {log.info("KafkaProducer message:{}", message);//kafkaTemplate.send("test", message).addCallback();Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});// 使用whenComplete方法completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});/*future.whenComplete((result, ex) -> {if (ex == null) {// 成功发送RecordMetadata metadata = result.getRecordMetadata();System.out.println("Message sent successfully with offset: " + metadata.offset());} else {// 发送失败System.err.println("Failed to send message due to: " + ex.getMessage());}});*/}public void sendUser(User user) throws JsonProcessingException {//final ProducerRecord<String, String> record = createRecord(data);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);String userJson = objectMapper.writeValueAsString(user);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);/*future.addCallback(success -> System.out.println("Message sent successfully: " + userJson),failure -> System.err.println("Failed to send message: " + failure.getMessage()));*/CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});}
}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "myGroupId")public void consume(String message) {System.out.println("Received message: " + message);log.info("KafkaConsumer message:{}", message);}
}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class MessageController {private final KafkaProducer producer;@Autowiredpublic MessageController(KafkaProducer producer) {this.producer = producer;}@GetMapping("/send-message")public String sendMessage() {log.info("MessageController sendMessage start!");producer.sendMessage("hello, Kafka!");log.info("MessageController sendMessage end!");return "Message sent successfully.";}@GetMapping("/send")public String sendMessage1() {log.info("MessageController sendMessage1 start!");User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();try {producer.sendUser(user);} catch (JsonProcessingException e) {throw new RuntimeException(e);}log.info("MessageController sendMessage1 end!");return "Message sendMessage1 successfully.";}
}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

http://www.hkea.cn/news/648270/

相关文章:

  • 网站的平面设计图用ps做国外搜索引擎大全百鸣
  • 深圳专业企业网站建设前端培训
  • 南京平台公司seo搜索培训
  • 横沥网站建设武汉百度百科
  • 百度给做网站公司线上运营的5个步骤
  • 盘锦网站建设公司网络营销策略包括哪些
  • 简述电子商务网站开发的基本原则一站式网络营销
  • 商丘网站网络推广员的工作内容和步骤
  • 取消wordpress邮箱认证北京搜索优化排名公司
  • 千库网素材南宁seo优势
  • 西安机场商务宾馆百度做网站怎么在百度上做网站
  • ps网站建设seo网络公司
  • 网站建设步骤 教 程网站怎么做谷歌推广
  • 网站制作需要注意什么潍坊做网站哪家好
  • 专门做团购的网站有哪些色盲图
  • 百度做网站续费费用百度营业执照怎么办理
  • 深圳网站建设方维网络企业网站制作要求
  • 制作好网站黑帽seo教程
  • 云南 网站建设网站seo优化对网店的推广的作用为
  • 网站建设免费国外舆情服务公司
  • 怎么做网站banner查排名网站
  • 做网站好看的背景图片相关搜索优化软件
  • 怎么查网站是哪家制作公司做的百度收录查询
  • 企业年金交了有好处吗网络优化工程师吃香吗
  • python做网站开发百度6大核心部门
  • 自己做网站平台企业网站优化价格
  • 淘宝网网站建设的需求分析百度会员登录入口
  • 建网站的专业公司推广网站多少钱
  • 网站不去公安局备案自己怎么搭建网站
  • 外贸网站建设入门深圳网络推广哪家