简单的手机网站模板下载安装,专业做招聘网站,学校网站制作模板,电商主图设计网站Disruptor是LMAX公司开源的高性能内存消息队列#xff0c;单线程处理能力可达600w订单/秒。本文将使用该框架实现生产-消费者模式。一、框架的maven依赖 !-- https://mvnrepository.com/artifact/com.lmax/disruptor --dependencygroupIdcom.lmax… Disruptor是LMAX公司开源的高性能内存消息队列单线程处理能力可达600w订单/秒。本文将使用该框架实现生产-消费者模式。一、框架的maven依赖 !-- https://mvnrepository.com/artifact/com.lmax/disruptor --dependencygroupIdcom.lmax/groupIdartifactIddisruptor/artifactIdversion3.4.2/version/dependency二、消息事件
package com.monika.main.system.mq;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;import java.util.EventObject;/*** author:whh* date: 2024-12-04 20:27* p/p*/
public class MsgEvent {private String data;public String getData() {return data;}public void setData(String data) {this.data data;}
}
三、消息事件处理器
package com.monika.main.system.mq;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;/*** author:whh* date: 2024-12-04 22:28* p* * * /p*/
public class MsgEventHandler implements EventHandlerMsgEvent, WorkHandlerMsgEvent {private String name;public MsgEventHandler(String name) {this.name name;}Overridepublic void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {System.out.println(name-----start-----sequence);Thread.sleep(1000*10);System.out.println(ThreadName: Thread.currentThread().getName());System.out.println(event.getData() end seq: sequence);}Overridepublic void onEvent(MsgEvent event) throws Exception {System.out.println(name-----start-----);Thread.sleep(1000*10);System.out.println(ThreadName: Thread.currentThread().getName());System.out.println(event.getData());System.out.println(name-----end-----);}
}
该消息处理器实现了两个接口EventHandler接口该接口实现统一消费一个消息会被所有消费者消费WorkHandler接口该接口实现分组消费一个消息只能被一个消费者消费多消费者轮询处理。
四、Disruptor配置
package com.monika.main.system.mq;import cn.hutool.core.thread.NamedThreadFactory;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** author:whh* date: 2024-12-04 20:33* p/p*/Configuration
public class RingBufferConfig {Beanpublic RingBufferMsgEvent ringBuffer(){NamedThreadFactory threadFactory new NamedThreadFactory(MsgEvent-,true);EventFactoryMsgEvent eventFactory new EventFactoryMsgEvent() {Overridepublic MsgEvent newInstance() {return new MsgEvent();}};DisruptorMsgEvent disruptor new Disruptor(eventFactory,1024, threadFactory);//定义两个消费者MsgEventHandler m1 new MsgEventHandler(m1);MsgEventHandler m2 new MsgEventHandler(m2);//disruptor.handleEventsWith(m1,m2); //统一消费一个消息会被所有消费者消费disruptor.handleEventsWithWorkerPool(m1,m2);//分组消费一个消息只能被一个消费者消费多消费者轮询处理//disruptor.handleEventsWith(m1).then(m2); //顺序消费1、3先并行处理然后2再处理disruptor.start();//配置多消费者每个消费者将有单独的线程处理return disruptor.getRingBuffer();}
}
五、消息生产者MsgPublish
package com.monika.main.system.mq;import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** author:whh* date: 2024-12-04 20:45* p/p*/
Component
public class MsgPublish {public static void publish(String message){/*** 返回布尔值表示事件是否发布成功如果失败可根据此值进行业务逻辑判断*/boolean b ringBuffer.tryPublishEvent(TRANSLATOR, message);}private static final EventTranslatorOneArgMsgEvent,String TRANSLATOR new EventTranslatorOneArgMsgEvent,String() {Overridepublic void translateTo(MsgEvent event, long sequence, String arg0) {event.setData(arg0);}};private static RingBufferMsgEvent ringBuffer;Autowiredpublic void setRingBuffer(RingBufferMsgEvent ringBuffer) {MsgPublish.ringBuffer ringBuffer;}
}
六、测试
本次测试使用的是分组模式可以发现一个消息只能被一个消费者消费且每个消费者都由单独的线程处理。
七、总结
本次只是简单的应用disruptor框架实现生产-消费者模式对于disruptor的原理主要是RingBuffer环形数组这个咱们后续再进一步研究。