前端网站优化,免费个人主页网站,什么网站可以做音乐相册,做卖衣服网站源代码目录 前言封装数据库封装内存操作内存的设计思想 应答模式 代码实现测试代码 前言
我们之前已经将 数据库 的操作 和文件的操作 都完成了, 但是对于上层调用来说, 并不关心是于数据库中存储数据还是往文件中存储数据, 因此 我们提供一个类, 封装一下 上述俩个类中的操作, 并将… 目录 前言封装数据库封装内存操作内存的设计思想 应答模式 代码实现测试代码 前言
我们之前已经将 数据库 的操作 和文件的操作 都完成了, 但是对于上层调用来说, 并不关心是于数据库中存储数据还是往文件中存储数据, 因此 我们提供一个类, 封装一下 上述俩个类中的操作, 并将这个类 提供给上层调用
封装数据库
package com.example.demo.mqServer.dataCenter;import com.example.demo.Common.MqException;
import com.example.demo.mqServer.core.Binding;
import com.example.demo.mqServer.core.Exchange;
import com.example.demo.mqServer.core.MSGQueue;
import com.example.demo.mqServer.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;/*
* 管理所有硬盘上的数据
* 1: 数据库: 交换机 绑定 队列
* 2: 数据文件 : 消息
* 上层逻辑如果需要操作硬盘, 统一通过这个类来使用 , 上层代码不关系当前数据是存储在数据库还是文件中的*/
public class DiskDataCenter {
// 用来管理数据库中的数据private DataBaseManager dataBaseManager new DataBaseManager();
// 用来管理数据文件中的数据private MessageFileManager messageFileManager new MessageFileManager();
// 对上述实例进行初始化public void init(){dataBaseManager.init();messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public ListExchange selectAllExchange(){return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue){dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName){dataBaseManager.deleteQueue(queueName);}public ListMSGQueue selectAllQueue(){return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public ListBinding selectAllBinding(){return dataBaseManager.selectAllBindings();}// 封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue, message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue, message);// 对于删除消息, 我们要查看是否需要垃圾回收if (messageFileManager.checkGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}}
封装内存操作
上面我们完成的都是往硬盘上面保存数据的操作, 无论是 往数据库中添加 队列 交换机 类, 还是往文件存储消息 都是针对硬盘上的操作 ,所以我们现在写关于内存的操作
内存的设计思想
我们关于交换机和队列, 我们都使用哈希表来存储 key 是各自的名字也是主键, value 是对应的 交换机和队列 绑定:使用嵌套的哈希表来存储 , 第一个key是 交换机名称, 第二个key 是队列名称 消息: 使用哈希表来存储, key 是消息ID , value 是消息对象(每一个消息都是一个哈希表) 队列和消息之前的关系: 使用嵌套的哈希表来存储 , key是队列名, value是一个LinkedList 里面存放的消息 还有关于确认应答: 我们采用嵌套的哈希表来存储 , key是队列名, value 是哈希表,( key是消息ID, value是消息)
应答模式
我们这里实现的MQ消息队列, 支持俩种应答模式 第一种: 自动应答, 也就是说当消费者取走了元素, 就当做这个消息被应答了, 此时这个消息就可以被当做成了垃圾了 第二种: 确认应答 : 消费者取走了元素, 还不能被当做是应答了, 必须消费者主动调用一个确认应答方法, 此时才能认为这个消息无用了, 可以删除了
代码实现
package com.example.demo.mqServer.dataCenter;import com.example.demo.Common.MqException;
import com.example.demo.mqServer.core.Binding;
import com.example.demo.mqServer.core.Exchange;
import com.example.demo.mqServer.core.MSGQueue;
import com.example.demo.mqServer.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;// 使用这个类来统一管理内存中的数据
// 处理各种映射关系
// 该类提供的方法, 可能在多线程情况下处理, 所以使用线程安全的哈希表
public class MemoryDataCenter {// 根据交换机名称来获取交换机对象 , key 是 exchangeName, value 是 Exchange 对象private ConcurrentHashMapString , Exchange exchangeMap new ConcurrentHashMap();// 根据队列名来获取队列对象 , key 是 queueName, value 是 MSGQueue 对象private ConcurrentHashMapString , MSGQueue queueMap new ConcurrentHashMap();//根据交换机名, 和队列名 来获取 绑定对象 , 第一个 key 是 exchangeName, 第二个 key 是 queueNameprivate ConcurrentHashMapString , ConcurrentHashMapString , Binding bindingsMap new ConcurrentHashMap();//根据消息名称 来获取 消息对象, key 是 messageId, value 是 Message 对象private ConcurrentHashMapString , Message messageMap new ConcurrentHashMap();// 根据队列名称, 来获取消息列表 , key 是 队列名称, value 是 Message 对象private ConcurrentHashMapString, LinkedListMessage queueMessageMap new ConcurrentHashMap();// 手动应答 , 第一个 key 是 queueName, 第二个 key 是 messageId ,private ConcurrentHashMapString, ConcurrentHashMapString, Message queueMessageWaitAckMap new ConcurrentHashMap();public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(),exchange);System.out.println([MemoryDataCenter] 新交换机添加成功! exchangeName exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println([MemoryDataCenter] 交换机删除成功! exchangeName exchangeName);}public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(),queue);System.out.println([MemoryDataCenter] 新队列添加成功! queueName queue.getName());}public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println([MemoryDataCenter] 队列删除成功! queueName queueName);}public void insertBinding(Binding binding) throws MqException {ConcurrentHashMapString , Binding bindingMap bindingsMap.get(binding.getExchangeName());if (bindingMap null){bindingMap new ConcurrentHashMap();bindingsMap.put(binding.getExchangeName(), bindingMap);}// 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.synchronized (binding){if (bindingMap.get(binding.getMsgQueueName()) ! null) {throw new MqException([MemoryDataCenter] 绑定已经存在! exchangeName binding.getExchangeName() , queueName binding.getMsgQueueName());}bindingMap.put(binding.getMsgQueueName(), binding);}System.out.println([MemoryDataCenter] 新绑定添加成功! exchangeName binding.getExchangeName() , queueName binding.getMsgQueueName());}// 获取绑定, 写两个版本:// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding// 2. 根据 exchangeName 获取到所有的 Bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMapString , Binding bindingMap bindingsMap.get(exchangeName);if (bindingMap null){return null;}return bindingMap.get(queueName);}public ConcurrentHashMapString, Binding getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMapString , Binding bindingMap bindingsMap.get(binding.getExchangeName());if (bindingMap null){// 该交换机没有绑定任何队列. 报错.throw new MqException([MemoryDataCenter] 绑定不存在! exchangeName binding.getExchangeName() , queueName binding.getMsgQueueName());}bindingMap.remove(binding.getMsgQueueName());System.out.println([MemoryDataCenter] 绑定删除成功! exchangeName binding.getExchangeName() , queueName binding.getMsgQueueName());}// 添加消息public void addMessage(Message message) {messageMap.put(message.getMessageId(),message);}// 根据 id 查询消息public Message getMessage(String messageId) {return messageMap.get(messageId);}// 根据 id 删除消息public void removeMessage(String messageId) {messageMap.remove(messageId);}// 发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列数据结构中.// 先根据队列的名字, 找到该队列对应的消息链表.LinkedListMessage messages queueMessageMap.computeIfAbsent(queue.getName(), k - new LinkedList());// 再把数据加到 messages 里面synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心中插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系.// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改 basicProperties 和 body)addMessage(message);System.out.println([MemoryDataCenter] 消息被投递到队列中! messageId message.getMessageId());}// 从队列中取消息public Message pollMessage(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表.LinkedListMessage messages queueMessageMap.get(queueName);if (messages null) {return null;}synchronized (messages) {// 如果没找到, 说明队列中没有任何消息.if (messages.size() 0) {return null;}// 链表中有元素, 就进行头删.Message currentMessage messages.remove(0);System.out.println([MemoryDataCenter] 消息从队列中取出! messageId currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中消息的个数public int getMessageCount(String queueName) {LinkedListMessage messages queueMessageMap.get(queueName);if (messages null) {// 队列中没有消息return 0;}synchronized (messages) {return messages.size();}}// 添加未确认的消息public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());messageHashMap.put(message.getMessageId(), message);System.out.println([MemoryDataCenter] 消息进入待确认队列! messageId message.getMessageId());}// 删除未确认的消息(消息已经确认了)public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());if (messageHashMap null)return;messageHashMap.remove(messageId);}// 获取指定的未确认的消息public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());if (messageHashMap null)return null;return messageHashMap.get(messageId);}// 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {ListExchange exchanges diskDataCenter.selectAllExchange();exchangeMap.clear();for (Exchange exchange:exchanges) {exchangeMap.put(exchange.getName(),exchange);}ListMSGQueue queues diskDataCenter.selectAllQueue();queueMap.clear();for (MSGQueue q :queues) {queueMap.put(q.getName(),q);}ListBinding bindings diskDataCenter.selectAllBinding();bindingsMap.clear();for (Binding binding : bindings) {ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(),k - new ConcurrentHashMap());bindingMap.put(binding.getMsgQueueName(), binding);}// 4. 恢复所有的消息数据// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息.messageMap.clear();queueMessageMap.clear();for (MSGQueue queue : queues) {LinkedListMessage messages diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(), messages);for (Message message : messages) {messageMap.put(message.getMessageId(), message);}}// 注意!! 针对 未确认的消息 这部分内存中的数据, 不需要从硬盘恢复. 之前考虑硬盘存储的时候, 也没设定这一块.// 一旦在等待 ack 的过程中, 服务器重启了, 此时这些 未被确认的消息, 就恢复成 未被取走的消息 .// 这个消息在硬盘上存储的时候, 就是当做 未被取走}}
测试代码
package com.example.demo.mqServer.dataCenter;import com.example.demo.Common.MqException;
import com.example.demo.MqApplication;
import com.example.demo.mqServer.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;import static org.junit.jupiter.api.Assertions.*;
SpringBootTest
class MemoryDataCenterTest {private MemoryDataCenter memoryDataCenter null;BeforeEachpublic void setUp() {memoryDataCenter new MemoryDataCenter();}AfterEachpublic void tearDown() {memoryDataCenter null;}// 创建一个测试交换机private Exchange createTestExchange(String exchangeName) {Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.direct);exchange.setAutoDelete(false);exchange.setDurable(true);return exchange;}// 创建一个测试队列private MSGQueue createTestQueue(String queueName) {MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);return queue;}// 针对交换机进行测试Testpublic void testExchange() {// 1. 先构造一个交换机并插入.Exchange expectedExchange createTestExchange(testExchange);memoryDataCenter.insertExchange(expectedExchange);// 2. 查询出这个交换机, 比较结果是否一致. 此处直接比较这俩引用指向同一个对象.Exchange actualExchange memoryDataCenter.getExchange(testExchange);Assertions.assertEquals(expectedExchange, actualExchange);// 3. 删除这个交换机memoryDataCenter.deleteExchange(testExchange);// 4. 再查一次, 看是否就查不到了actualExchange memoryDataCenter.getExchange(testExchange);Assertions.assertNull(actualExchange);}// 针对队列进行测试Testpublic void testQueue() {// 1. 构造一个队列, 并插入MSGQueue expectedQueue createTestQueue(testQueue);memoryDataCenter.insertQueue(expectedQueue);// 2. 查询这个队列, 并比较MSGQueue actualQueue memoryDataCenter.getQueue(testQueue);Assertions.assertEquals(expectedQueue, actualQueue);// 3. 删除这个队列memoryDataCenter.deleteQueue(testQueue);// 4. 再次查询队列, 看是否能查到actualQueue memoryDataCenter.getQueue(testQueue);Assertions.assertNull(actualQueue);}// 针对绑定进行测试Testpublic void testBinding() throws MqException {Binding expectedBinding new Binding();expectedBinding.setExchangeName(testExchange);expectedBinding.setMsgQueueName(testQueue);expectedBinding.setBindingKey(testBindingKey);memoryDataCenter.insertBinding(expectedBinding);Binding actualBinding memoryDataCenter.getBinding(testExchange, testQueue);Assertions.assertEquals(expectedBinding, actualBinding);ConcurrentHashMapString, Binding bindingMap memoryDataCenter.getBindings(testExchange);Assertions.assertEquals(1, bindingMap.size());Assertions.assertEquals(expectedBinding, bindingMap.get(testQueue));memoryDataCenter.deleteBinding(expectedBinding);actualBinding memoryDataCenter.getBinding(testExchange, testQueue);Assertions.assertNull(actualBinding);}// 测试消息的增删查private Message createTestMessage(String content) {Message message Message.createMessageWithId(testRoutingKey, null, content.getBytes());return message;}Testpublic void testMessage() {Message expectedMessage createTestMessage(testMessage);memoryDataCenter.addMessage(expectedMessage);Message actualMessage memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage, actualMessage);memoryDataCenter.removeMessage(expectedMessage.getMessageId());actualMessage memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}Testpublic void testSendMessage() {// 1. 创建一个队列, 创建 10 条消息, 把这些消息都插入队列中.MSGQueue queue createTestQueue(testQueue);ListMessage expectedMessages new ArrayList();for (int i 0; i 10; i) {Message message createTestMessage(testMessage i);memoryDataCenter.sendMessage(queue, message);expectedMessages.add(message);}// 2. 从队列中取出这些消息.ListMessage actualMessages new ArrayList();while (true) {Message message memoryDataCenter.pollMessage(testQueue);if (message null) {break;}actualMessages.add(message);}// 3. 比较取出的消息和之前的消息是否一致.Assertions.assertEquals(expectedMessages.size(), actualMessages.size());for (int i 0; i expectedMessages.size(); i) {Assertions.assertEquals(expectedMessages.get(i), actualMessages.get(i));}}Testpublic void testMessageWaitAck() {Message expectedMessage createTestMessage(expectedMessage);memoryDataCenter.addMessageWaitAck(testQueue, expectedMessage);Message actualMessage memoryDataCenter.getMessageWaitAck(testQueue, expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage, actualMessage);memoryDataCenter.removeMessageWaitAck(testQueue, expectedMessage.getMessageId());actualMessage memoryDataCenter.getMessageWaitAck(testQueue, expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}Testpublic void testRecovery() throws IOException, MqException, ClassNotFoundException {// 由于后续需要进行数据库操作, 依赖 MyBatis. 就需要先启动 SpringApplication, 这样才能进行后续的数据库操作.MqApplication.context SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造好数据DiskDataCenter diskDataCenter new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange expectedExchange createTestExchange(testExchange);diskDataCenter.insertExchange(expectedExchange);// 构造队列MSGQueue expectedQueue createTestQueue(testQueue);diskDataCenter.insertQueue(expectedQueue);// 构造绑定Binding expectedBinding new Binding();expectedBinding.setExchangeName(testExchange);expectedBinding.setMsgQueueName(testQueue);expectedBinding.setBindingKey(testBindingKey);diskDataCenter.insertBinding(expectedBinding);// 构造消息Message expectedMessage createTestMessage(testContent);diskDataCenter.sendMessage(expectedQueue, expectedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果Exchange actualExchange memoryDataCenter.getExchange(testExchange);Assertions.assertEquals(expectedExchange.getName(), actualExchange.getName());Assertions.assertEquals(expectedExchange.getType(), actualExchange.getType());Assertions.assertEquals(expectedExchange.isDurable(), actualExchange.isDurable());Assertions.assertEquals(expectedExchange.isAutoDelete(), actualExchange.isAutoDelete());MSGQueue actualQueue memoryDataCenter.getQueue(testQueue);Assertions.assertEquals(expectedQueue.getName(), actualQueue.getName());Assertions.assertEquals(expectedQueue.isDurable(), actualQueue.isDurable());Assertions.assertEquals(expectedQueue.isAutoDelete(), actualQueue.isAutoDelete());Assertions.assertEquals(expectedQueue.isExclusive(), actualQueue.isExclusive());Binding actualBinding memoryDataCenter.getBinding(testExchange, testQueue);Assertions.assertEquals(expectedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getMsgQueueName(), actualBinding.getMsgQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(), actualBinding.getBindingKey());Message actualMessage memoryDataCenter.pollMessage(testQueue);Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());// 4. 清理硬盘的数据, 把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录).MqApplication.context.close();File dataDir new File(./data);FileUtils.deleteDirectory(dataDir);}
}