内丘企业做网站,站建设培训学校,做网站免费空间,html5个人网站模板前言
在学习 KubeVela 的核心控制器之前#xff0c;我们先简单了解一下 KubeVela 的相关知识。
KubeVela 本身是一个应用交付与管理控制平面#xff0c;它架在 Kubernetes 集群、云平台等基础设施之上#xff0c;通过开放应用模型来对组件、云服务、运维能力、交付工作流进…前言
在学习 KubeVela 的核心控制器之前我们先简单了解一下 KubeVela 的相关知识。
KubeVela 本身是一个应用交付与管理控制平面它架在 Kubernetes 集群、云平台等基础设施之上通过开放应用模型来对组件、云服务、运维能力、交付工作流进行统一的编排和交付。
具体来说KubeVela 本身主要由如下几个部分组成:
核心控制器 为整个系统提供核心控制逻辑完成诸如编排应用和工作流、修订版本快照、垃圾回收等等基础逻辑。
模块化能力控制器 负责对 X-Definitions 对象进行注册和管理。
Cluster Gateway 控制器 为操作多集群提供了统一的访问接口。
插件体系 负责注册和管理 KubeVela 的扩展功能包括 CRD 控制器和相关模块定义。比如 VelaUX、FluxCD、Workflow 等插件。
UI 控制台 和 CLI 分别为用户提供了图形化界面和命令行界面操作 API。
本文主要介绍 KubeVela 的核心控制逻辑如未做特殊说明以下内容均基于 KubeVela 社区版本 v1.8.2 阐述。
OAM 应用模型
在介绍 KubeVela 的核心控制逻辑之前我们先了解一下核心控制器作用对象OAM 应用模型如图 1开放应用模型允许用户把一个现代微服务应用部署所需的所有组件和各项运维动作描述为一个统一的、与基础设施无关的部署计划进而实现在混合环境中进行标准化和高效率的应用交付。 图1
具体来说 • 通过一个叫做应用部署计划Application的对象来声明一个微服务应用的完整交付流程这其中包含了待交付组件、关联的运维动作、交付流水线等内容。 • 所有的待交付组件、运维动作和流水线中的每一个步骤都遵循 OAM 规范设计为独立的可插拔模块允许用户按照自己的需求进行组合或者定制。 • OAM 模型也会负责规范各个模块之间的协作接口。
应用部署计划
Application 对象是 KubeVela 核心控制器作用的最小单元也是用户唯一需要了解的 API它表达了一个微服务应用的部署计划。遵循 OAM 规范一个应用部署计划Application由待部署组件Component、运维动作Trait、应用的执行策略Policy以及部署工作流Workflow这四部分概念组成。
组件
组件Component是构成微服务应用的基本单元比如一个 Bookinfo 应用可以包含 Ratings、Reviews、Details 等多个组件。
运维特征
运维特征Trait负责定义组件可以关联的通用运维行为比如服务发布、访问、治理、弹性、可观测性、灰度发布等。在 OAM 规范中一个组件可以绑定任意个运维特征。
应用的执行策略
应用的执行策略Policy负责定义应用级别的部署特征比如健康检查规则、安全组、防火墙、SLO、检验等模块。
部署执行工作流
部署执行工作流Workflow定义了从部署开始到达到部署终态的一条完整路径KubeVela 会按这个流水线执行工作流中定义的各个步骤来完成整个应用交付。
KubeVela 不仅内置了多种类型组件、运维特征、策略以及工作流 Step还支持用户自定义组件、运维特征、策略及工作流 Step帮助用户在混合环境中进行标准化和高效率的应用交付。在实际使用时用户通过上述 Application 对象来引用预置的组件、运维特征、应用策略、以及工作流节点模块填写这些模块暴露的用户参数即可完成一次对应用交付的建模。
核心控制器工作原理
KubeVela vela-core 的代码结构相对来说比较简单因为它是基于 controller-runtime 框架开发实现的控制器管理器。其中 componentdefinition、traitdefinition、policydefinition、workflowstepdefinition 控制器分别用于调谐管理组件、运维特征、策略、工作流步骤类型及其版本状态信息而 application 控制器则用于调谐应用部署计划。
application_controller 是 KubeVela 的核心控制器它会解析应用部署计划生成当前应用的修订版本并与最近一次修订版本进行比较根据是否有差异来判断修订版本状态是否需要更新然后解析工作流 Steps 中引用的外部策略生成 manifest 并执行 apply再然后根据应用工作流 Steps 生成工作流 Instance 和待执行的任务 Runners最后创建工作流 Executor 并执行任务 Runners。 KubeVela 核心控制器的工作原理跟 K8s Controller 是一样的都是通过 API Server 提供的 List Watch 接口来实时监控集群中 Application 资源对象的状态变化当资源对象的状态变化时控制器会尝试将其状态调谐为期望的状态。接下来笔者将对 application_controller 的调谐逻辑进行详细说明主要包括解析应用部署计划、创建应用修订版本、应用外部策略、解析执行工作流任务等四个过程。
1、解析应用部署计划
解析应用部署计划的过程实际上就是解析应用部署计划中的组件、运维特征、策略和工作流然后生成应用描述文件 AppFile在生成 AppFile 之前会先检查最新的应用修订版与应用是否具有相同的 publishVersion如果 publishVersion 相同则直接根据最新的应用修订版生成 AppFile否则根据应用应用部署计划生成 AppFile。AppFile 作为后续调谐逻辑的基准数据模型不仅包含了应用部署计划中的信息还记录了调谐过程中所必须的 PolicyWorkload 等中间产物。
func (p *Parser) GenerateAppFile(ctx context.Context, app *v1beta1.Application) (*Appfile, error) {if ctx, ok : ctx.(monitorContext.Context); ok {subCtx : ctx.Fork(generate-app-file, monitorContext.DurationMetric(func(v float64) {metrics.AppReconcileStageDurationHistogram.WithLabelValues(generate-appfile).Observe(v)}))defer subCtx.Commit(finish generate appFile)}if isLatest, appRev, err : p.isLatestPublishVersion(ctx, app); err ! nil {return nil, err} else if isLatest {app.Spec appRev.Spec.Application.Specreturn p.GenerateAppFileFromRevision(appRev)}return p.GenerateAppFileFromApp(ctx, app)
}
2、创建应用修订版本
创建应用修订版本的过程实际上就是保存应用版本如果是更新应用的操作会获取应用最近一次的版本信息并与应用当前版本比较确认是否需要更新应用状态中的最近一次版本信息。
if err : handler.PrepareCurrentAppRevision(logCtx, appFile); err ! nil {logCtx.Error(err, Failed to prepare app revision)r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedRevision, err))return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(Revision, err), common.ApplicationRendering)}if err : handler.FinalizeAndApplyAppRevision(logCtx); err ! nil {logCtx.Error(err, Failed to apply app revision)r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedRevision, err))return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition(Revision, err), common.ApplicationRendering)}logCtx.Info(Successfully prepare current app revision, revisionName, handler.currentAppRev.Name,revisionHash, handler.currentRevHash, isNewRevision, handler.isNewRevision)app.Status.SetConditions(condition.ReadyCondition(Revision))r.Recorder.Event(app, event.Normal(velatypes.ReasonRevisoned, velatypes.MessageRevisioned))if err : handler.UpdateAppLatestRevisionStatus(logCtx); err ! nil {logCtx.Error(err, Failed to update application status)return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationRendering)}logCtx.Info(Successfully apply application revision)
3、应用外部策略
外部策略指的是未在内部策略中声明的且在工作流步骤中使用的策略这些策略是在应用的命名空间下声明的策略。AppFile 中的策略负载 PolicyWorkload 是通过加载工作流的外部策略得到的应用外部策略会将 AppFile 中的 PolicyWorkloads 渲染成 manifest 后分发到集群。
func LoadExternalPoliciesForWorkflow(ctx context.Context, cli client.Client, appNs string, steps []workflowv1alpha1.WorkflowStep, internalPolicies []v1beta1.AppPolicy) ([]v1beta1.AppPolicy, error) {policies : internalPoliciespolicyMap : map[string]struct{}{}for _, policy : range policies {policyMap[policy.Name] struct{}{}}// Load extra used policies declared in the workflow stepfor _, _step : range steps {if _step.Type DeployWorkflowStep _step.Properties ! nil {props : DeployWorkflowStepSpec{}if err : utils.StrictUnmarshal(_step.Properties.Raw, props); err ! nil {return nil, errors.Wrapf(err, invalid WorkflowStep %s, _step.Name)}for _, policyName : range props.Policies {if _, found : policyMap[policyName]; !found {po : v1alpha1.Policy{}if err : cli.Get(ctx, types2.NamespacedName{Namespace: appNs, Name: policyName}, po); err ! nil {if kerrors.IsNotFound(err) {return nil, errors.Errorf(external policy %s not found, policyName)}return nil, errors.Wrapf(err, failed to load external policy %s in namespace %s, policyName, appNs)}policies append(policies, v1beta1.AppPolicy{Name: policyName, Type: po.Type, Properties: po.Properties})policyMap[policyName] struct{}{}}}}}return policies, nil
}func (h *AppHandler) ApplyPolicies(ctx context.Context, af *appfile.Appfile) error {if ctx, ok : ctx.(monitorContext.Context); ok {subCtx : ctx.Fork(apply-policies, monitorContext.DurationMetric(func(v float64) {metrics.AppReconcileStageDurationHistogram.WithLabelValues(apply-policies).Observe(v)}))defer subCtx.Commit(finish apply policies)}policyManifests, err : af.GeneratePolicyManifests(ctx)if err ! nil {return errors.Wrapf(err, failed to render policy manifests)}if len(policyManifests) 0 {for _, policyManifest : range policyManifests {util.AddLabels(policyManifest, map[string]string{oam.LabelAppName: h.app.GetName(),oam.LabelAppNamespace: h.app.GetNamespace(),})}if err h.Dispatch(ctx, , common.PolicyResourceCreator, policyManifests...); err ! nil {return errors.Wrapf(err, failed to dispatch policy manifests)}}return nil
}
4、解析执行工作流任务
核心控制器中最重要的也是最难理解的部分应该就属工作流任务的解析执行了因为该部分使用了大量的函数式编程和异步编程对于一个 go 语言的初学者来说捋清楚这段代码并不是一件易事。接下来笔者将从解析和执行两个方面来介绍工作流任务。
解析工作流任务的过程主要包含三个步骤第一步是注册 handlers 到 providers 中每个 provider 中都包含了多个 handlers 处理程序这些 handlers 主要供后续执行工作流任务使用不管是内置的还是自定义工作流步骤在声明 Step 的过程中可以调用特定 provider 中的 handler 来完成工作任务的执行。第二步是初始化工作流实例执行工作流任务的阶段也会根据这个工作流实例创建工作流的执行器用于执行工作流中任务 Runners。第三步是根据工作流实例和已注册的 handlers providers 去遍历解析工作流步骤 Step 生成工作流任务运行程序 Runners。
func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,app *v1beta1.Application,appParser *appfile.Parser,af *appfile.Appfile) (*wfTypes.WorkflowInstance, []wfTypes.TaskRunner, error) {appRev : h.currentAppRevt : time.Now()defer func() {metrics.AppReconcileStageDurationHistogram.WithLabelValues(generate-app-steps).Observe(time.Since(t).Seconds())}()appLabels : map[string]string{oam.LabelAppName: app.Name,oam.LabelAppNamespace: app.Namespace,}handlerProviders : providers.NewProviders()kube.Install(handlerProviders, h.r.Client, appLabels, kube.Handlers{Apply: h.Dispatch,Delete: h.Delete,})configprovider.Install(handlerProviders, h.r.Client, func(ctx context.Context, resources []*unstructured.Unstructured, applyOptions []apply.ApplyOption) error {for _, res : range resources {res.SetLabels(util.MergeMapOverrideWithDst(res.GetLabels(), appLabels))}return h.resourceKeeper.Dispatch(ctx, resources, applyOptions)})oamProvider.Install(handlerProviders, app, af, h.r.Client, h.applyComponentFunc(appParser, appRev, af), h.renderComponentFunc(appParser, appRev, af))pCtx : velaprocess.NewContext(generateContextDataFromApp(app, appRev.Name))renderer : func(ctx context.Context, comp common.ApplicationComponent) (*appfile.Workload, error) {return appParser.ParseWorkloadFromRevisionAndClient(ctx, comp, appRev)}multiclusterProvider.Install(handlerProviders, h.r.Client, app, af,h.applyComponentFunc(appParser, appRev, af),h.checkComponentHealth(appParser, appRev, af),renderer)terraformProvider.Install(handlerProviders, app, renderer)query.Install(handlerProviders, h.r.Client, nil)instance : generateWorkflowInstance(af, app)executor.InitializeWorkflowInstance(instance)runners, err : generator.GenerateRunners(ctx, instance, wfTypes.StepGeneratorOptions{Providers: handlerProviders,PackageDiscover: h.r.pd,ProcessCtx: pCtx,TemplateLoader: template.NewWorkflowStepTemplateRevisionLoader(appRev, h.r.dm),Client: h.r.Client,StepConvertor: map[string]func(step workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error){wfTypes.WorkflowStepTypeApplyComponent: func(lstep workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error) {copierStep : lstep.DeepCopy()if err : convertStepProperties(copierStep, app); err ! nil {return lstep, errors.WithMessage(err, convert [apply-component])}copierStep.Type wfTypes.WorkflowStepTypeBuiltinApplyComponentreturn *copierStep, nil},},})if err ! nil {return nil, nil, err}return instance, runners, nil
}
生成任务运行程序的过程是根据应用部署计划中配置的工作流遍历工作流步骤 Step 参数及类型加载 Step 类型模板然后根据 Step 参数和模板编译生成 task。taskRunner 包含三个参数第一个参数是工作流步骤名称 wfstepName用于标识任务名称第二个参数是 checkPending 函数用于检查是否挂起任务运行第三个参数是 run 函数也就是任务的实际运行程序加载 Step 模板并接收 Step 配置参数完成任务编译的过程就是在这个 run 函数中完成的。
func GenerateRunners(ctx monitorContext.Context, instance *types.WorkflowInstance, options types.StepGeneratorOptions) ([]types.TaskRunner, error) {ctx.V(options.LogLevel)subCtx : ctx.Fork(generate-task-runners, monitorContext.DurationMetric(func(v float64) {metrics.GenerateTaskRunnersDurationHistogram.WithLabelValues(workflowrun).Observe(v)}))defer subCtx.Commit(finish generate task runners)options initStepGeneratorOptions(ctx, instance, options)taskDiscover : tasks.NewTaskDiscover(ctx, options)var tasks []types.TaskRunnerfor _, step : range instance.Steps {opt : types.TaskGeneratorOptions{ID: generateStepID(instance.Status, step.Name),PackageDiscover: options.PackageDiscover,ProcessContext: options.ProcessCtx,}for typ, convertor : range options.StepConvertor {if step.Type typ {opt.StepConvertor convertor}}task, err : generateTaskRunner(ctx, instance, step, taskDiscover, opt, options)if err ! nil {return nil, err}tasks append(tasks, task)}return tasks, nil
}
执行工作流任务的过程也包含三个步骤第一步是根据工作流实例创建工作流执行器。第二步是调用执行器的 ExecuteRunners 方法按顺序执行工作流任务运行程序。第三步则是根据工作流任务运行程序的执行结果即工作流执行状态和工作流实例状态的 EndTime 来调谐应用状态或 gc ResourceTrackers其中 ResourceTrackers 主要是用来跟踪和维护应用管理的资源会在转发应用管理的资源清单之前在 HubCluster 中进行创建可以确保在删除应用程序时能真正删除所有托管的资源。
func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) {InitializeWorkflowInstance(w.instance)status : w.instance.StatusdagMode : status.Mode.Steps v1alpha1.WorkflowModeDAGcacheKey : fmt.Sprintf(%s-%s, w.instance.Name, w.instance.Namespace)allRunnersDone, allRunnersSucceeded : checkRunners(taskRunners, w.instance.Status)if status.Finished {StepStatusCache.Delete(cacheKey)}if checkWorkflowTerminated(status, allRunnersDone) {if isTerminatedManually(status) {return v1alpha1.WorkflowStateTerminated, nil}return v1alpha1.WorkflowStateFailed, nil}if checkWorkflowSuspended(status) {return v1alpha1.WorkflowStateSuspending, nil}if allRunnersSucceeded {return v1alpha1.WorkflowStateSucceeded, nil}wfCtx, err : w.makeContext(ctx, w.instance.Name)if err ! nil {ctx.Error(err, make context)return v1alpha1.WorkflowStateExecuting, err}w.wfCtx wfCtxif cacheValue, ok : StepStatusCache.Load(cacheKey); ok {// handle cache resourceif len(status.Steps) cacheValue.(int) {return v1alpha1.WorkflowStateSkipped, nil}}e : newEngine(ctx, wfCtx, w, status, taskRunners)err e.Run(ctx, taskRunners, dagMode)if err ! nil {ctx.Error(err, run steps)StepStatusCache.Store(cacheKey, len(status.Steps))return v1alpha1.WorkflowStateExecuting, err}StepStatusCache.Store(cacheKey, len(status.Steps))if feature.DefaultMutableFeatureGate.Enabled(features.EnablePatchStatusAtOnce) {return e.status.Phase, nil}return e.checkWorkflowPhase(), nil
}
执行工作流任务首先会创建工作流的执行引擎然后调用引擎的 Run 方法顺序执行或并行执行 taskRunner默认 steps 以 StepByStep 顺序执行subSteps 以 DAG 并行执行。顺序执行会遍历 taskRunners并依次调用 taskRunner 的 run 方法run 方法的内容就是上文提到的生成 taskRunner 时的 run 函数根据 Step 参数配置和加载的 Step 模板完成工作流步骤任务编译后会执行 Step CUE Template 中调用的 provider handlers即上文提到的在解析工作流任务阶段注册的各类型 providers handlers从而完成 taskRunner 执行。应用部署计划中用到最多的工作流步骤类型是 deploydeploy 是一个的功能强大的组件部署步骤使用策略进行多集群交付。另外使用最多的应用策略是 topologytopology 描述了组件应该部署到的集群和命名空间。
deploy.cue
import (vela/op
)deploy: {type: workflow-stepannotations: {category: Application Delivery}labels: {scope: Application}description: A powerful and unified deploy step for components multi-cluster delivery with policies.
}
template: {deploy: op.#Deploy {policies: parameter.policiesparallelism: parameter.parallelismignoreTerraformComponent: parameter.ignoreTerraformComponent}parameter: {//usageIf set to false, the workflow will suspend automatically before this step, default to be true.auto: *true | bool//usageDeclare the policies that used for this deployment. If not specified, the components will be deployed to the hub cluster.policies: *[] | [...string]//usageMaximum number of concurrent delivered components.parallelism: *5 | int//usageIf set false, this step will apply the components with the terraform workload.ignoreTerraformComponent: *true | bool}
}
总结
本文主要介绍了 KubeVela 核心控制器的工作原理包括核心控制逻辑中解析应用部署计划、创建应用修订版本、应用外部策略、解析执行工作流任务等四个部分本篇作为综述帮助大家初步了解 KubeVela 核心控制器的技术要点和运行机制后续我们将分别从上述四个部分进行详细解读。
参考文献
应用管理平台 kubevelahttps://qiankunli.github.io/2022/10/23/kubevela.html
kubevela 源码分析https://qiankunli.github.io/2022/11/06/kubevela_source.html
KubeVela 源码仓库https://github.com/kubevela/kubevela