园林绿化网站建设,怎么网上宣传自己的产品,phpcms v9 网站建设入门,怎么做网站的软文推广
Kafka集群有主题，每一个主题下又有很多分区，为了保证防止丢失数据，在分区下分Leader副本和Follower副本，而Kafka的某个分区的Leader和Follower数据如何同步呢？下面就是讲解的这个。首先要知道，Follower的数据…

Kafka集群有主题，每一个主题下又有很多分区。为了防止丢失数据，每个分区下又分为Leader副本和Follower副本。那么Kafka某个分区的Leader和Follower之间的数据是如何同步的呢？下面讲解的就是这个问题。
首先要知道，Follower的数据是通过Fetch线程异步从Leader拉取的；不懂的可以看一下《Kafka——副本（Replica）机制》。

目录：
一、Broker接收处理分区的Leader和Follower的API
二、针对分区副本的Leader和Follower的处理逻辑
    1、如果是Follower，则准备创建Fetcher线程，好异步执行向Leader拉取数据
    2、遍历分区Follower副本，判断是否有向目标broker现成的Fetcher，如果是则复用，否则创建
    3、创建Fetcher线程的实现

一、Broker接收处理分区的Leader和Follower的API
在kafkaApis.scala中

// Dispatch entry: LeaderAndIsr requests are routed to handleLeaderAndIsrRequest.
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

/**
 * Handles a LeaderAndIsr request: authorizes it, checks the broker epoch, and
 * otherwise delegates to `replicaManager.becomeLeaderOrFollower`, sending the
 * resulting response back without throttling.
 *
 * @param request the incoming request; its body is read as a `LeaderAndIsrRequest`
 */
def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
  // NOTE(review): by its name, requireZkOrThrow presumably rejects this request
  // when the broker is not running in ZooKeeper mode -- confirm against KafkaApis.
  val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
  // Correlation id comes from the request header, the payload from the request body.
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]

  // Authorize the request as a cluster-level action.
  authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)

  // Check whether the broker epoch carried by the request is stale.
  if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
    // ... code omitted in the article ...
  } else {
    // Let the replica manager process the request and produce the response.
    val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
      RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
    requestHelper.sendResponseExemptThrottle(request, response)
  }
}

经过一些检验后调用becomeLeaderOrFollower获得响应结果
二、针对分区副本的Leader和Follower的处理逻辑
/**
 * Processes a LeaderAndIsr request from the controller: sorts the partitions
 * hosted on this broker into "become leader" and "become follower" sets and
 * applies the transition through `makeLeaders` / `makeFollowers`.
 *
 * Parts of the method are omitted in the article; the omissions are marked below.
 *
 * @param correlationId       correlation id of the request, used in log messages
 * @param leaderAndIsrRequest the controller's request (controller id/epoch and partition states)
 * @param onLeadershipChange  callback taking two partition collections; it is not
 *                            invoked in the visible excerpt (presumably called in the
 *                            omitted tail -- confirm against ReplicaManager)
 * @return the response for the controller (built in the omitted code)
 */
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  // Timestamp taken at method entry.
  val startMs = time.milliseconds()
  // All replica state changes are serialized on this lock.
  replicaStateChangeLock synchronized {
    // Pull the controller id and the per-partition states out of the request.
    val controllerId = leaderAndIsrRequest.controllerId
    val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala

    val response = {
      // A request carrying an older controller epoch than the one we know is stale.
      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
        // ... code omitted in the article ...
      } else {
        val responseMap = new mutable.HashMap[TopicPartition, Errors]
        controllerEpoch = leaderAndIsrRequest.controllerEpoch

        val partitions = new mutable.HashSet[Partition]()
        // Partitions for which this broker should become leader.
        val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
        // Partitions for which this broker should become follower.
        val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
        val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()

        // Walk every partition state sent by the controller.
        requestPartitionStates.foreach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
          // Look up the partition; create and register it when it is not hosted yet.
          val partitionOpt = getPartition(topicPartition) match {
            case HostedPartition.Offline =>
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                "partition is in an offline log directory")
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              None

            case HostedPartition.Online(partition) =>
              Some(partition)

            case HostedPartition.None =>
              val partition = Partition(topicPartition, time, this)
              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
              Some(partition)
          }

          // Next check the topic id and the partition's leader epoch.
          partitionOpt.foreach { partition =>
            val currentLeaderEpoch = partition.getLeaderEpoch
            val requestLeaderEpoch = partitionState.leaderEpoch
            val requestTopicId = topicIdFromRequest(topicPartition.topic)
            val logTopicId = partition.topicId

            if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
              // Inconsistent topic id: log it and report the error for this partition.
              stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
                s" match the topic ID for partition $topicPartition received: " +
                s"${requestTopicId.get}.")
              responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
            } else if (requestLeaderEpoch > currentLeaderEpoch) {
              // The request carries a newer leader epoch. Only partitions that have a
              // replica on this broker are collected, so that the later makeLeaders /
              // makeFollowers calls touch local replicas only: leader == this broker
              // goes to partitionsToBeLeader, everything else to partitionsToBeFollower.
              if (partitionState.replicas.contains(localBrokerId)) {
                partitions += partition
                if (partitionState.leader == localBrokerId) {
                  partitionsToBeLeader.put(partition, partitionState)
                } else {
                  partitionsToBeFollower.put(partition, partitionState)
                }
              }
              // ... code omitted in the article ...
            }
            // ... remaining branches omitted in the article ...
          }
        }

        // Lazily-read high watermark checkpoints.
        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)

        // Leader transition: makeLeaders returns the partitions that became leader.
        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
            highWatermarkCheckpoints, topicIdFromRequest)
        else
          Set.empty[Partition]

        // Follower transition: makeFollowers returns the partitions that became follower.
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
            highWatermarkCheckpoints, topicIdFromRequest)
        else
          Set.empty[Partition]

        // Update metrics for the topics of the partitions that became follower.
        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
        updateLeaderAndFollowerMetrics(followerTopicSet)

        // Propagate topic ids to follower partitions that need them.
        if (topicIdUpdateFollowerPartitions.nonEmpty)
          updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)

        // Start the high watermark checkpoint thread.
        startHighWatermarkCheckPointThread()

        // Set up log-dir fetchers for the affected partitions where needed.
        maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)

        // Shut down fetcher threads that are now idle.
        replicaFetcherManager.shutdownIdleFetcherThreads()
        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
        // ... code omitted in the article ...
      }
    }
    // ... code omitted in the article ...
  }
}

因为这篇文章主要写Follower如何拉取数据所以只需要关注上面代码中的makeFollowers就可以了
1、如果是Follower则准备创建Fetcher线程好异步执行向Leader拉取数据
/*
 * Make the current broker a follower for the given set of partitions:
 *
 * 1. Remove these partitions from the leader partitions set.
 * 2. Mark these partitions as followers, so they no longer accept produce requests.
 * 3. Stop the fetchers for these partitions, so the replica fetcher threads append no more data.
 * 4. Checkpoint the offsets of these partitions, truncating the log when required.
 * 5. Clear the produce and fetch requests in the purgatory.
 * 6. If the broker is not shutting down, start fetchers towards the new leaders.
 *
 * The ordering of these steps ensures that replicas in transition take no more
 * messages before the offsets are checkpointed, so every message before the
 * checkpoint is guaranteed to be flushed to disk.
 *
 * If an unexpected error is thrown in this function, it is propagated to KafkaApis,
 * where the error is set on every partition since the offending partition is unknown.
 * Otherwise the set of partitions made follower by this call is returned.
 *
 * Parts of the method are omitted in the article; the omissions are marked below.
 */
private def makeFollowers(controllerId: Int,
                          controllerEpoch: Int,
                          partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Errors],
                          highWatermarkCheckpoints: OffsetCheckpoints,
                          topicIds: String => Option[Uuid]) : Set[Partition] = {
  val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
  // ... code omitted in the article ...

  // Collects the partitions that actually transition to follower.
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
  try {
    partitionStates.forKeyValue { (partition, partitionState) =>
      // The transition only happens when the new leader is known to be alive.
      val newLeaderBrokerId = partitionState.leader
      try {
        if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
          // Only change partition state when the leader is available
          if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
            partitionsToMakeFollower += partition
          }
        } else {
          // ... code omitted in the article ...
        }
      } catch {
        // ... code omitted in the article ...
      }
    }

    // Remove the existing fetchers for the partitions that became follower.
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
      s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")

    // Complete any delayed fetch or produce requests for each of these partitions.
    partitionsToMakeFollower.foreach { partition =>
      completeDelayedFetchOrProduceRequests(partition.topicPartition)
    }

    // When the broker is shutting down, no fetchers are added.
    if (isShuttingDown.get()) {
      // ... code omitted in the article ...
    } else {
      // For each partition resolve the leader endpoint and the initial fetch offset.
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
        // Falls back to Node.noNode() when the leader is not among the alive brokers.
        val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId =>
          metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
        val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
        val log = partition.localLogOrException
        val fetchOffset = initialFetchOffset(log)
        partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)
      }.toMap

      // Register the partitions with the fetcher threads that pull from their leaders.
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    }
  } catch {
    // ... code omitted in the article ...
  }
  // ... code omitted in the article ...
}

2、遍历分区Follower副本判断是否有向目标broker现成的Fetcher如果是则复用否则创建
之后执行replicaFetcherManager.addFetcherForPartitions把信息添加到指定的Fetcher线程中

/**
 * Creates a fetcher thread; to be defined in a subclass to create a specific fetcher.
 *
 * @param fetcherId    id of the fetcher to create
 * @param sourceBroker endpoint of the broker the fetcher pulls from
 * @return the new fetcher thread (`T` is declared by the enclosing class, not shown here)
 */
def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): T
/**
 * Adds each partition, with its initial fetch state, to the fetcher thread
 * responsible for it, creating and starting that thread when necessary.
 *
 * @param partitionAndOffsets initial fetch state (including the leader endpoint) per partition
 */
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
  lock synchronized {
    // Group the partitions by (leader broker, fetcher id).
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
      BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
    }

    // Creates a fetcher thread, registers it in fetcherThreadMap and starts it.
    def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
                                 brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
      val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
      fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
      fetcherThread.start()
      fetcherThread
    }

    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
        // A thread already exists for this broker id / fetcher id and points at the same endpoint.
        case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>
          // reuse the fetcher thread
          currentFetcherThread
        case Some(f) =>
          // A thread exists but for a different endpoint: shut it down and start a new one.
          f.shutdown()
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          // No thread yet: create and start one.
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }
      // failed partitions are removed when added partitions to thread
      addPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)
    }
  }
}

上面可能有疑问为什么有重用fetcher线程 答案是broker并不一定会为每一个主题分区的Follower都启动一个 fetcher 线程对于一个目的 broker只会启动 num.replica.fetchers 个线程具体这个 topic-partition 会分配到哪个 fetcher 线程上是根据 topic 名和 partition id 进行计算得到实现所示

/**
 * Maps a partition to a fetcher id, computed from the topic name's hash code and
 * the partition number, modulo `numFetchersPerBroker`.
 */
// Visibility for testing
private[server] def getFetcherId(topicPartition: TopicPartition): Int = {
  lock synchronized {
    Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition) % numFetchersPerBroker
  }
}

继续往下其中createFetcherThread的实现是下面
3、创建Fetcher线程的实现
/**
 * Fetcher manager whose threads are [[ReplicaFetcherThread]]s. The number of
 * fetchers is taken from `brokerConfig.numReplicaFetchers`.
 *
 * @param threadNamePrefix optional prefix prepended (followed by ':') to fetcher thread names
 */
class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                            protected val replicaManager: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
                            threadNamePrefix: Option[String] = None,
                            quotaManager: ReplicationQuotaManager,
                            metadataVersionSupplier: () => MetadataVersion,
                            brokerEpochSupplier: () => Long)
  extends AbstractFetcherManager[ReplicaFetcherThread](
    name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
    clientId = "Replica",
    numFetchers = brokerConfig.numReplicaFetchers) {

  /**
   * Builds (but does not start) a [[ReplicaFetcherThread]] that fetches from `sourceBroker`.
   *
   * @param fetcherId    id of the fetcher, used in the thread name, client id and log prefix
   * @param sourceBroker endpoint of the broker to fetch from
   */
  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
    val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
    val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
    val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
      s"fetcherId=$fetcherId] ")
    val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
    val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
      replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
    // The thread receives the leader endpoint plus the collaborators it needs.
    new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
      quotaManager, logContext.logPrefix, metadataVersionSupplier)
  }

  /** Closes all fetchers, logging before and after. */
  def shutdown(): Unit = {
    info("shutting down")
    closeAllFetchers()
    info("shutdown completed")
  }
}

其中new ReplicaFetcherThread返回一个创建的线程
/**
 * Fetcher thread used for replication. It extends [[AbstractFetcherThread]] with the
 * thread name doubling as client id, a non-interruptible thread, and the back-off
 * taken from `brokerConfig.replicaFetchBackoffMs`.
 *
 * Only `doWork` is shown in the article.
 */
class ReplicaFetcherThread(name: String,
                           leader: LeaderEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           quota: ReplicaQuota,
                           logPrefix: String,
                           metadataVersionSupplier: () => MetadataVersion)
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                leader = leader,
                                failedPartitions,
                                fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false,
                                replicaMgr.brokerTopicStats) {

  /** Runs the inherited work step, then completes delayed fetch requests. */
  override def doWork(): Unit = {
    super.doWork()
    completeDelayedFetchRequests()
  }
}

而ReplicaFetcherThread继承的AbstractFetcherThread类AbstractFetcherThread又继承自ShutdownableThread类其中ShutdownableThread中的run方法是线程的执行函数
/**
 * Base class of the fetcher threads. Each work step truncates if needed and then fetches.
 *
 * Only `doWork` is shown in the article.
 */
abstract class AbstractFetcherThread(name: String,
                                     clientId: String,
                                     val leader: LeaderEndPoint,
                                     failedPartitions: FailedPartitions,
                                     val fetchTierStateMachine: TierStateMachine,
                                     fetchBackOffMs: Int = 0,
                                     isInterruptible: Boolean = true,
                                     val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
  extends ShutdownableThread(name, isInterruptible) with Logging {

  override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }
}

// The following snippet is Java (ShutdownableThread), not Scala.
public abstract class ShutdownableThread extends Thread {
    // ... code omitted in the article ...

    /** One unit of work; invoked repeatedly by run() while the thread is running. */
    public abstract void doWork();

    /** Thread body: calls doWork() in a loop until isRunning() returns false. */
    public void run() {
        isStarted = true;
        log.info("Starting");
        try {
            while (isRunning())
                doWork();
        } catch (FatalExitError e) {
            // Release both latches before exiting the process.
            shutdownInitiated.countDown();
            shutdownComplete.countDown();
            log.info("Stopped");
            Exit.exit(e.statusCode());
        } catch (Throwable e) {
            // Only logged when the thread was still supposed to be running.
            if (isRunning())
                log.error("Error due to", e);
        } finally {
            shutdownComplete.countDown();
        }
        log.info("Stopped");
    }
}

 ShutdownableThread中的run函数调用子类的doWork() 而doWork中的执行顺序如下
// Truncate if needed
maybeTruncate()
// Fetch
maybeFetch()
// Complete delayed fetch requests
completeDelayedFetchRequests()

其中 maybeFetch()就是正常拼接fetch请求并向目标broker发送请求调用broker的case ApiKeys.FETCH handleFetchRequest(request)
/**
 * Builds a fetch request for the current partition states and, when one was built,
 * processes it. When there is nothing to fetch, waits on `partitionMapCond` for up
 * to `fetchBackOffMs` milliseconds instead.
 */
private def maybeFetch(): Unit = {
  // The request is built while holding the partition map lock.
  val fetchRequestOpt = inLock(partitionMapLock) {
    val ResultWithPartitions(fetchRequestOpt, partitionsWithError) =
      leader.buildFetch(partitionStates.partitionStateMap.asScala)

    // Partitions that failed while building the request are handled first.
    handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

    if (fetchRequestOpt.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }

    fetchRequestOpt
  }

  // Processing happens after the inLock block has returned.
  fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
    processFetchRequest(sessionPartitions, fetchRequest)
  }
}