Learning Source Code with Questions, Part 7: Elasticsearch Shard Recovery Analysis
Code analysis is based on: https://github.com/jiankunking/elasticsearch (Elasticsearch 8.0.0-SNAPSHOT)
Purpose
Before reading the source code, let me first list my own questions about shard recovery:
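- There are plenty of explanations of Elasticsearch's shard recovery logic online. Are they correct? Has anything changed in newer versions?
- If a _forcemerge API request arrives during shard recovery, how is it handled? (The first phase of replica recovery copies segment files.) I will answer this one when I look at the /_forcemerge API.
- The second phase of shard recovery synchronizes the translog. Is a lock taken in this step? If not, how is it ensured that synchronization has completed?

If there is a shortcut to reading source code, one good approach is to follow an authoritative source-analysis article found online. The source analysis below follows Tencent Cloud's "Elasticsearch 底层系列之分片恢复解析" (Elasticsearch internals series: shard recovery analysis) and checks it against the code along the way.
Source Code Analysis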
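Target node requests recovery
First, find the entry point of shard recovery: IndicesClusterStateService.createOrUpdateShards
This method checks whether the local node appears in routingNodes. If it does, the local node has shards to create or update. Otherwise the method returns without doing anything.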
```java
private void createOrUpdateShards(final ClusterState state) {
    // RoutingNode maps a node to its index shards; it is mainly used for shard allocation and rebalancing decisions.
    // For details, see: https://jiankunking.com/elasticsearch-cluster-state.html
    RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
    if (localRoutingNode == null) {
        return;
    }

    DiscoveryNodes nodes = state.nodes();
    RoutingTable routingTable = state.routingTable();

    for (final ShardRouting shardRouting : localRoutingNode) {
        ShardId shardId = shardRouting.shardId();
        // failedShardsCache: https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L116
        // Shards whose recovery failed; we track them to avoid retrying their recovery on every cluster state update
        if (failedShardsCache.containsKey(shardId) == false) {
            AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
            assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
            Shard shard = indexService.getShardOrNull(shardId.id());
            if (shard == null) { // the shard does not exist yet, so create it
                assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
                createShard(nodes, routingTable, shardRouting, state);
            } else { // the shard exists, so update it
                updateShard(nodes, shardRouting, shard, routingTable, state);
            }
        }
    }
}
```

Replica shard recovery takes the createShard branch. That method first checks the recovery source type of the shardRouting. If the type is PEER, the shard must be fetched from a remote node, so the method locates the source node first and then calls IndicesService.createShard.
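The create-or-update decision above can be modeled in a few lines. This is a minimal sketch under stated assumptions. CreateOrUpdateSketch, Action and plan are hypothetical names, and shard ids are plain strings instead of Elasticsearch's ShardRouting/ShardId types. The sketch models only the branching: skip everything when the node has no routing entry, skip failed shards, create a missing shard, update an existing one. It does not perform the real side effects.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the create-or-update loop; these are not Elasticsearch classes.
class CreateOrUpdateSketch {

    enum Action { CREATE, UPDATE }

    /**
     * @param localShards    shard ids routed to the local node, or null if the node has no routing entry
     * @param existingShards shard ids that already have a local shard instance
     * @param failedShards   shard ids whose recovery failed recently (stands in for failedShardsCache)
     * @return the action to take for each shard, in routing order
     */
    static Map<String, Action> plan(List<String> localShards, Set<String> existingShards, Set<String> failedShards) {
        Map<String, Action> actions = new LinkedHashMap<>();
        if (localShards == null) {
            return actions; // local node is not in the routing nodes: nothing to create or update
        }
        for (String shardId : localShards) {
            if (failedShards.contains(shardId)) {
                continue; // do not retry a failed recovery on every cluster state update
            }
            // missing locally -> create (this is where recovery starts); present -> update
            actions.put(shardId, existingShards.contains(shardId) ? Action.UPDATE : Action.CREATE);
        }
        return actions;
    }
}
```

For example, with routed shards s0, s1 and s2, where s1 already exists locally and s2 is in the failed cache, the plan is CREATE for s0 and UPDATE for s1, and s2 is left out.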
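RecoverySource.Type has the following values:
- EMPTY_STORE: a new, empty store (a brand-new primary shard)
- EXISTING_STORE: local recovery of a primary shard from its existing store
- PEER: a replica (or a relocating shard) recovers from a remote primary
- SNAPSHOT: restore from a snapshot
- LOCAL_SHARDS: recover from other shards on the same node (e.g. during shrink)

The createShard code is as follows: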
```java
private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
    assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;

    DiscoveryNode sourceNode = null;
    // If the recovery type is PEER, find the source node that holds the shard and recover from it
    if (shardRouting.recoverySource().getType() == Type.PEER) {
        sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
        if (sourceNode == null) {
            logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
            return;
        }
    }

    try {
        final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
        logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
        indicesService.createShard(
            shardRouting,
            recoveryTargetService,
            new RecoveryListener(shardRouting, primaryTerm),
            repositoriesService,
            failedShardHandler,
            this::updateGlobalCheckpointForShard,
            retentionLeaseSyncer,
            nodes.getLocalNode(),
            sourceNode);
    } catch (Exception e) {
        failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
    }
}

/**
 * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard
 * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.
 */
private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes,
                                                           ShardRouting shardRouting) {
    DiscoveryNode sourceNode = null;
    if (!shardRouting.primary()) {
        ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
        // only recover from started primary, if we can't find one, we will do it next round
        if (primary.active()) {
            // find the node hosting the primary shard
            sourceNode = nodes.get(primary.currentNodeId());
            if (sourceNode == null) {
                logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary);
            }
        } else {
            logger.trace("can't find replica source node because primary shard {} is not active.", primary);
        }
    } else if (shardRouting.relocatingNodeId() != null) {
        // find the source node of the relocation
        sourceNode = nodes.get(shardRouting.relocatingNodeId());
        if (sourceNode == null) {
            logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}].",
                shardRouting.shardId(), shardRouting.relocatingNodeId());
        }
    } else {
        throw new IllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " +
            shardRouting);
    }
    return sourceNode;
}
```

The source node is determined in one of two ways. If the current shard is not a primary, the source node is the node holding the primary shard, and only while that primary is active. Otherwise, if the current shard is relocating (moving from another node to this one), the source node is the node the relocation started from. Once the source node is known, IndicesService.createShard is called. That method calls IndexShard.startRecovery to begin recovery.
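The source-node rule can be sketched as a small pure function. The sketch assumes a hypothetical Routing record in place of ShardRouting and a plain map in place of DiscoveryNodes. SourceNodeSketch and findSourceNode are illustrative names, not Elasticsearch APIs. Returning null mirrors the real method: createShard then skips the shard and waits for a later cluster state.

```java
import java.util.Map;

// Hypothetical, simplified model of findSourceNodeForPeerRecovery; these are not Elasticsearch classes.
class SourceNodeSketch {

    /** Minimal stand-in for a shard routing entry. */
    record Routing(boolean primary, boolean active, String currentNodeId, String relocatingNodeId) {}

    /**
     * @param shard      routing of the shard being recovered on this node
     * @param primary    routing of the primary copy of the same shard
     * @param knownNodes node id -> node name for nodes in the cluster state
     * @return the source node, or null if none can be used yet (retry on a later cluster state)
     */
    static String findSourceNode(Routing shard, Routing primary, Map<String, String> knownNodes) {
        if (!shard.primary()) {
            // replica: recover from the primary, but only once the primary is active
            return primary.active() ? knownNodes.get(primary.currentNodeId()) : null;
        } else if (shard.relocatingNodeId() != null) {
            // relocating primary: recover from the node the shard is moving away from
            return knownNodes.get(shard.relocatingNodeId());
        }
        throw new IllegalStateException("routing state means no peer recovery: " + shard);
    }
}
```

As in the real code, a primary that is not relocating has no peer to recover from, so asking for its source node is treated as a programming error rather than returning null.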
```java
public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                          PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                          Consumer<MappingMetadata> mappingUpdateConsumer,
                          IndicesService indicesService) {
    // TODO: Create a proper object to encapsulate the recovery context
    // all of the current methods here follow a pattern of:
    // resolve context which isn't really dependent on the local shards and then async
    // call some external method with this pointer.
    // with a proper recovery context object we can simply change this to:
    // startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
    //     markAsRecovery("from " + source.getShortDescription(), recoveryState);
    //     threadPool.generic().execute()  {
    //         onFailure () { listener.failure() };
    //         doRun() {
    //             if (source.recover(this)) {
    //                 recoveryListener.onRecoveryDone(recoveryState);
    //             }
    //         }
    //     }}
    // }
    assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
    switch (recoveryState.getRecoverySource().getType()) {
        case EMPTY_STORE:
        case EXISTING_STORE:
            executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
            break;
        case PEER:
            try {
                markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
                recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
            } catch (Exception e) {
                failShard("corrupted preexisting index", e);
                recoveryListener.onRecoveryFailure(recoveryState,
                    new RecoveryFailedException(recoveryState, null, e), true);
            }
            break;
        case SNAPSHOT:
            final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
            executeRecovery("from snapshot",
                recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));
            break;
        case LOCAL_SHARDS:
            final IndexMetadata indexMetadata = indexSettings().getIndexMetadata();
            final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();
            final List<IndexShard> startedShards = new ArrayList<>();
            final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
            final Set<ShardId> requiredShards;
            final int numShards;
            if (sourceIndexService != null) {
                requiredShards = IndexMetadata.selectRecoverFromShards(shardId().id(),
                    sourceIndexService.getMetadata(), indexMetadata.getNumberOfShards());
                for (IndexShard shard : sourceIndexService) {
                    if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
                        startedShards.add(shard);
                    }
                }
                numShards = requiredShards.size();
            } else {
                numShards = -1;
                requiredShards = Collections.emptySet();
            }

            if (numShards == startedShards.size()) {
                assert requiredShards.isEmpty() == false;
                executeRecovery("from local shards", recoveryState, recoveryListener,
                    l -> recoverFromLocalShards(mappingUpdateConsumer,
                        startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));
            } else {
                final RuntimeException e;
                if (numShards == -1) {
                    e = new IndexNotFoundException(resizeSourceIndex);
                } else {
                    e = new IllegalStateException("not all required shards of index " + resizeSourceIndex
                        + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
                        + shardId());
                }
                throw e;
            }
            break;
        default:
            throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
    }
}
```

For PEER recovery, the work is actually performed by PeerRecoveryTargetService.doRecovery, which is reached through recoveryTargetService.startRecovery above. doRecovery first prepares the shard and recovers locally up to the global checkpoint (recoverLocallyUpToGlobalCheckpoint), which yields the startingSeqNo. It then calls getStartRecoveryRequest to obtain the shard's metadataSnapshot. This structure holds the shard's segment information, such as the sync id, checksums and document count. The snapshot is wrapped in a StartRecoveryRequest and sent to the source node via RPC:
```java
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
    final String actionName;
    final TransportRequest requestToSend;
    final StartRecoveryRequest startRequest;
    final RecoveryState.Timer timer;
    try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
        if (recoveryRef == null) {
            logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
            return;
        }
        final RecoveryTarget recoveryTarget = recoveryRef.target();
        timer = recoveryTarget.state().getTimer();
        if (preExistingRequest == null) {
            try {
                final IndexShard indexShard = recoveryTarget.indexShard();
                indexShard.preRecovery();
                assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
                logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                indexShard.prepareForIndexRecovery();
                final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
                assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
                    "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
                // build the recovery request
                startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
                requestToSend = startRequest;
                actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
            } catch (final Exception e) {
                // this will be logged as warning later on...
                logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
                onGoingRecoveries.failRecovery(recoveryId,
                    new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);
                return;
            }
            logger.trace("{} starting recovery from {}", startRequest.shardId(), startRequest.sourceNode());
        } else {
            startRequest = preExistingRequest;
            requestToSend = new ReestablishRecoveryRequest(recoveryId, startRequest.shardId(), startRequest.targetAllocationId());
            actionName = PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY;
            logger.trace("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
        }
    }
    // send the recovery request to the source node
    transportService.sendRequest(startRequest.sourceNode(), actionName, requestToSend,
        new RecoveryResponseHandler(startRequest, timer));
}

/**
 * Prepare the start recovery request.
 *
 * @param logger         the logger
 * @param localNode      the local node of the recovery target
 * @param recoveryTarget the target of the recovery
 * @param startingSeqNo  a sequence number that an operation-based peer recovery can start with.
 *                       This is the first operation after the local checkpoint of the safe commit if exists.
 * @return a start recovery request
 */
public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,
                                                           RecoveryTarget recoveryTarget, long startingSeqNo) {
    final StartRecoveryRequest request;
    logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

    Store.MetadataSnapshot metadataSnapshot;
    try {
        metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
        // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
        try {
            final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
            final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
            assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
        } catch (IOException | TranslogCorruptedException e) {
            logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
                "resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
            metadataSnapshot = Store.MetadataSnapshot.EMPTY;
            startingSeqNo = UNASSIGNED_SEQ_NO;
        }
    } catch (final org.apache.lucene.index.IndexNotFoundException e) {
        // happens on an empty folder. no need to log
        assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;
        logger.trace("{} shard folder empty, recovering all files", recoveryTarget);
        metadataSnapshot = Store.MetadataSnapshot.EMPTY;
    } catch (final IOException e) {
        if (startingSeqNo != UNASSIGNED_SEQ_NO) {
            logger.warn(new ParameterizedMessage("error while listing local files, resetting the starting sequence number from {} " +
                "to unassigned and recovering as if there are none", startingSeqNo), e);
            startingSeqNo = UNASSIGNED_SEQ_NO;
        } else {
            logger.warn("error while listing local files, recovering as if there are none", e);
        }
        metadataSnapshot = Store.MetadataSnapshot.EMPTY;
    }
    logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
    request = new StartRecoveryRequest(
        recoveryTarget.shardId(),
        recoveryTarget.indexShard().routingEntry().allocationId().getId(),
        recoveryTarget.sourceNode(),
        localNode,
        metadataSnapshot,
        recoveryTarget.state().getPrimary(),
        recoveryTarget.recoveryId(),
        startingSeqNo);
    return request;
}
```

Note that the request is sent asynchronously. transportService.sendRequest does not wait for recovery to finish, and the source node's reply is handled by RecoveryResponseHandler.
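The error handling in getStartRecoveryRequest reduces to a small decision table. The sketch below is hypothetical: StartRequestSketch, LocalState and Decision are illustrative names, and the four LocalState values stand in for the try/catch branches above. UNASSIGNED_SEQ_NO is set to -2, the value of SequenceNumbers.UNASSIGNED_SEQ_NO in Elasticsearch. An unassigned starting sequence number combined with empty metadata effectively asks the source for a full file-based recovery.

```java
// Hypothetical, simplified model of the fallback rules in getStartRecoveryRequest; not Elasticsearch code.
class StartRequestSketch {

    static final long UNASSIGNED_SEQ_NO = -2L; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

    /** Stand-ins for the outcomes of reading the local store and translog. */
    enum LocalState { READABLE, TRANSLOG_UNREADABLE, EMPTY_FOLDER, IO_ERROR }

    /** startingSeqNo to send, and whether local file metadata is reported (false = MetadataSnapshot.EMPTY). */
    record Decision(long startingSeqNo, boolean reportLocalFiles) {}

    static Decision decide(LocalState state, long startingSeqNo) {
        switch (state) {
            case READABLE:
                // store and translog are consistent: report local files, keep the seq no for an ops-based attempt
                return new Decision(startingSeqNo, true);
            case TRANSLOG_UNREADABLE:
                // translog inconsistent with the Lucene index: throw the local files away and start over
                return new Decision(UNASSIGNED_SEQ_NO, false);
            case EMPTY_FOLDER:
                // nothing on disk yet; the real code asserts startingSeqNo is already unassigned here
                return new Decision(startingSeqNo, false);
            case IO_ERROR:
            default:
                // cannot list local files: recover as if there are none
                return new Decision(UNASSIGNED_SEQ_NO, false);
        }
    }
}
```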
Source node handles the recovery request
When the source node receives the request, it calls the recovery entry point, PeerRecoverySourceService.messageReceived#recover:
```java
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
    @Override
    public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
        recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request));
    }
}
```

The recover method looks up the shard from the request, builds a RecoverySourceHandler, and then calls handler.recoverToTarget, which contains the main body of the recovery:
```java
private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
    final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
    final IndexShard shard = indexService.getShard(request.shardId().id());

    final ShardRouting routingEntry = shard.routingEntry();

    if (routingEntry.primary() == false || routingEntry.active() == false) {
        throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");
    }

    if (request.isPrimaryRelocation() && (routingEntry.relocating() == false ||
        routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {
        logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",
            request.shardId(), request.targetNode());
        throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");
    }

    RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
    logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),
        request.targetNode());
    handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));
}

/**
 * performs the recovery from the local engine to the target
 */
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
    addListener(listener);
    final Closeable releaseResources = () -> IOUtils.close(resources);
    try {
        cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
            final RuntimeException e;
            if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
                e = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");
            } else {
                e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");
            }
            if (beforeCancelEx != null) {
                e.addSuppressed(beforeCancelEx);
            }
            IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
            throw e;
        });
        final Consumer<Exception> onFailure = e -> {
            assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");
            IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
        };

        final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

        runUnderPrimaryPermit(() -> {
            final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
            ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
            if (targetShardRouting == null) {
                logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),
                    request.targetNode());
                throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
            }
            assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
            retentionLeaseRef.set(
                shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
        }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
            shard, cancellableThreads, logger);
        // Acquire a history retention lock so that the operation history (translog) is not cleaned up
        final Closeable retentionLock = shard.acquireHistoryRetentionLock();
        resources.add(retentionLock);
        final long startingSeqNo;
        // Decide whether a sequence-number-based recovery is possible.
        // Apart from the sanity and history-UUID checks, the key check is shard.hasCompleteHistoryOperations(): whether the
        // primary still holds the complete operation history from the requested sequence number onwards. It may not, e.g. if
        // the history grew too large or expired and was deleted.
        final boolean isSequenceNumberBasedRecovery
            = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
            && isTargetSameHistory()
            && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
            && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) ||
               (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
        // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
        // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
        // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
        // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
        // without having a complete history.

        if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {
            // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
            retentionLock.close();
            logger.trace("history is retained by {}", retentionLeaseRef.get());
        } else {
            // all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations()
            // and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's
            // local checkpoint will be retained for the duration of this recovery.
            logger.trace("history is retained by retention lock");
        }

        final StepListener<SendFileResult> sendFileStep = new StepListener<>();
        final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
        final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
        final StepListener<Void> finalizeStep = new StepListener<>();

        if (isSequenceNumberBasedRecovery) {
            // Sequence-number-based recovery: startingSeqNo is the sequence number from the recovery request, and the history
            // is snapshotted from there. (In the file-based branch below it is the safe commit's local checkpoint + 1.)
            logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
            startingSeqNo = request.startingSeqNo();
            if (retentionLeaseRef.get() == null) {
                createRetentionLease(startingSeqNo, sendFileStep.map(ignored -> SendFileResult.EMPTY));
            } else {
                // no segment files need to be sent
                sendFileStep.onResponse(SendFileResult.EMPTY);
            }
        } else {
            final Engine.IndexCommitRef safeCommitRef;
            try {
                // Releasing a safe commit can access some commit files.
                safeCommitRef = acquireSafeCommit(shard);
                resources.add(safeCommitRef);
            } catch (final Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
            }

            // Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being
            // able to recover other replicas using operations-based recoveries. If we are not using retention leases then we
            // conservatively copy all available operations. If we are using retention leases then "enough operations" is just the
            // operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains
            // at least as much history as anything else. The safe commit will often contain all the history retained by the current set
            // of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a
            // retention lease for some history that this primary already discarded, since we discard history when the global checkpoint
            // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
            // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
            // down.
            startingSeqNo = Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
            logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

            try {
                final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
                final Releasable releaseStore = acquireStore(shard.store());
                resources.add(releaseStore);
                sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                    try {
                        IOUtils.close(safeCommitRef, releaseStore);
                    } catch (final IOException ex) {
                        logger.warn("releasing snapshot caused exception", ex);
                    }
                });

                final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
                runUnderPrimaryPermit(() -> {
                    try {
                        // If the target previously had a copy of this shard then a file-based recovery might move its global
                        // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
                        // new one later on in the recovery.
                        shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
                            new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
                                deleteRetentionLeaseStep, false));
                    } catch (RetentionLeaseNotFoundException e) {
                        logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
                        deleteRetentionLeaseStep.onResponse(null);
                    }
                }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
                    shard, cancellableThreads, logger);

                // Phase 1
                deleteRetentionLeaseStep.whenComplete(ignored -> {
                    assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
                    phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);
                }, onFailure);

            } catch (final Exception e) {
                throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
            }
        }
        assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;

        sendFileStep.whenComplete(r -> {
            assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
            // Once phase1 is done, the primary tells the replica to open the Engine for this shard (prepareTargetForTranslog).
            // The next step (prepareEngineStep) only runs after the replica's Engine has started; from then on the replica
            // can accept regular write requests.
            // Note that phase2 has not started yet, so recovery of this shard is not finished at this point.
            // Next, the history is snapshotted starting at startingSeqNo and phase2 begins.
            // For a sequence based recovery, the target can keep its local translog
            prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
        }, onFailure);

        prepareEngineStep.whenComplete(prepareEngineTime -> {
            assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
            /*
             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
             * This means that any document indexed into the primary after this will be replicated to this replica as well
             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
             * all documents up to maxSeqNo in phase2.
             */
            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
            logger.trace("snapshot for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
            final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false);
            resources.add(phase2Snapshot);
            retentionLock.close();

            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
            final RetentionLeases retentionLeases = shard.getRetentionLeases();
            final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion();
            // Phase 2: send the operation history (translog)
            phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
                retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);

        }, onFailure);

        // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
        final long trimAboveSeqNo = startingSeqNo - 1;
        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

        finalizeStep.whenComplete(r -> {
            final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
            final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
            final SendFileResult sendFileResult = sendFileStep.result();
            final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
                prepareEngineStep.result().millis(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis());
            try {
                future.onResponse(response);
            } finally {
                IOUtils.close(resources);
            }
        }, onFailure);
    } catch (Exception e) {
        IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
    }
}

// From IndexShard:
/**
 * Checks if we have a completed history of operations since the given starting seqno (inclusive).
 * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
 */
public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) {
    return getEngine().hasCompleteOperationHistory(reason, startingSeqNo);
}
```
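The choice between an operation-based and a file-based recovery in recoverToTarget can be isolated as a pure function. In this hypothetical sketch, RecoveryModeSketch and its parameters are illustrative names. Each boolean stands in for one of the real checks (isTargetSameHistory(), hasCompleteHistoryOperations(...), useRetentionLeasesInPeerRecovery()), and the target's retention lease is reduced to its retaining sequence number (null when there is no lease).

```java
// Hypothetical, simplified model of the recovery-mode decision in recoverToTarget; not Elasticsearch code.
class RecoveryModeSketch {

    static final long UNASSIGNED_SEQ_NO = -2L; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

    /**
     * @param requestStartingSeqNo startingSeqNo sent by the target
     * @param sameHistory          stands in for isTargetSameHistory()
     * @param completeHistory      stands in for shard.hasCompleteHistoryOperations("peer-recovery", startingSeqNo)
     * @param leaseRetainingSeqNo  retaining seq no of the target's peer-recovery retention lease, or null if none
     * @param useLeases            stands in for shard.useRetentionLeasesInPeerRecovery()
     */
    static boolean isSequenceNumberBased(long requestStartingSeqNo, boolean sameHistory, boolean completeHistory,
                                         Long leaseRetainingSeqNo, boolean useLeases) {
        return requestStartingSeqNo != UNASSIGNED_SEQ_NO
            && sameHistory
            && completeHistory
            // either leases are not in use at all, or the existing lease still retains everything from startingSeqNo on
            && ((leaseRetainingSeqNo == null && useLeases == false)
                || (leaseRetainingSeqNo != null && leaseRetainingSeqNo <= requestStartingSeqNo));
    }

    /** Ops-based recovery starts where the target asked; file-based replays from just after the safe commit. */
    static long startingSeqNo(boolean seqNoBased, long requestStartingSeqNo, long safeCommitLocalCheckpoint) {
        return seqNoBased ? requestStartingSeqNo : safeCommitLocalCheckpoint + 1;
    }
}
```

For example, a target asking to start at 100 with a lease retaining from 90 qualifies for an operation-based recovery. A lease retaining only from 110 forces a file-based recovery. In that case, with a safe-commit local checkpoint of 49, history replay starts at 50.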
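The code above shows that recovery has two main phases. Phase 1 copies segment files, and phase 2 sends the translog (the operation history). In a sequence-number-based recovery, phase 1 is skipped and sendFileStep is completed with SendFileResult.EMPTY.

One key point is that, before recovery starts, the source must make sure the data it needs cannot be deleted. Older versions (and the Tencent Cloud article) describe this as acquiring a translogView and a segment snapshot. In the code above, the equivalents are the history retention lock (shard.acquireHistoryRetentionLock(), or an existing peer-recovery retention lease) and the safe commit (acquireSafeCommit(shard)). The retention lock keeps the operation history the recovery needs from being discarded; it is released once the phase 2 snapshot has been taken, or immediately if a retention lease already retains that history. The safe commit keeps the segment files it references from being deleted.

Next, let's look at how each of the two phases is carried out.
phase1: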
```java
/**
 * Perform phase1 of the recovery operations. Once this {@link IndexCommit}
 * snapshot has been performed no commit operations (files being fsync'd)
 * are effectively allowed on this index until all recovery phases are done
 * <p>
 * Phase1 examines the segment files on the target node and copies over the
 * segments that are missing. Only segments that have the same size and
 * checksum can be reused
 */
void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
    cancellableThreads.checkForCancel();
    // get the shard's store
    final Store store = shard.store();
    try {
        StopWatch stopWatch = new StopWatch().start();
        final Store.MetadataSnapshot recoverySourceMetadata;
        try {
            // get the metadata of the commit snapshot
            recoverySourceMetadata = store.getMetadata(snapshot);
        } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
            shard.failShard("recovery", ex);
            throw ex;
        }
        for (String name : snapshot.getFileNames()) {
            final StoreFileMetadata md = recoverySourceMetadata.get(name);
            if (md == null) {
                logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
                throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +
                    recoverySourceMetadata.asMap().size() + " files", name);
            }
        }
        // If the sync ids are equal, the doc counts are compared as well; if both match, phase1 is skipped
        if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
            final List<String> phase1FileNames = new ArrayList<>();
            final List<Long> phase1FileSizes = new ArrayList<>();
            final List<String> phase1ExistingFileNames = new ArrayList<>();
            final List<Long> phase1ExistingFileSizes = new ArrayList<>();

            // Total size of segment files that are recovered
            long totalSizeInBytes = 0;
            // Total size of segment files that were able to be re-used
            long existingTotalSizeInBytes = 0;

            // Generate a "diff" of all the identical, different, and missing
            // segment files on the target node, using the existing files on
            // the source node
            // Find the segment files that differ between target and source:
            // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/store/Store.java#L971
            final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
            for (StoreFileMetadata md : diff.identical) {
                phase1ExistingFileNames.add(md.name());
                phase1ExistingFileSizes.add(md.length());
                existingTotalSizeInBytes += md.length();
                if (logger.isTraceEnabled()) {
                    logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +
                        " size [{}]", md.name(), md.checksum(), md.length());
                }
                totalSizeInBytes += md.length();
            }
            List<StoreFileMetadata> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
            phase1Files.addAll(diff.different);
            phase1Files.addAll(diff.missing);
            for (StoreFileMetadata md : phase1Files) {
                if (request.metadataSnapshot().asMap().containsKey(md.name())) {
                    logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
                        md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
                } else {
                    logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());
                }
                phase1FileNames.add(md.name());
                phase1FileSizes.add(md.length());
                totalSizeInBytes += md.length();
            }

            logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
                phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),
                phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));
            final StepListener<Void> sendFileInfoStep = new StepListener<>();
            final StepListener<Void> sendFilesStep = new StepListener<>();
            final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
            final StepListener<Void> cleanFilesStep = new StepListener<>();
            cancellableThreads.checkForCancel();
            recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,
                phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);

            // send the files that need to be recovered to the target node
            sendFileInfoStep.whenComplete(r ->
                sendFiles(store, phase1Files.toArray(new StoreFileMetadata[0]), translogOps, sendFilesStep), listener::onFailure);

            sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);

            createRetentionLeaseStep.whenComplete(retentionLease ->
                {
                    final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
                    assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint
                        : retentionLease + " vs " + lastKnownGlobalCheckpoint;
                    // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
                    // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
                    // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
                    // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
                    cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);
                },
                listener::onFailure);

            final long totalSize = totalSizeInBytes;
            final long existingTotalSize = existingTotalSizeInBytes;
            cleanFilesStep.whenComplete(r -> {
                final TimeValue took = stopWatch.totalTime();
                logger.trace("recovery [phase1]: took [{}]", took);
                listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,
                    phase1ExistingFileSizes, existingTotalSize, took));
            }, listener::onFailure);
        } else {
            logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());

            // but we must still create a retention lease
            final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
            createRetentionLease(startingSeqNo, createRetentionLeaseStep);
            createRetentionLeaseStep.whenComplete(retentionLease -> {
                final TimeValue took = stopWatch.totalTime();
                logger.trace("recovery [phase1]: took [{}]", took);
                listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(),
                    Collections.emptyList(), 0L, took));
            }, listener::onFailure);

        }
    } catch (Exception e) {
        throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e);
    }
}
```

Here is what phase1 does, step by step. It first obtains the metadataSnapshot of the commit being recovered, which gives the recoverySourceSyncId, and takes the recoveryTargetSyncId from the request. It then compares the two sync ids. If they match, it also compares the document counts of source and target. If those match too, the source and target shards have identical segments up to the current commit point, so no segment files need to be recovered. This comparison is done in canSkipPhase1. If the sync ids differ, the segment files differ, and every differing file must be recovered. Comparing recoverySourceMetadata with the target's metadata snapshot (request.metadataSnapshot()) identifies all the differing segment files. The Javadoc of Store.MetadataSnapshot.recoveryDiff describes this logic.
```java
/**
 * Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the
 * recovery target and this snapshot as the source. The returned diff will hold a list of files that are:
 * <ul>
 * <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
 * <li>different: they exist in both snapshots but their they are not identical</li>
 * <li>missing: files that exist in the source but not in the target</li>
 * </ul>
 * This method groups file into per-segment files and per-commit files. A file is treated as
 * identical if and on if all files in its group are identical. On a per-segment level files for a segment are treated
 * as identical iff:
 * <ul>
 * <li>all files in this segment have the same checksum</li>
 * <li>all files in this segment have the same length</li>
 * <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,
 * The metadata transfers the {@code .si} file content as its hash</li>
 * </ul>
 * <p>
 * The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be
 * unique segment identifiers in there hardening this method further.
 * <p>
 * The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files
 * like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated
 * as identical iff:
 * <ul>
 * <li>all files belonging to this commit have the same checksum</li>
 * <li>all files belonging to this commit have the same length</li>
 * <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,
 * The metadata transfers the {@code segments_N} file content as its hash</li>
 * </ul>
 * <p>
 * NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery.
 */
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
    final List<StoreFileMetadata> identical = new ArrayList<>();  // identical files
    final List<StoreFileMetadata> different = new ArrayList<>();  // files that differ
    final List<StoreFileMetadata> missing = new ArrayList<>();    // files missing on the target
    final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
    final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();

    for (StoreFileMetadata meta : this) {
        if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacy
            continue; // we don't need that file at all
        }
        final String segmentId = IndexFileNames.parseSegmentName(meta.name());
        final String extension = IndexFileNames.getExtension(meta.name());
        if (IndexFileNames.SEGMENTS.equals(segmentId) ||
                DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
            // only treat del files as per-commit files fnm files are generational but only for upgradable DV
            perCommitStoreFiles.add(meta);
        } else {
            perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
        }
    }
    final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
    for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
        identicalFiles.clear();
        boolean consistent = true;
        for (StoreFileMetadata meta : segmentFiles) {
            StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
            if (storeFileMetadata == null) {
                consistent = false;
                missing.add(meta);        // the file does not exist on the target node: add it to missing
            } else if (storeFileMetadata.isSame(meta) == false) {
                consistent = false;
                different.add(meta);      // it exists on the target but is not the same: add it to different
            } else {
                identicalFiles.add(meta); // it exists and is the same
            }
        }
        if (consistent) {
            identical.addAll(identicalFiles);
        } else {
            // make sure all files are added - this can happen if only the deletes are different
            different.addAll(identicalFiles);
        }
    }
    RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),
        Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
    assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
        : "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +
        this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";
    return recoveryDiff;
}
```

Here every segment file is put into one of three categories: identical, different, or missing (absent on the target). Because files are compared per group, a single differing or missing file in a segment (or in the commit) also moves that group's otherwise identical files into different, so the whole group is re-sent. The different and missing files are then sent to the target node as the files to recover in phase one. After the segment files have been sent, the source node also tells the target node to clean up its temporary files, and then tells it to open its engine and get ready to receive the translog.
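The phase1 decision can be condensed into a small, self-contained Java sketch. It is a simplified model, not Elasticsearch code: FileMeta, Diff, canSkipFileCopy, groupOf and snapshot are hypothetical names standing in for StoreFileMetadata, Store.RecoveryDiff, canSkipPhase1 and IndexFileNames.parseSegmentName. File equality is reduced to length plus checksum, and the skip rule is the one described in the text (same non-null sync id and same doc count). As far as I can tell from the source, the real canSkipPhase1 fails the recovery instead of falling back when the sync ids match but the doc counts differ, so treat that branch as a simplification.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class Phase1Sketch {
    /** Hypothetical stand-in for StoreFileMetadata: only name, length and checksum. */
    record FileMeta(String name, long length, String checksum) {
        boolean isSame(FileMeta other) {
            return length == other.length() && checksum.equals(other.checksum());
        }
    }

    /** Hypothetical stand-in for Store.RecoveryDiff. */
    record Diff(List<FileMeta> identical, List<FileMeta> different, List<FileMeta> missing) {}

    /** Simplified skip rule from the text: same non-null sync id and same doc count. */
    static boolean canSkipFileCopy(String sourceSyncId, long sourceDocs, String targetSyncId, long targetDocs) {
        return sourceSyncId != null && sourceSyncId.equals(targetSyncId) && sourceDocs == targetDocs;
    }

    /** Rough analogue of the grouping in recoveryDiff: per-commit files vs per-segment files. */
    static String groupOf(String fileName) {
        if (fileName.startsWith("segments") || fileName.endsWith(".del") || fileName.endsWith(".liv")) {
            return "<commit>";
        }
        int dot = fileName.indexOf('.');
        String base = dot < 0 ? fileName : fileName.substring(0, dot);
        int generation = base.indexOf('_', 1); // "_0_1" belongs to segment "_0"
        return generation < 0 ? base : base.substring(0, generation);
    }

    static Diff diff(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        Map<String, List<FileMeta>> groups = new LinkedHashMap<>();
        for (FileMeta f : source.values()) {
            groups.computeIfAbsent(groupOf(f.name()), k -> new ArrayList<>()).add(f);
        }
        List<FileMeta> identical = new ArrayList<>();
        List<FileMeta> different = new ArrayList<>();
        List<FileMeta> missing = new ArrayList<>();
        for (List<FileMeta> group : groups.values()) {
            List<FileMeta> same = new ArrayList<>();
            boolean consistent = true;
            for (FileMeta f : group) {
                FileMeta t = target.get(f.name());
                if (t == null) {
                    consistent = false;
                    missing.add(f);
                } else if (!t.isSame(f)) {
                    consistent = false;
                    different.add(f);
                } else {
                    same.add(f);
                }
            }
            // one mismatching file makes the whole group "different", as in recoveryDiff
            (consistent ? identical : different).addAll(same);
        }
        return new Diff(identical, different, missing);
    }

    /** Convenience helper: build a name -> metadata map in insertion order. */
    static Map<String, FileMeta> snapshot(FileMeta... files) {
        Map<String, FileMeta> m = new LinkedHashMap<>();
        for (FileMeta f : files) {
            m.put(f.name(), f);
        }
        return m;
    }
}
```

For example, if the target holds every file of segment _1 but a changed _1.cfs, then _1.si is re-sent as well, because one mismatch marks the whole _1 group as different.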
Phase two is comparatively simple: the source only needs to send the translog operations recorded between the translog view and the present (in this version, the operations from startingSeqNo to endingSeqNo) to the target node. Phase two works on a snapshot of the current translog without acquiring the write lock (the snapshot is, however, a point-in-time view of the translog). It then sends each translog operation to the target node so that it can be replayed into the new shard.

```java
/**
 * Perform phase two of the recovery process.
 * <p>
 * Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is
 * point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new
 * shard.
 *
 * @param startingSeqNo              the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
 *                                   ops should be sent
 * @param endingSeqNo                the highest sequence number that should be sent
 * @param snapshot                   a snapshot of the translog
 * @param maxSeenAutoIdTimestamp     the max auto_id_timestamp of append-only requests on the primary
 * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
 * @param listener                   a listener which will be notified with the local checkpoint on the target.
 */
void phase2(
        final long startingSeqNo,
        final long endingSeqNo,
        final Translog.Snapshot snapshot,
        final long maxSeenAutoIdTimestamp,
        final long maxSeqNoOfUpdatesOrDeletes,
        final RetentionLeases retentionLeases,
        final long mappingVersion,
        final ActionListener<SendSnapshotResult> listener) throws IOException {
    if (shard.state() == IndexShardState.CLOSED) {
        throw new IndexShardClosedException(request.shardId());
    }
    logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");
    final StopWatch stopWatch = new StopWatch().start();
    final StepListener<Void> sendListener = new StepListener<>();
    final OperationBatchSender sender = new OperationBatchSender(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp,
        maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendListener);
    sendListener.whenComplete(ignored -> {
        final long skippedOps = sender.skippedOps.get();
        final int totalSentOps = sender.sentOps.get();
        final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();
        assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps
            : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
            snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps, totalSentOps);
        stopWatch.stop();
        final TimeValue tookTime = stopWatch.totalTime();
        logger.trace("recovery [phase2]: took [{}]", tookTime);
        listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
    }, listener::onFailure);
    sender.start();
}
```

Target node starts recovery
Receiving segments
This corresponds to phase one on the source node, described in the previous subsection: the source sends every differing segment file to the target node, and the target writes the files to disk as it receives them. The entry point for writing segment files is RecoveryTarget.writeFileChunk; the actual work happens in MultiFileWriter.innerWriteFileChunk:

```java
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
                           boolean lastChunk, int totalTranslogOps) throws IOException {
    final Store store = store();
    final String name = fileMetaData.name();
    // ... ...
    if (position == 0) {
        indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
    } else {
        indexOutput = getOpenIndexOutput(name); // a prefix is added to the name to form the temporary file
    }
    // ... ...
    while ((scratch = iterator.next()) != null) {
        indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // write into the temporary file
    }
    // ... ...
    store.directory().sync(Collections.singleton(temporaryFileName)); // calls fsync to persist the file to disk
}
```

Opening the engine
With the steps above, the target node has completed the first step of catching up on data. After receiving the segments, the target node opens the engine for the shard and gets ready to receive the translog. Note that once the engine is open, the recovering shard can already process index and delete operations (both requests replicated from the primary shard and operations replayed from the translog). The engine is opened as follows:

```java
/**
 * Opens the engine on top of the existing lucene engine and translog.
 * The translog is kept but its operations won't be replayed.
 */
public void openEngineAndSkipTranslogRecovery() throws IOException {
    assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
    recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
    loadGlobalCheckpointToReplicationTracker();
    innerOpenEngineAndTranslog(replicationTracker);
    getEngine().skipTranslogRecovery();
}

private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
    assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
    if (state != IndexShardState.RECOVERING) {
        throw new IndexShardNotRecoveringException(shardId, state);
    }
    final EngineConfig config = newEngineConfig(globalCheckpointSupplier);

    // we disable deletes since we allow for operations to be executed against the shard while recovering
    // but we need to make sure we don't loose deletes until we are done recovering
    config.setEnableGcDeletes(false); // keep delete tombstones (no GC of deletes) while recovering
    updateRetentionLeasesOnReplica(loadRetentionLeases());
    assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
        : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
        + "] but got " + getRetentionLeases();
    synchronized (engineMutex) {
        assert currentEngineReference.get() == null : "engine is running";
        verifyNotClosed();
        // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
        final Engine newEngine = engineFactory.newReadWriteEngine(config); // create the engine
        onNewEngine(newEngine);
        currentEngineReference.set(newEngine);
        // We set active because we are now writing operations to the engine; this way,
        // we can flush if we go idle after some time and become inactive.
        active.set(true);
    }
    // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
    // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
    onSettingsChanged();
    assert assertSequenceNumbersInCommit();
    recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
```

Receiving and replaying the translog
Once the engine is open, the operations in the translog can be replayed. Replay works much like ordinary indexing and deletion: the operation type and data are reconstructed from each translog entry, the corresponding object is built from that data, and the engine opened in the previous step executes it. The relevant code is IndexShard#runTranslogRecovery and IndexShard#applyTranslogOperation:

```java
/**
 * Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.
 * The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.
 */
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,
                        Runnable onOperationRecovered) throws IOException {
    int opsRecovered = 0;
    Translog.Operation operation;
    while ((operation = snapshot.next()) != null) {
        try {
            logger.trace("[translog] recover op {}", operation);
            Engine.Result result = applyTranslogOperation(engine, operation, origin);
            switch (result.getResultType()) {
                case FAILURE:
                    throw result.getFailure();
                case MAPPING_UPDATE_REQUIRED:
                    throw new IllegalArgumentException("unexpected mapping update: " + result.getRequiredMappingUpdate());
                case SUCCESS:
                    break;
                default:
                    throw new AssertionError("Unknown result type [" + result.getResultType() + "]");
            }

            opsRecovered++;
            onOperationRecovered.run();
        } catch (Exception e) {
            // TODO: Don't enable this leniency unless users explicitly opt-in
            if (origin == Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY && ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {
                // mainly for MapperParsingException and Failure to detect xcontent
                logger.info("ignoring recovery of a corrupt translog entry", e);
            } else {
                throw ExceptionsHelper.convertToRuntime(e);
            }
        }
    }
    return opsRecovered;
}

private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation,
                                             Engine.Operation.Origin origin) throws IOException {
    // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
    final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
    final Engine.Result result;
    switch (operation.opType()) { // reconstruct the operation type and data, then let the engine execute it
        case INDEX:
            final Translog.Index index = (Translog.Index) operation;
            // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
            // autoGeneratedID docs that are coming from the primary are updated correctly.
            result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),
                versionType, UNASSIGNED_SEQ_NO, 0, index.getAutoGeneratedIdTimestamp(), true, origin,
                new SourceToParse(shardId.getIndexName(), index.id(), index.source(),
                    XContentHelper.xContentType(index.source()), index.routing()));
            break;
        case DELETE:
            final Translog.Delete delete = (Translog.Delete) operation;
            result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.id(),
                versionType, UNASSIGNED_SEQ_NO, 0, origin);
            break;
        case NO_OP:
            final Translog.NoOp noOp = (Translog.NoOp) operation;
            result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);
            break;
        default:
            throw new IllegalStateException("No operation defined for [" + operation + "]");
    }
    return result;
}
```

After these steps the translog has been fully replayed. Some wrap-up work follows: a refresh makes the replayed data visible, and garbage collection of deletes is switched back on (setEnableGcDeletes(true)):

```java
/**
 * perform the last stages of recovery once all translog operations are done.
 * note that you should still call {@link #postRecovery(String)}.
 */
public void finalizeRecovery() {
    recoveryState().setStage(RecoveryState.Stage.FINALIZE);
    Engine engine = getEngine();
    engine.refresh("recovery_finalization");
    engine.config().setEnableGcDeletes(true);
}
```

At this point both phases of replica shard recovery are complete. Because the shard is still in the INITIALIZING state, the master node must be notified to start the recovered shard (IndicesClusterStateService#RecoveryListener):

```java
@Override
public void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) {
    shardStateAction.shardStarted(
        shardRouting,
        primaryTerm,
        "after " + state.getRecoverySource(),
        timestampMillisFieldRange,
        SHARD_STATE_ACTION_LISTENER);
}
```

With that, the whole shard recovery flow is complete.
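The replay loop of runTranslogRecovery and the type dispatch of applyTranslogOperation can be modelled in a few lines of plain Java. This is a toy, assuming a hypothetical Op record and ToyEngine (a map from doc id to source) in place of Translog.Operation and Engine; it leaves out versioning, failure handling and the mapping-update case.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class TranslogReplaySketch {
    enum OpType { INDEX, DELETE, NO_OP }

    /** Hypothetical, simplified translog entry (not Translog.Operation). */
    record Op(OpType type, long seqNo, String id, String source) {}

    /** Toy engine: doc id -> source, plus the highest seq_no processed. */
    static final class ToyEngine {
        final Map<String, String> docs = new HashMap<>();
        long maxSeqNo = -1;

        void apply(Op op) {
            // dispatch on the operation type, as applyTranslogOperation does
            switch (op.type()) {
                case INDEX -> docs.put(op.id(), op.source());
                case DELETE -> docs.remove(op.id());
                case NO_OP -> { } // nothing to write, only the seq_no is consumed
            }
            maxSeqNo = Math.max(maxSeqNo, op.seqNo());
        }
    }

    /** Same shape as runTranslogRecovery: iterate, apply, count, notify. */
    static int replay(ToyEngine engine, Iterator<Op> snapshot, Runnable onOperationRecovered) {
        int opsRecovered = 0;
        while (snapshot.hasNext()) {
            engine.apply(snapshot.next());
            opsRecovered++;
            onOperationRecovered.run();
        }
        return opsRecovered;
    }
}
```

Replaying index a, index b, delete a and a no-op leaves only b in the toy engine, reports four recovered operations, and fires the callback four times.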
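Summary
Question 1: Is what is said online correct, and has anything changed in newer versions?
Having followed the Tencent Cloud article through the whole process, the main flow in Elasticsearch 8.0.0-SNAPSHOT is essentially the same; only some methods have been adjusted.
So the following description still holds: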
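ES replica shard recovery involves two nodes: the target node, which is the node being recovered, and the source node, which provides the data. The target node sends a shard recovery request to the source node, and the source node handles it in two main phases. In phase one, it takes a snapshot of the shard to be recovered and compares it with the metadata in the request: if the syncid and the doc count are the same, the phase is skipped; otherwise it compares the shard's segment files and sends the differing segment files to the target node. In phase two, to make sure the data on the target node is complete, it sends its local translog to the target node, which replays the operations it receives. The overall flow is shown in the figure below: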
Question 2: What happens if a _forcemerge API request arrives during shard recovery?
I will answer this when I look at the /_forcemerge API.
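Question 3: Phase two of shard recovery synchronizes the translog. Does this step take a lock? If not, how does it guarantee that the synchronization is complete?
Completeness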
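First, phase1 guarantees that the existing historical data is recovered onto the replica. After phase1 completes, the replica's engine is open and can handle index and delete requests normally, and because the translog covers the whole of phase1, every index/delete operation performed during phase1 is recorded. During the phase2 translog replay, the replica's normal index and delete operations run in parallel with the replay. Together this ensures that both the data from before recovery started and the data written during recovery end up on the replica shard, so the data is complete. This is shown in the figure below.
Consistency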
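Because the replica can process writes normally as soon as phase1 completes, its live writes run in parallel with the phase2 translog replay. If the replay lags behind the live writes, old data may be written after newer data, leaving the copies inconsistent. To keep the data consistent, ES compares each incoming write with the document currently in Lucene (in the code below, by sequence number via compareOpToLuceneDocBasedOnSeqNo); if the operation is not newer, it is stale and the document is not written to Lucene. The relevant code is below: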
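```java
// https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L993
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.processAsStaleOp(index.version(), 0);
}
```

Extension: how are replica and primary shards kept consistent?
Adapted from 《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice).
One difficulty of index recovery is keeping the primary and replica shards consistent. If writes keep arriving while the replica is recovering, how is consistency achieved? Let's first look at the earlier approach: before version 2.0, replica recovery went through three phases.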
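phase1: take a snapshot of the primary shard's Lucene index and send it to the target. Indexing is not blocked in the meantime; new data is written to the primary shard's translog.
phase2: take a snapshot of the primary shard's translog and send it to the target for replay. Indexing is not blocked in the meantime.
phase3: acquire a write lock on the primary shard and send the remaining translog to the target. Little data is left at this point, so writes are blocked only briefly.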
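In theory, as long as the process is allowed to block writes for a while, keeping the primary and replica consistent is fairly easy.
Later, however (starting with version 2.0), phase3 was removed at the same time as the translog.view concept was introduced.
phase3 was removed. That phase replayed operations while preventing new writes to the Engine. It is unnecessary because, from the start of recovery, the standard index path also sends every operation to the recovering shard. Replaying all the operations in the view taken when recovery started is enough to guarantee that no operation is lost.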
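The completeness argument above (whatever the replayed snapshot does not cover, live forwarding does) can be checked with a small model of sequence numbers. Everything here is hypothetical: commitMaxSeqNo, snapshotMaxSeqNo and firstForwardedSeqNo are illustration parameters, not Elasticsearch fields, and real recovery tracks this with checkpoints rather than explicit sets.

```java
import java.util.Set;
import java.util.TreeSet;

final class CompletenessSketch {
    /**
     * Seq_nos the recovering replica ends up with (all parameters are hypothetical):
     * commitMaxSeqNo      highest op contained in the commit copied by phase1
     * snapshotMaxSeqNo    highest op in the translog snapshot replayed by phase2
     *                     (the snapshot covers commitMaxSeqNo + 1 .. snapshotMaxSeqNo)
     * firstForwardedSeqNo first op the primary also forwards live to the replica
     * lastSeqNo           highest op the primary processed before recovery ends
     */
    static Set<Long> received(long commitMaxSeqNo, long snapshotMaxSeqNo, long firstForwardedSeqNo, long lastSeqNo) {
        Set<Long> seqNos = new TreeSet<>();
        for (long s = 0; s <= commitMaxSeqNo; s++) seqNos.add(s);                    // phase1: copied files
        for (long s = commitMaxSeqNo + 1; s <= snapshotMaxSeqNo; s++) seqNos.add(s); // phase2: translog replay
        for (long s = firstForwardedSeqNo; s <= lastSeqNo; s++) seqNos.add(s);       // live writes
        return seqNos;
    }

    /** True if no seq_no in 0..lastSeqNo is missing. */
    static boolean complete(Set<Long> seqNos, long lastSeqNo) {
        for (long s = 0; s <= lastSeqNo; s++) {
            if (!seqNos.contains(s)) {
                return false;
            }
        }
        return true;
    }
}
```

With received(4, 7, 6, 12), ops 6 and 7 arrive twice (replayed and forwarded live) and nothing is missing; with received(4, 7, 9, 12), op 8 is lost. So live forwarding has to start no later than the end of the replayed range, and the duplicates this produces must be filtered out, which is the ordering problem discussed next.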
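With the write-blocking phase3 gone, nothing blocks writes during recovery. What remains is to resolve the ordering conflicts between writes made between phase1 and phase2 and the operations replayed in phase2. On the replica node, once phase1 has finished, new index operations and translog replay run concurrently, so depending on timing, old and new data can interleave. How, then, are the primary and replica kept consistent?
Suppose that during phase one a client index request sets docA's content to 1; the primary shard executes it, but the replica, not yet ready, does not. During phase two another client request sets docA to 2; the replica is ready by now and executes this write first. It then receives, in the translog sent from the primary's node, the older request that sets docA to 1. How should that be handled? The flow is shown in the figure below.
The answer is to handle this case in the write path, using version numbers to filter out stale operations. There are three kinds of write: indexing a new document, updating, and deleting. Indexing a new document cannot conflict; updates and deletes are handled by the same mechanism. Every operation carries a version number, the expected doc version, which must be greater than the doc's current version in Lucene; otherwise the operation is discarded. For an update, the expected version is the Lucene doc version + 1. After the write succeeds on the primary node, the new data's version number is put into the request sent to the replica, and that version is the expected version.
This way, operations that arrive out of order are ignored, and for any given doc only the latest operation takes effect, which keeps the primary and replica consistent.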
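The docA scenario can be replayed against a toy replica store. This sketch assumes the two client writes carry versions 1 and 2, reuses the conflict rule of the EXTERNAL version type quoted later in this article (stale if the current version is greater than or equal to the expected one), and drops the MATCH_ANY case; StaleWriteSketch, Doc and NOT_FOUND are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class StaleWriteSketch {
    static final long NOT_FOUND = -1L; // hypothetical stand-in for Versions.NOT_FOUND

    record Doc(long version, String content) {}

    final Map<String, Doc> docs = new HashMap<>();

    /** Conflict rule of the EXTERNAL version type: the write is stale if current >= expected. */
    static boolean isConflict(long currentVersion, long expectedVersion) {
        return currentVersion != NOT_FOUND && currentVersion >= expectedVersion;
    }

    /** Applies a replicated write carrying the primary's version; returns false if it was dropped. */
    boolean apply(String id, long expectedVersion, String content) {
        Doc current = docs.get(id);
        long currentVersion = current == null ? NOT_FOUND : current.version();
        if (isConflict(currentVersion, expectedVersion)) {
            return false; // an older (out-of-order) write: ignore it
        }
        docs.put(id, new Doc(expectedVersion, content));
        return true;
    }
}
```

Applying the live write (docA=2, version 2) first and the replayed write (docA=1, version 1) second leaves docA at 2, because the second call is rejected as a conflict.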
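Let's look at how each of the three kinds of write is handled.
(1) Indexing a new document
There is no conflict, so nothing needs to be handled.
(2) Update
Check whether this operation's version number is lower than the doc's version in Lucene; if it is, discard the operation.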
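Index and Delete both extend Operation, and every Operation carries a version number, which is the doc version. In the replica write path, normally, once the write has succeeded on the primary, the version the doc was written with is put into the request forwarded to the replica. For an update, this means the primary increments the original doc version by 1 and forwards it to the replica. When versions are compared:
expectedVersion = the version in the replica write request = the version after the successful write on the primary
The following method decides whether the operation's version is not newer than the version in Lucene (a version conflict):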
```java
// VersionType#isVersionConflictForWrites
EXTERNAL((byte) 1) {
    @Override
    public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
        if (currentVersion == Versions.NOT_FOUND) {
            return false;
        }
        if (expectedVersion == Versions.MATCH_ANY) {
            return true;
        }
        if (currentVersion >= expectedVersion) {
            return true;
        }
        return false;
    }
}
```

If a replayed translog operation writes old data, compareOpToLuceneDocBasedOnSeqNo returns OpVsLuceneDocStatus.OP_STALE_OR_EQUAL:
```java
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
    assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
    final OpVsLuceneDocStatus status;
    VersionValue versionValue = getVersionFromMap(op.uid().bytes());
    assert incrementVersionLookup();
    if (versionValue != null) {
        status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
    } else {
        // load from index
        assert incrementIndexVersionLookup();
        try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
            final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), op.uid());
            if (docAndSeqNo == null) {
                status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
            } else if (op.seqNo() > docAndSeqNo.seqNo) {
                status = OpVsLuceneDocStatus.OP_NEWER;
            } else if (op.seqNo() == docAndSeqNo.seqNo) {
                assert localCheckpointTracker.hasProcessed(op.seqNo()) :
                    "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
            } else {
                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
            }
        }
    }
    return status;
}
```

On the replica, InternalEngine#index uses a plan to decide whether to write to Lucene:
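```java
// non-primary mode (i.e., replica or recovery)
final IndexingStrategy plan = indexingStrategyForOperation(index);
```

For a stale operation, indexingStrategyForOperation ends up with plan = IndexingStrategy.processButSkipLucene, so the subsequent code skips writing to Lucene and the translog. (This follows the book, which describes an older version; in the 8.0.0-SNAPSHOT InternalEngine snippet quoted earlier, the stale branch sets plan = IndexingStrategy.processAsStaleOp(index.version(), 0) instead.)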
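The sequence-number comparison and the resulting plan can be sketched the same way. In this hypothetical model a map from doc id to seq_no stands in for the version map plus the Lucene lookup in compareOpToLuceneDocBasedOnSeqNo, and the two-valued Plan enum is a simplification of IndexingStrategy; only the OpVsLuceneDocStatus names come from the Elasticsearch code quoted in this article.

```java
import java.util.HashMap;
import java.util.Map;

final class SeqNoPlanSketch {
    enum OpVsLuceneDocStatus { OP_NEWER, OP_STALE_OR_EQUAL, LUCENE_DOC_NOT_FOUND }

    /** Hypothetical simplification of IndexingStrategy. */
    enum Plan { INDEX_INTO_LUCENE, SKIP_LUCENE }

    /** Hypothetical stand-in for looking up a doc's seq_no in the version map / Lucene. */
    final Map<String, Long> docSeqNos = new HashMap<>();

    OpVsLuceneDocStatus compare(String id, long opSeqNo) {
        Long docSeqNo = docSeqNos.get(id);
        if (docSeqNo == null) {
            return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
        }
        // equal counts as stale: the same op has already been applied
        return opSeqNo > docSeqNo ? OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
    }

    /** Replica/recovery path: a stale or duplicate op is accepted but does not change the doc. */
    Plan index(String id, long opSeqNo) {
        if (compare(id, opSeqNo) == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
            return Plan.SKIP_LUCENE;
        }
        docSeqNos.put(id, opSeqNo);
        return Plan.INDEX_INTO_LUCENE;
    }
}
```

Indexing docA at seq_no 7 and then replaying docA at seq_no 5 (or 7 again) yields SKIP_LUCENE for the replayed op, so docA keeps the state written by seq_no 7.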
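(3) Delete
Check whether this operation's version number is lower than the doc's version in Lucene; if it is, discard the operation.
compareOpToLuceneDocBasedOnSeqNo decides whether this operation is older than the doc in Lucene; it is the same comparison function used for Index operations.
Similarly, InternalEngine#delete decides whether to write to Lucene: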
```java
final DeletionStrategy plan = deletionStrategyForOperation(delete);
```

If the replayed translog operation is an old delete, compareOpToLuceneDocBasedOnSeqNo returns OpVsLuceneDocStatus.OP_STALE_OR_EQUAL.
The resulting plan is plan = DeletionStrategy.processButSkipLucene, so the subsequent code skips the Lucene delete.

```java
/**
 * the status of the current doc version in lucene, compared to the version in an incoming
 * operation
 */
enum OpVsLuceneDocStatus {
    /** the op is more recent than the one that last modified the doc found in lucene*/
    OP_NEWER,
    /** the op is older or the same as the one that last modified the doc found in lucene*/
    OP_STALE_OR_EQUAL,
    /** no doc was found in lucene */
    LUCENE_DOC_NOT_FOUND
}
```
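The delete case also explains why innerOpenEngineAndTranslog calls config.setEnableGcDeletes(false) and finalizeRecovery switches it back on: a replayed, older index operation can only be recognised as stale if the newer delete is still remembered. The sketch below is a hypothetical model, not the engine: a tombstone map stands in for the delete entries the engine keeps in its version map, and the timed pruning of those entries is collapsed into "drop immediately when GC is enabled".

```java
import java.util.HashMap;
import java.util.Map;

final class DeleteTombstoneSketch {
    final Map<String, Long> liveDocs = new HashMap<>();   // doc id -> seq_no of the op that wrote it
    final Map<String, Long> tombstones = new HashMap<>(); // doc id -> seq_no of the delete
    boolean gcDeletesEnabled = true;

    /** Seq_no of the last op seen for this id, or null (the LUCENE_DOC_NOT_FOUND case). */
    Long lastSeqNo(String id) {
        Long live = liveDocs.get(id);
        return live != null ? live : tombstones.get(id);
    }

    /** OP_STALE_OR_EQUAL: the op is not newer than what is already known for this id. */
    boolean isStale(String id, long opSeqNo) {
        Long last = lastSeqNo(id);
        return last != null && opSeqNo <= last;
    }

    /** Returns false when the index op is skipped as stale. */
    boolean index(String id, long seqNo) {
        if (isStale(id, seqNo)) {
            return false;
        }
        tombstones.remove(id);
        liveDocs.put(id, seqNo);
        return true;
    }

    /** Returns false when the delete is skipped as stale. */
    boolean delete(String id, long seqNo) {
        if (isStale(id, seqNo)) {
            return false;
        }
        liveDocs.remove(id);
        if (gcDeletesEnabled) {
            tombstones.remove(id); // simplification: pruned at once (the real engine prunes after a delay)
        } else {
            tombstones.put(id, seqNo); // recovering: keep the tombstone so older replayed ops look stale
        }
        return true;
    }
}
```

With GC of deletes disabled, a live delete of docA at seq_no 9 makes a replayed index of docA at seq_no 5 stale; with GC enabled, the tombstone is gone, the old index op looks like LUCENE_DOC_NOT_FOUND, and docA comes back.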