当前位置: 首页 > news >正文

17网站一起做网批房产中介网站开发

17网站一起做网批,房产中介网站开发,河北石家庄属于几线城市,黄浦网站设计背景 在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客 最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 现在我们接着跟踪相应…背景 在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客 最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent  现在我们接着跟踪相应代码观察算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义At least once, exactly once对于checkpoint的影响是怎样的。 代码分析 org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。 processBarrier方法实现上就可以看出flink barrier的处理分成两种。 在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。 通过往上追溯barrierHandler的生成我们跟踪到方法org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler 如果checkpoint 模式是AT_LEAST_ONCE 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。 org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler 这里针对checkpoint类型做了区分主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。 对比这两个方法参数的差异发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时会切换为unaligned checkpoint。这里可以先按下不表回到最开始的处理过程。 总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。 At least once 下 barrier是如何处理的 at least once 下对于barrier的处理是在以下的方法中实现的。 org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug(Received barrier for checkpoint {} from channel {}, barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount null;int pos 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId barrierId) {barrierCount next;break;}pos;}if (barrierCount ! null) {// add one to the count to that barrier and check for completionint numBarriersNew barrierCount.incrementBarrierCount();if (numBarriersNew totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i 0; i pos; i) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug(Received all barriers for checkpoint {}, barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}} 如果只有一个inputchannel的情况下在收到这一个barrier的时候就可以做snapshot. 在这个中间会经过triggerCheckpointOnBarrier 等方法 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState 看到这里其实这很长的链路实际是一个循环下一个算子会生成barrier,接着传递这个barrier。 实际情况是作业并行度不唯一一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。 这里面当收取到第一个barrier会将这个barrier信息存在一个队列中。 每当收到一个barrier的时候会进行计数当收取到的是最后一个barrier的时候把之前的barrier全部清除之后就可以通知做checkpoint snapshot 这个流程就和之前的一个信道的checkpoint流程是一致的。 总结而言at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。 Exactly once checkpoint是如何处理的 首先看这一段的代码 Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId barrier.getId();LOG.debug({}: Received barrier from channel {} {}., taskName, channelInfo, barrierId);if (currentCheckpointId barrierId|| (currentCheckpointId barrierId !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId barrierId);if (numBarriersReceived 0) {if (getNumOpenChannels() 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived numOpenChannels) {if (getNumOpenChannels() 1) {markAlignmentEnd();}}try {currentState currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived numOpenChannels) {numBarriersReceived 0;lastCancelledOrCompletedCheckpointId currentCheckpointId;LOG.debug({}: Received all barriers for checkpoint {}., taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}} 这里需要关注一下currentState 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier 因此可以可以看这个方法具体是现实。 这里可以看到这里会block 住收到barrier的信道如果barrier 都收齐了之后会检查是不是unaligned的checkpoint 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。 这里的下一个分支是超时转化比如设置为30s前30s是做aligned checkpoint 如果30s还没有完成就会转化为unaligned checkpoint。 当然你如果不想有超时时间可以直接设置为0. 如果是unaligned checkpoint 会将channel 里面的数据也写会到远端。 这个中间会有一些状态转化每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说uc的第一个barrier到达了就会触发一次global checkpoint所以这个时候是不会block住信道的。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived 最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。 总结一下在exactly once的语义下aligned checkpoint的做法是收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。 unaligned的做法是收到barrier的时候第一步就会触发一次checkpoint 之后会不断上传channel state 当收到最后一个barrier则表示checkpoint结束。 在这一篇文章我们主要介绍了ssubtask级别的snapshot的后面再进一步介绍一下整体流程就可以结束相关介绍了。
http://www.hkea.cn/news/14490275/

相关文章:

  • 公司建站方案大连seo排名扣费
  • 湖北二师网站建设排名物联网工程专业好就业吗
  • 能自己在家做网站吗购物软件有哪些
  • keywordspy网站做分析哪个网站做签约插画师好
  • 输变电壹级电力建设公司网站网站设计常见流程
  • 做公司网站协议书模板下载公司网站如何优化
  • 捕鱼网站怎么做衡水网页网站建设
  • 碑林微网站建设网站建设东莞公司
  • 网站建设 自学 电子版 pdf下载网站建设优化推广系统
  • <网站建设与运营》海南省住房公积金管理局官网
  • 郑州阿里巴巴网站建设wordpress获取文章图片不显示
  • 网页设计创建站点教程济南比较好的网站开发公司
  • 求免费的那种网站有哪些中英文网站建设公司
  • 发外链的论坛网站移动网站开发的视频下载
  • 建设网站要注意事项单页网站技术
  • 广扬建设集团网站做网站时为什么导航时两行字
  • soho没有注册公司 能建一个外贸网站吗南宁制作营销型网站
  • 中国建设银行官方网站首页高端网站开发哪家好
  • 网站建设策划实施要素wordpress页面文件目录
  • 做商业网站的服务费维护费wordpress简码
  • 网站开发部门叫什么天元建设集团有限公司项目
  • 东阿聊城做网站的公司wordpress简约博客主题
  • 宁乡网站建设在哪做外贸在什么网站做
  • 网站建设和维护自学linux网站备份
  • 如何查看网站是否被降权营销策略有哪些方法
  • .net网站开发面试做网站经营流量
  • 京东的电子商务网站建设龙岩网站建设费用
  • 可以做科学模拟实验的网站江门外贸网站推广方案
  • 服务号网站建设wordpress add_filter
  • 如何给网站增加图标做同款的网站