当前位置: 首页 > news >正文

网站建设有哪些问题推荐微商城网站建设

网站建设有哪些问题,推荐微商城网站建设,厦门网站建设有限公司,百度竞价点击一次多少钱前言 目前已发布了3篇关于Flink RPC相关的文章#xff0c;分别从底层通信系统akka/Pekko#xff0c;RPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制#xff08;一#xff09;-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制分别从底层通信系统akka/PekkoRPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制一-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制二-动态代理-CSDN博客 深度了解flink rpc机制三-组件以及交互-CSDN博客 这篇文章通过分析源码对以上知识进行验证并串联加深印象更深入的了解Flink RPC的实现原理。本篇文章分享TaskManager启动和向ResouceManager注册的流程TaskManager在flink 1.12之后被更名为TaskExecutor可能文章中两个名称都会使用大家理解成一个就行。 TaskManage启动源码分析 入口类 TaskManager的启动类入口,以Flink的Standalone模式为例可以在flink目录下的bin目录的flink-daemon.sh找到入口类 . $bin/config.shcase $DAEMON in(taskexecutor)CLASS_TO_RUNorg.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(zookeeper)CLASS_TO_RUNorg.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(historyserver)CLASS_TO_RUNorg.apache.flink.runtime.webmonitor.history.HistoryServer;;(standalonesession)CLASS_TO_RUNorg.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUNorg.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;;(*)echo Unknown daemon ${DAEMON}. $USAGE.exit 1;; esac 从这里可以看到Standalon模式下各个组件的启动类入口TaskManager的入口类是TaskManageRunner,做为组件的入口类肯定会有main方法 public static void main(String[] args) throws Exception {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, TaskManager, args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);long maxOpenFileHandles EnvironmentInformation.getOpenFileHandlesLimit();if (maxOpenFileHandles ! -1L) {LOG.info(Maximum number of open file descriptors is {}., maxOpenFileHandles);} else {LOG.info(Cannot determine the maximum number of open file descriptors);}//安装的方式启动taskmanager进程 runTaskManagerProcessSecurely(args);} 之后就是在TaskManageRunner的方法调用了最终会进入到runTaskManager这个静态方法 public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {//之前方法都是静态方法调用初始化taskManagerRunner对象taskManagerRunner new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService);//开始创建TaskmanagertaskManagerRunner.start();} catch (Exception exception) {throw new FlinkException(Failed to start the TaskManagerRunner., exception);}try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException(Unexpected failure during runtime of TaskManagerRunner.,ExceptionUtils.stripExecutionException(t));}} 之前一直是在调用TaskManageRunner的静态方法做一些日志加载安全检查的前置校验此时才真正的实例化TaskManageRunner对象调用start方法进行TaskManager的创建 //taskManagerRunner.start()public void start() throws Exception {synchronized (lock) {startTaskManagerRunnerServices();taskExecutorService.start();} } 创建RpcService和TaskExecutor taskManagerRunner.start()方法内部有两个方法的调用 startTaskManagerRunnerServices() private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {rpcSystem RpcSystem.load(configuration);//非RPC相关 代码省略JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));//创建rpcServicerpcService createRpcService(configuration, highAvailabilityServices, rpcSystem);//非RPC相关 代码省略//创建TaskExecutortaskExecutorService taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}} 可以看到这个方法首先调用createRpcService这个方法这个方法内部内就是去创建ActorSystem初始化RpcService 初始化RpcServer和PekkoInvocationHandler 然后就是创建TaskExecutorTaskExecutor继承自EndPoint,EndPoint构造方法执行的时候会初始化RpcServer /*** Initializes the RPC endpoint.** param rpcService The RPC server that dispatches calls to this RPC endpoint.* param endpointId Unique identifier for this endpoint*/protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService checkNotNull(rpcService, rpcService);this.endpointId checkNotNull(endpointId, endpointId);//创建RpcServer 方法内部//1.创建Acotr通信对象PekkoRpcActor//2.对象动态代理对象PekkoInvocationHandler赋值给rpcServerthis.rpcServer rpcService.startServer(this);this.resourceRegistry new CloseableRegistry();this.mainThreadExecutor new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);registerResource(this.mainThreadExecutor);} taskExecutorService.start() 这个方法会调用TaskExecutor对象的start方法,会调用父类EndPoint的start方法 /*** Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc* endpoint is ready to process remote procedure calls.*/public final void start() {rpcServer.start();}rpcServer.start()方法如下 public void start() {//rpcEndpoint是Actor对象rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());}这块儿代码就是taskmanger初始化后自己会给自己发送一个Akka START控制类的消息准确来说是继承了EndPoint的类都会在初始化之后给自身发送一个这样的消息。 因为发的是Akka的消息会进入到TaskExecutor的PekkoInvocationHandler#createReceive接收Akka消息的逻辑 //构造方法PekkoRpcActor(final T rpcEndpoint,final CompletableFutureBoolean terminationFuture,final int version,final long maximumFramesize,final boolean forceSerialization,final ClassLoader flinkClassLoader) {//省略其他代码//PekkoPrcActor初始化 会将state枚举值设置为StoppedState.STOPPEDthis.state StoppedState.STOPPED;}//接收消息Overridepublic Receive createReceive() {return ReceiveBuilder.create()//匹配到握手消息.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)//控制类消息.match(ControlMessages.class, this::handleControlMessage)//除以上两种之外的任意消息.matchAny(this::handleMessage).build();//处理控制类消息的逻辑 private void handleControlMessage(ControlMessages controlMessage) {try {switch (controlMessage) {case START:state state.start(this, flinkClassLoader);break;case STOP:state state.stop();break;case TERMINATE:state state.terminate(this, flinkClassLoader);break;default:handleUnknownControlMessage(controlMessage);}} catch (Exception e) {this.rpcEndpointTerminationResult RpcEndpointTerminationResult.failure(e);throw e;}} PekkoRpcActor在初始化的时候会 将自身state属性设置为StoppedState.STOPPED; 接收到ControlMessages.START消息会走到handleControlMessage方法的case stop分支因为state是StoppedState.STOPPED所以代码会走到StoppedState这个静态枚举类的start方法 public State start(PekkoRpcActor? pekkoRpcActor, ClassLoader flinkClassLoader) {pekkoRpcActor.mainThreadValidator.enterMainThread();try {runWithContextClassLoader(() - pekkoRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader);} catch (Throwable throwable) {pekkoRpcActor.stop(RpcEndpointTerminationResult.failure(new RpcException(String.format(Could not start RpcEndpoint %s.,pekkoRpcActor.rpcEndpoint.getEndpointId()),throwable)));} finally {pekkoRpcActor.mainThreadValidator.exitMainThread();}return StartedState.STARTED;} pekkoRpcActor.rpcEndpoint.internalCallOnStart()这块儿代码是关键又指定到了Endpoint定义的方法 public final void internalCallOnStart() throws Exception {validateRunsInMainThread();isRunning true;onStart();}protected void onStart() throws Exception {} 这块儿代码饶了半天其实用大白话来讲就是Flink任何需要进行通信的组件都要继承Endpoint类组件初始化之前会先初始化RpcService对象作为Endpoint子类的成员变量然后再由RpcService初始化ActorSystem,创建Actor和代理对象之后再给自身发一个控制类的START方法最后一定要进入到自身的onStart方法 TaskExecutor向ResourceManager注册流程 onStart方法开始进入到向ResourceManager注册的流程 Overridepublic void onStart() throws Exception {try {//开始向ResourceManager注册startTaskExecutorServices();} catch (Throwable t) {final TaskManagerException exception new TaskManagerException(String.format(Could not start the TaskExecutor %s, getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout();}private void startTaskExecutorServices() throws Exception {try {// start by connecting to the ResourceManager//new ResourceManagerLeaderListener()是真正注册的代码resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());//省略其他代码} catch (Exception e) {handleStartTaskExecutorServicesException(e);}} new ResourceManagerLeaderListener()是真正注册的方法 private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {Overridepublic void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {runAsync(() -notifyOfNewResourceManagerLeader(leaderAddress,ResourceManagerId.fromUuidOrNull(leaderSessionID)));}Overridepublic void handleError(Exception exception) {onFatalError(exception);}} 再进入到notifyOfNewResourceManagerLeader方法内部 private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {//获取ResouceManager的通信地址resourceManagerAddress createResourceManagerAddress(newLeaderAddress, newResourceManagerId);//尝试连接ResouceMnangerreconnectToResourceManager(new FlinkException(String.format(ResourceManager leader changed to new address %s,resourceManagerAddress))); } reconnectToResourceManager方法内部 private void reconnectToResourceManager(Exception cause) {//如果已存在ResourceManger的连接 关闭连接closeResourceManagerConnection(cause);//设置注册超时时间startRegistrationTimeout();//继续尝试连接ResouceManagertryConnectToResourceManager();} tryConnectToResourceManager(); private void tryConnectToResourceManager() {if (resourceManagerAddress ! null) {connectToResourceManager();}}private void connectToResourceManager() {assert (resourceManagerAddress ! null);assert (establishedResourceManagerConnection null);assert (resourceManagerConnection null);log.info(Connecting to ResourceManager {}., resourceManagerAddress);//封装taskExecutor的信息地址 硬件资源 内存资源final TaskExecutorRegistration taskExecutorRegistration new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());resourceManagerConnection new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();} 进入到connectToResourceManager方法封装注册信息。进入start方法 public void start() {checkState(!closed, The RPC connection is already closed);checkState(!isConnected() pendingRegistration null,The RPC connection is already started);//创建注册成功、注册失败的回调方法final RetryingRegistrationF, G, S, R newRegistration createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {//开始主持newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}} 首先创建注册成功和主持失败的回调方法然后继续进入注册的流程 public void startRegistration() {//创建动态代理对象final CompletableFutureG rpcGatewayFuture;//ResourceManager可能有主从所以走Fenced这块儿if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture (CompletableFutureG)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture rpcService.connect(targetAddress, targetType);}//省略其他代码 }private C extends RpcGateway CompletableFutureC connectInternal(final String address,final ClassC clazz,FunctionActorRef, InvocationHandler invocationHandlerFactory) {checkState(!stopped, RpcService is stopped);//省略无关代码//握手确保连接正常final CompletableFutureHandshakeSuccessMessage handshakeFuture final CompletableFutureC gatewayFuture actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) - {InvocationHandler invocationHandler invocationHandlerFactory.apply(actorRef);ClassLoader classLoader getClass().getClassLoader();//真正核心的代码 创建代理的实现SuppressWarnings(unchecked)C proxy (C)Proxy.newProxyInstance(classLoader,new Class?[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);} 然后就会走到RpcService获取到ReouceManager的代理然后将代理对象和主持方法通过akka消息发送到ResouceManager的RpcActor,然后进入消息处理执行代理的对象的注册方法也就是ResouceManager的注册方法从而将TaskManager进行注册 启动注册流程图
http://www.hkea.cn/news/14409088/

相关文章:

  • 孝感网站建设公司潍坊地区制作网站
  • 网站建设报价 东莞运营推广的工作内容
  • 网站建设蛋蛋28数字博物馆网站建设内容
  • 网站后台需要多少页面设计培训学什么
  • 丰台建设企业网站高端网站建设 源码
  • 网站域名怎么看企业融资什么意思
  • 嘉兴手机网站制作wordpress 归类插件
  • 网站编辑模版烟台网站优化公司
  • 网站这么做优化网站的内容做证据观点
  • 佛山建网站哪家好做个网站网站需要多少钱
  • 网站开发建设账务处理程序ftp怎么连接网站
  • 网站 建立目录出错网站设计制作公司需要什么资质
  • 企业网站如何建设报告psd数据网站
  • 肥城网站开发公司智汇隆网站建设
  • 网站推广工做计划范本龙口网站建设公司哪家好
  • 新昌做网站票付通app下载
  • 西安seo建站网站设计的初衷
  • 网站目录结构设计应注意的问题做网站怎么合并单元格
  • 网站制作 南通做一个手机app大概需要多少钱
  • 网站建设毕业设计评价做网站用最新软件
  • 网站开发都做什么什么是网站关键词
  • 如何用dede做带下单的网站荣耀商城手机官网
  • 搭建网站的步骤和顺序30天网站建设实录视频云盘
  • 外国炫酷网站设计专门做mmd的网站
  • 网站开发技术要求深圳fpc人才网官网
  • 简单做网站的软件营销型网站 策划运营网站
  • 爱站seo综合查询有pc网站 移动网站怎么做
  • 电子商务网站建设平台郑州政策最新消息
  • 高毅资产网站谁做的企业vi设计需求
  • 网站效果展示广州培训网站建设