网站建设丿金手指15,凡科网产品矩阵,北京建设网站哪里好,优化网站结构【ES】数据同步集群3.数据同步3.1.思路分析3.1.1.同步调用3.1.2.异步通知3.1.3.监听binlog3.1.4.选择3.2.实现数据同步3.2.1.思路3.2.2.导入demo3.2.3.声明交换机、队列1#xff09;引入依赖2#xff09;声明队列交换机名称3#xff09;声明队列交换机3.2.4.发送MQ消息…
【ES】数据同步集群3.数据同步3.1.思路分析3.1.1.同步调用3.1.2.异步通知3.1.3.监听binlog3.1.4.选择3.2.实现数据同步3.2.1.思路3.2.2.导入demo3.2.3.声明交换机、队列1引入依赖2声明队列交换机名称3声明队列交换机3.2.4.发送MQ消息3.2.5.接收MQ消息4.集群4.1.搭建ES集群4.2.集群脑裂问题4.2.1.集群职责划分4.2.2.脑裂问题4.2.3.小结4.3.集群分布式存储4.3.1.分片存储测试4.3.2.分片存储原理4.4.集群分布式查询4.5.集群故障转移3.数据同步
elasticsearch中的酒店数据来自于mysql数据库因此mysql数据发生改变时elasticsearch也必须跟着改变这个就是elasticsearch与mysql之间的数据同步。 3.1.思路分析
常见的数据同步方案有三种
同步调用异步通知监听binlog 3.1.1.同步调用
方案一同步调用 基本步骤如下
hotel-demo对外提供接口用来修改elasticsearch中的数据酒店管理服务在完成数据库操作后直接调用hotel-demo提供的接口 3.1.2.异步通知
方案二异步通知 流程如下
hotel-admin对mysql数据库数据完成增、删、改后发送MQ消息hotel-demo监听MQ接收到消息后完成elasticsearch数据修改 3.1.3.监听binlog
方案三监听binlog 流程如下
给mysql开启binlog功能mysql完成增、删、改操作都会记录在binlog中hotel-demo基于canal监听binlog变化实时更新elasticsearch中的内容 3.1.4.选择
方式一同步调用
优点实现简单粗暴缺点业务耦合度高
方式二异步通知
优点低耦合实现难度一般缺点依赖mq的可靠性
方式三监听binlog
优点完全解除服务间耦合缺点开启binlog增加数据库负担、实现复杂度高 3.2.实现数据同步 3.2.1.思路
利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时要求对elasticsearch中数据也要完成相同操作。
步骤 导入课前资料提供的hotel-admin项目启动并测试酒店数据的CRUD 声明exchange、queue、RoutingKey 在hotel-admin中的增、删、改业务中完成消息发送 在hotel-demo中完成消息监听并更新elasticsearch中数据 启动并测试数据同步功能 3.2.2.导入demo
导入课前资料提供的hotel-admin项目 运行后访问 http://localhost:8099 其中包含了酒店的CRUD功能 3.2.3.声明交换机、队列
MQ结构如图 1引入依赖
在hotel-admin、hotel-demo中引入rabbitmq的依赖
!--amqp--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2声明队列交换机名称
在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts包下新建一个类MqConstants
package cn.itcast.hotel.constatnts;public class MqConstants {/*** 交换机*/public final static String HOTEL_EXCHANGE hotel.topic;/*** 监听新增和修改的队列*/public final static String HOTEL_INSERT_QUEUE hotel.insert.queue;/*** 监听删除的队列*/public final static String HOTEL_DELETE_QUEUE hotel.delete.queue;/*** 新增或修改的RoutingKey*/public final static String HOTEL_INSERT_KEY hotel.insert;/*** 删除的RoutingKey*/public final static String HOTEL_DELETE_KEY hotel.delete;
}3声明队列交换机
在hotel-demo中定义配置类声明队列、交换机
package cn.itcast.hotel.config;import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MqConfig {Beanpublic TopicExchange topicExchange(){return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);}Beanpublic Queue insertQueue(){return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);}Beanpublic Queue deleteQueue(){return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);}Beanpublic Binding insertQueueBinding(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);}Beanpublic Binding deleteQueueBinding(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);}
}3.2.4.发送MQ消息
在hotel-admin中的增、删、改业务中分别发送MQ消息 3.2.5.接收MQ消息
hotel-demo接收到MQ消息要做的事情包括
新增消息根据传递的hotel的id查询hotel信息然后新增一条数据到索引库删除消息根据传递的hotel的id删除索引库中的一条数据
1首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、删除业务
void deleteById(Long id);void insertById(Long id);2给hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中实现业务
Override
public void deleteById(Long id) {try {// 1.准备RequestDeleteRequest request new DeleteRequest(hotel, id.toString());// 2.发送请求client.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}Override
public void insertById(Long id) {try {// 0.根据id查询酒店数据Hotel hotel getById(id);// 转换为文档类型HotelDoc hotelDoc new HotelDoc(hotel);// 1.准备Request对象IndexRequest request new IndexRequest(hotel).id(hotel.getId().toString());// 2.准备Json文档request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.发送请求client.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}3编写监听器
在hotel-demo中的cn.itcast.hotel.mq包新增一个类
package cn.itcast.hotel.mq;import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;Component
public class HotelListener {Autowiredprivate IHotelService hotelService;/*** 监听酒店新增或修改的业务* param id 酒店id*/RabbitListener(queues MqConstants.HOTEL_INSERT_QUEUE)public void listenHotelInsertOrUpdate(Long id){hotelService.insertById(id);}/*** 监听酒店删除的业务* param id 酒店id*/RabbitListener(queues MqConstants.HOTEL_DELETE_QUEUE)public void listenHotelDelete(Long id){hotelService.deleteById(id);}
}4.集群
单机的elasticsearch做数据存储必然面临两个问题海量数据存储问题、单点故障问题。
海量数据存储问题将索引库从逻辑上拆分为N个分片shard存储到多个节点单点故障问题将分片数据在不同节点备份replica
ES集群相关概念: 集群cluster一组拥有共同的 cluster name 的 节点。 节点node) 集群中的一个 Elasticearch 实例 分片shard索引可以被拆分为不同的部分进行存储称为分片。在集群环境下一个索引的不同分片可以拆分到不同的节点中 解决问题数据量太大单点存储量有限的问题。 此处我们把数据分成3片shard0、shard1、shard2 主分片Primary shard相对于副本分片的定义。 副本分片Replica shard每个主分片可以有一个或者多个副本数据和主分片一样。
数据备份可以保证高可用但是每个分片备份一份所需要的节点数量就会翻一倍成本实在是太高了
为了在高可用和成本间寻求平衡我们可以这样做
首先对数据分片存储到不同节点然后对每个分片进行备份放到对方节点完成互相备份
这样可以大大减少所需要的服务节点数量如图我们以3分片每个分片备份一份为例 现在每个分片都有1个备份存储在3个节点
node0保存了分片0和1node1保存了分片0和2node2保存了分片1和2 4.1.搭建ES集群
参考课前资料的文档 其中的第四章节 4.2.集群脑裂问题 4.2.1.集群职责划分
elasticsearch中集群节点有不同的职责划分 默认情况下集群中的任何一个节点都同时具备上述四种角色。
但是真实的集群一定要将集群职责分离
master节点对CPU要求高但是内存要求第data节点对CPU和内存要求都高coordinating节点对网络带宽、CPU要求高
职责分离可以让我们根据不同节点的需求分配不同的硬件去部署。而且避免业务之间的互相干扰。
一个典型的es集群职责划分如图 4.2.2.脑裂问题
脑裂是因为集群中的节点失联导致的。
例如一个集群中主节点与其它节点失联 此时node2和node3认为node1宕机就会重新选主 当node3当选后集群继续对外提供服务node2和node3自成集群node1自成集群两个集群数据不同步出现数据差异。
当网络恢复后因为集群中有两个master节点集群状态的不一致出现脑裂的情况 解决脑裂的方案是要求选票超过 ( eligible节点数量 1 / 2 才能当选为主因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes在es7.0以后已经成为默认配置因此一般不会发生脑裂问题
例如3个节点形成的集群选票必须超过 3 1 / 2 也就是2票。node3得到node2和node3的选票当选为主。node1只有自己1票没有当选。集群中依然只有1个主节点没有出现脑裂。 4.2.3.小结
master eligible节点的作用是什么
参与集群选主主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
data节点的作用是什么
数据的CRUD
coordinator节点的作用是什么 路由请求到其它节点 合并查询到的结果返回给用户 4.3.集群分布式存储
当新增文档时应该保存到不同分片保证数据均衡那么coordinating node如何确定数据该存储到哪个分片呢 4.3.1.分片存储测试
插入三条数据 测试可以看到三条数据分别在不同分片 结果 4.3.2.分片存储原理
elasticsearch会通过hash算法来计算文档应该存储到哪个分片 说明
_routing默认是文档的id算法与分片数量有关因此索引库一旦创建分片数量不能修改
新增文档的流程如下 解读
1新增一个id1的文档2对id做hash运算假如得到的是2则应该存储到shard-23shard-2的主分片在node3节点将数据路由到node34保存文档5同步给shard-2的副本replica-2在node2节点6返回结果给coordinating-node节点 4.4.集群分布式查询
elasticsearch的查询分成两个阶段 scatter phase分散阶段coordinating node会把请求分发到每一个分片 gather phase聚集阶段coordinating node汇总data node的搜索结果并处理为最终结果集返回给用户 4.5.集群故障转移
集群的master节点会监控集群中的节点状态如果发现有节点宕机会立即将宕机节点的分片数据迁移到其它节点确保数据安全这个叫做故障转移。
1例如一个集群结构如图 现在node1是主节点其它两个节点是从节点。
2突然node1发生了故障 宕机后的第一件事需要重新选主例如选中了node2 node2成为主节点后会检测集群监控状态发现shard-1、shard-0没有副本节点。因此需要将node1上的数据迁移到node2、node3 学习笔记 from 黑马程序员 By – Suki 2023/4/9