当前位置: 首页 > news >正文

网站建设外包需要注意哪些郑州建网站多少

网站建设外包需要注意哪些,郑州建网站多少,张家港网站制作,租用的网站空间的缺点前言 目前已发布了3篇关于Flink RPC相关的文章#xff0c;分别从底层通信系统akka/Pekko#xff0c;RPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制#xff08;一#xff09;-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制分别从底层通信系统akka/PekkoRPC实现方式动态代理以及Flink RPC相关的组件做了介绍 深度了解flink rpc机制一-Akka/Pekko_flink pekko akka-CSDN博客 深度了解flink rpc机制二-动态代理-CSDN博客 深度了解flink rpc机制三-组件以及交互-CSDN博客 这篇文章通过分析源码对以上知识进行验证并串联加深印象更深入的了解Flink RPC的实现原理。本篇文章分享TaskManager启动和向ResouceManager注册的流程TaskManager在flink 1.12之后被更名为TaskExecutor可能文章中两个名称都会使用大家理解成一个就行。 TaskManage启动源码分析 入口类 TaskManager的启动类入口,以Flink的Standalone模式为例可以在flink目录下的bin目录的flink-daemon.sh找到入口类 . $bin/config.shcase $DAEMON in(taskexecutor)CLASS_TO_RUNorg.apache.flink.runtime.taskexecutor.TaskManagerRunner;;(zookeeper)CLASS_TO_RUNorg.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer;;(historyserver)CLASS_TO_RUNorg.apache.flink.runtime.webmonitor.history.HistoryServer;;(standalonesession)CLASS_TO_RUNorg.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;;(standalonejob)CLASS_TO_RUNorg.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint;;(*)echo Unknown daemon ${DAEMON}. $USAGE.exit 1;; esac 从这里可以看到Standalon模式下各个组件的启动类入口TaskManager的入口类是TaskManageRunner,做为组件的入口类肯定会有main方法 public static void main(String[] args) throws Exception {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, TaskManager, args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);long maxOpenFileHandles EnvironmentInformation.getOpenFileHandlesLimit();if (maxOpenFileHandles ! -1L) {LOG.info(Maximum number of open file descriptors is {}., maxOpenFileHandles);} else {LOG.info(Cannot determine the maximum number of open file descriptors);}//安装的方式启动taskmanager进程 runTaskManagerProcessSecurely(args);} 之后就是在TaskManageRunner的方法调用了最终会进入到runTaskManager这个静态方法 public static int runTaskManager(Configuration configuration, PluginManager pluginManager)throws Exception {final TaskManagerRunner taskManagerRunner;try {//之前方法都是静态方法调用初始化taskManagerRunner对象taskManagerRunner new TaskManagerRunner(configuration,pluginManager,TaskManagerRunner::createTaskExecutorService);//开始创建TaskmanagertaskManagerRunner.start();} catch (Exception exception) {throw new FlinkException(Failed to start the TaskManagerRunner., exception);}try {return taskManagerRunner.getTerminationFuture().get().getExitCode();} catch (Throwable t) {throw new FlinkException(Unexpected failure during runtime of TaskManagerRunner.,ExceptionUtils.stripExecutionException(t));}} 之前一直是在调用TaskManageRunner的静态方法做一些日志加载安全检查的前置校验此时才真正的实例化TaskManageRunner对象调用start方法进行TaskManager的创建 //taskManagerRunner.start()public void start() throws Exception {synchronized (lock) {startTaskManagerRunnerServices();taskExecutorService.start();} } 创建RpcService和TaskExecutor taskManagerRunner.start()方法内部有两个方法的调用 startTaskManagerRunnerServices() private void startTaskManagerRunnerServices() throws Exception {synchronized (lock) {rpcSystem RpcSystem.load(configuration);//非RPC相关 代码省略JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));//创建rpcServicerpcService createRpcService(configuration, highAvailabilityServices, rpcSystem);//非RPC相关 代码省略//创建TaskExecutortaskExecutorService taskExecutorServiceFactory.createTaskExecutor(this.configuration,this.resourceId.unwrap(),rpcService,highAvailabilityServices,heartbeatServices,metricRegistry,blobCacheService,false,externalResourceInfoProvider,workingDirectory.unwrap(),this,delegationTokenReceiverRepository);}} 可以看到这个方法首先调用createRpcService这个方法这个方法内部内就是去创建ActorSystem初始化RpcService 初始化RpcServer和PekkoInvocationHandler 然后就是创建TaskExecutorTaskExecutor继承自EndPoint,EndPoint构造方法执行的时候会初始化RpcServer /*** Initializes the RPC endpoint.** param rpcService The RPC server that dispatches calls to this RPC endpoint.* param endpointId Unique identifier for this endpoint*/protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService checkNotNull(rpcService, rpcService);this.endpointId checkNotNull(endpointId, endpointId);//创建RpcServer 方法内部//1.创建Acotr通信对象PekkoRpcActor//2.对象动态代理对象PekkoInvocationHandler赋值给rpcServerthis.rpcServer rpcService.startServer(this);this.resourceRegistry new CloseableRegistry();this.mainThreadExecutor new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);registerResource(this.mainThreadExecutor);} taskExecutorService.start() 这个方法会调用TaskExecutor对象的start方法,会调用父类EndPoint的start方法 /*** Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc* endpoint is ready to process remote procedure calls.*/public final void start() {rpcServer.start();}rpcServer.start()方法如下 public void start() {//rpcEndpoint是Actor对象rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());}这块儿代码就是taskmanger初始化后自己会给自己发送一个Akka START控制类的消息准确来说是继承了EndPoint的类都会在初始化之后给自身发送一个这样的消息。 因为发的是Akka的消息会进入到TaskExecutor的PekkoInvocationHandler#createReceive接收Akka消息的逻辑 //构造方法PekkoRpcActor(final T rpcEndpoint,final CompletableFutureBoolean terminationFuture,final int version,final long maximumFramesize,final boolean forceSerialization,final ClassLoader flinkClassLoader) {//省略其他代码//PekkoPrcActor初始化 会将state枚举值设置为StoppedState.STOPPEDthis.state StoppedState.STOPPED;}//接收消息Overridepublic Receive createReceive() {return ReceiveBuilder.create()//匹配到握手消息.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)//控制类消息.match(ControlMessages.class, this::handleControlMessage)//除以上两种之外的任意消息.matchAny(this::handleMessage).build();//处理控制类消息的逻辑 private void handleControlMessage(ControlMessages controlMessage) {try {switch (controlMessage) {case START:state state.start(this, flinkClassLoader);break;case STOP:state state.stop();break;case TERMINATE:state state.terminate(this, flinkClassLoader);break;default:handleUnknownControlMessage(controlMessage);}} catch (Exception e) {this.rpcEndpointTerminationResult RpcEndpointTerminationResult.failure(e);throw e;}} PekkoRpcActor在初始化的时候会 将自身state属性设置为StoppedState.STOPPED; 接收到ControlMessages.START消息会走到handleControlMessage方法的case stop分支因为state是StoppedState.STOPPED所以代码会走到StoppedState这个静态枚举类的start方法 public State start(PekkoRpcActor? pekkoRpcActor, ClassLoader flinkClassLoader) {pekkoRpcActor.mainThreadValidator.enterMainThread();try {runWithContextClassLoader(() - pekkoRpcActor.rpcEndpoint.internalCallOnStart(), flinkClassLoader);} catch (Throwable throwable) {pekkoRpcActor.stop(RpcEndpointTerminationResult.failure(new RpcException(String.format(Could not start RpcEndpoint %s.,pekkoRpcActor.rpcEndpoint.getEndpointId()),throwable)));} finally {pekkoRpcActor.mainThreadValidator.exitMainThread();}return StartedState.STARTED;} pekkoRpcActor.rpcEndpoint.internalCallOnStart()这块儿代码是关键又指定到了Endpoint定义的方法 public final void internalCallOnStart() throws Exception {validateRunsInMainThread();isRunning true;onStart();}protected void onStart() throws Exception {} 这块儿代码饶了半天其实用大白话来讲就是Flink任何需要进行通信的组件都要继承Endpoint类组件初始化之前会先初始化RpcService对象作为Endpoint子类的成员变量然后再由RpcService初始化ActorSystem,创建Actor和代理对象之后再给自身发一个控制类的START方法最后一定要进入到自身的onStart方法 TaskExecutor向ResourceManager注册流程 onStart方法开始进入到向ResourceManager注册的流程 Overridepublic void onStart() throws Exception {try {//开始向ResourceManager注册startTaskExecutorServices();} catch (Throwable t) {final TaskManagerException exception new TaskManagerException(String.format(Could not start the TaskExecutor %s, getAddress()), t);onFatalError(exception);throw exception;}startRegistrationTimeout();}private void startTaskExecutorServices() throws Exception {try {// start by connecting to the ResourceManager//new ResourceManagerLeaderListener()是真正注册的代码resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());//省略其他代码} catch (Exception e) {handleStartTaskExecutorServicesException(e);}} new ResourceManagerLeaderListener()是真正注册的方法 private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {Overridepublic void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {runAsync(() -notifyOfNewResourceManagerLeader(leaderAddress,ResourceManagerId.fromUuidOrNull(leaderSessionID)));}Overridepublic void handleError(Exception exception) {onFatalError(exception);}} 再进入到notifyOfNewResourceManagerLeader方法内部 private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {//获取ResouceManager的通信地址resourceManagerAddress createResourceManagerAddress(newLeaderAddress, newResourceManagerId);//尝试连接ResouceMnangerreconnectToResourceManager(new FlinkException(String.format(ResourceManager leader changed to new address %s,resourceManagerAddress))); } reconnectToResourceManager方法内部 private void reconnectToResourceManager(Exception cause) {//如果已存在ResourceManger的连接 关闭连接closeResourceManagerConnection(cause);//设置注册超时时间startRegistrationTimeout();//继续尝试连接ResouceManagertryConnectToResourceManager();} tryConnectToResourceManager(); private void tryConnectToResourceManager() {if (resourceManagerAddress ! null) {connectToResourceManager();}}private void connectToResourceManager() {assert (resourceManagerAddress ! null);assert (establishedResourceManagerConnection null);assert (resourceManagerConnection null);log.info(Connecting to ResourceManager {}., resourceManagerAddress);//封装taskExecutor的信息地址 硬件资源 内存资源final TaskExecutorRegistration taskExecutorRegistration new TaskExecutorRegistration(getAddress(),getResourceID(),unresolvedTaskManagerLocation.getDataPort(),JMXService.getPort().orElse(-1),hardwareDescription,memoryConfiguration,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile(),unresolvedTaskManagerLocation.getNodeId());resourceManagerConnection new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();} 进入到connectToResourceManager方法封装注册信息。进入start方法 public void start() {checkState(!closed, The RPC connection is already closed);checkState(!isConnected() pendingRegistration null,The RPC connection is already started);//创建注册成功、注册失败的回调方法final RetryingRegistrationF, G, S, R newRegistration createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {//开始主持newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}} 首先创建注册成功和主持失败的回调方法然后继续进入注册的流程 public void startRegistration() {//创建动态代理对象final CompletableFutureG rpcGatewayFuture;//ResourceManager可能有主从所以走Fenced这块儿if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture (CompletableFutureG)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture rpcService.connect(targetAddress, targetType);}//省略其他代码 }private C extends RpcGateway CompletableFutureC connectInternal(final String address,final ClassC clazz,FunctionActorRef, InvocationHandler invocationHandlerFactory) {checkState(!stopped, RpcService is stopped);//省略无关代码//握手确保连接正常final CompletableFutureHandshakeSuccessMessage handshakeFuture final CompletableFutureC gatewayFuture actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) - {InvocationHandler invocationHandler invocationHandlerFactory.apply(actorRef);ClassLoader classLoader getClass().getClassLoader();//真正核心的代码 创建代理的实现SuppressWarnings(unchecked)C proxy (C)Proxy.newProxyInstance(classLoader,new Class?[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);} 然后就会走到RpcService获取到ReouceManager的代理然后将代理对象和主持方法通过akka消息发送到ResouceManager的RpcActor,然后进入消息处理执行代理的对象的注册方法也就是ResouceManager的注册方法从而将TaskManager进行注册 启动注册流程图
http://www.hkea.cn/news/14396545/

相关文章:

  • 网站工作室和网络公司盘锦做网站哪家好
  • 河北中太建设集团有限公司网站阳泉企业网站建设公司
  • 一个人只做网站的流程杭州注册公司流程是怎样的
  • 专业手机网站建设多少钱dede网站制作教程
  • 青岛市建设工程质量安全监督站官方网站wordpress置顶文章调用
  • 欧美网站模板 psd怎么找人做网站
  • 网站开发找哪家好免费网站做seo
  • 有哪些网站适合大学生做兼职网络系统管理比赛
  • 易讯企业建站系统我想自己在网站上发文章 怎样做
  • 外贸网站有哪些网上商城购物系统
  • 工商注册网站模板大连市建设工程信息网官网
  • 淘宝网站建设方案网站开发多语言
  • 怎么上网做网站亚马逊推广
  • 天津网站建设是什么给网站做数据分析
  • 贷款织梦网站模版网站怎么能快速备案
  • vultr怎么建设影视网站建立个人网站需要什么
  • 商城网站 前置审批安装wordpress linux
  • 网站的内链怎么做北京商场需要几天核酸
  • 展示网站欣赏WordPress mvc插件
  • 网站前后台模板wordpress 更改数据库
  • 网站做搜索引擎优化自己做的网站怎么传入外网
  • 江西省建设招标网站百度seo搜索排名
  • 内蒙古创意网站开发学生做微商怎么加入
  • 济南做门户网站开发公司做婚礼请柬的网站有哪些
  • 网站建设黄页软件广告logo图片大全
  • html怎么做音乐网站wordpress运营服务费用
  • 校庆网站建设策划书范文网站制作找
  • 高端网站策划公司齐鲁网
  • 成品免费ppt网站百度站长工具app
  • wordpress自动采集aote网站的优化策略