衡阳公司网站建设,com域名查询官网,wordpress改雅黑,提供网站建设服务平台欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)#xff1a;https://github.com/zq2599/blog_demos 本篇概览 本文是《Kubernetes对象深入学习》系列的第四篇#xff0c;前面咱们读源码和文档#xff0c;从理论上学习了kubernetes的对象相关的知识#xff…欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)https://github.com/zq2599/blog_demos 本篇概览 本文是《Kubernetes对象深入学习》系列的第四篇前面咱们读源码和文档从理论上学习了kubernetes的对象相关的知识是时候自己动手来实战操作了 本篇的主要内容就是新建一个golang工程里面运行一个基于client-go的Controller监听两种资源类型的变化事件 面对两种不同类型的资源对象咱们开发一个通用的方法使用该方法可以获取各种类型的对象的属性以此来验证前文学习的知识点 整个项目的功能如下图所示其实挺简单的用kubectl对资源做修改修改labelapi-server会向所有监听者发送变更事件object-tutorials是个go开发的应用程序里面使用client-go库监听kubernetes上的pod和service所有变更事件在收到事件后object-tutorials中会对变更对象做一些读对象属性相关的操作
在k8s部署service和deployment
为了实战需要首先请在kubernetes环境将service和deployment部署好这里给出我的部署脚本作为参考所有要部署的内容我都集中在这个名为nginx-deployment-service.yaml脚本中了
---
apiVersion: apps/v1
kind: Deployment
metadata:namespace: client-go-tutorialsname: nginx-deploymentlabels:app: nginx-apptype: front-end
spec:replicas: 3selector:matchLabels:app: nginx-apptype: front-endtemplate:metadata:labels:app: nginx-apptype: front-end# 这是第一个业务自定义label指定了mysql的语言类型是c语言language: c# 这是第二个业务自定义label指定了这个pod属于哪一类服务nginx属于web类business-service-type: webspec:containers:- name: nginx-containerimage: nginx:latestresources:limits:cpu: 0.5memory: 128Mirequests:cpu: 0.1memory: 64Mi
---
apiVersion: v1
kind: Service
metadata:namespace: client-go-tutorialsname: nginx-service
spec:type: NodePortselector:app: nginx-apptype: front-endports:- port: 80targetPort: 80nodePort: 30011先执行以下命令创建namespace
kubectl create namespace client-go-tutorials再执行以下命令即可完成资源的创建
kubectl apply -f nginx-deployment-service.yaml来查看一下资源情况如下图service和pod都创建好了准备工作完成可以开始编码了
源码下载
如果您不想编写代码也可以从GitHub上直接下载地址和链接信息如下表所示(https://github.com/zq2599/blog_demos)
名称链接备注项目主页https://github.com/zq2599/blog_demos该项目在GitHub上的主页git仓库地址(https)https://github.com/zq2599/blog_demos.git该项目源码的仓库地址https协议git仓库地址(ssh)gitgithub.com:zq2599/blog_demos.git该项目源码的仓库地址ssh协议
这个git项目中有多个文件夹本篇的源码在object-tutorials文件夹下如下图黄框所示
编码准备工程
执行命令名为go mod init object-tutorials新建module确保您的goproxy是正常的执行命令go get k8s.io/client-gov0.22.8下载client-go的指定版本现在工程已经准备好了接着就是具体的编码
编码梳理
咱们按照开发顺序开始写代码如果您看过欣宸的《client-go实战》系列此刻对使用client-go开发简易版controller应该很熟悉了这里在简单提一下controller的基本套路
在整个controller中核心是队列或者说队列是唯一的一条线其他知识点都是珍珠被队列穿起来当队列创建成功后咱们接下来要做的就是往队列中生产数据然后取出数据来消费还要多考虑一些例如多个协程并行消费以及消费过程中发生异常时的处理逻辑然后就是本篇的核心了无视资源类型的不同可以用同一段代买来处理各种资源对象的属性
这些编码要实现的功能如下图所示队列为线其他知识点为珍珠
编码数据结构
新建controller.go文件先定义数据结构
// 自定义controller数据结构嵌入了真实的控制器
type Controller struct {// 本地缓存关注的对象都会同步到这里indexer cache.Indexer// 消息队列用来触发对真实对象的处理事件queue workqueue.RateLimitingInterface// 实际运行运行的控制器informer cache.Controller
}编码创建队列生产数据
首先要创建队列然后对k8s的api-server建立listwatchlist取得全量数据watch监听数据变化的通知再让整个监听和响应的逻辑运行起来上述功能由以下两个方法组成CreateAndStartController负责创建实例并且队列的生产逻辑也在此方法中Run方法负责让队列的生产和消费运转起来 // Run 开始常规的控制器模式持续响应资源变化事件
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {defer runtime.HandleCrash()// Let the workers stop when we are donedefer c.queue.ShutDown()klog.Info(Starting Pod controller)go c.informer.Run(stopCh)// Wait for all involved caches to be synced, before processing items from the queue is started// 刚开始启动从api-server一次性全量同步所有数据if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {runtime.HandleError(fmt.Errorf(timed out waiting for caches to sync))return}// 支持多个线程并行从队列中取得数据进行处理for i : 0; i threadiness; i {go wait.Until(c.runWorker, time.Second, stopCh)}-stopChklog.Info(Stopping Pod controller)
}// CreateAndStartController 为了便于外部使用这里将controller的创建和启动封装在一起
func CreateAndStartController(c cache.Getter, objType objectruntime.Object, resource string, namespace string, stopCh chan struct{}) {// ListWatcher用于获取数据并监听资源的事件podListWatcher : cache.NewListWatchFromClient(c, resource, NAMESPACE, fields.Everything())// 限速队列里面存的是有事件发生的对象的身份信息而非对象本身queue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 创建本地缓存并对指定类型的资源开始监听// 注意如果业务上有必要其实可以将新增、修改、删除等事件放入不同队列然后分别做针对性处理// 但是controller对应的模式主要是让status与spec达成一致也就是说增删改等事件对应的都是查到实际情况令其与期望情况保持一致// 因此多数情况下增删改用一个队列即可里面放入变化的对象的身份至于处理方式只有一种查到实际情况令其与期望情况保持一致indexer, informer : cache.NewIndexerInformer(podListWatcher, objType, 0, cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {key, err : cache.MetaNamespaceKeyFunc(obj)if err nil {// 再次注意这里放入队列的并非对象而是对象的身份作用是仅仅告知消费方该对象有变化// 至于有什么变化需要消费方自行判断然后再做针对性处理queue.Add(key)}},UpdateFunc: func(old interface{}, new interface{}) {key, err : cache.MetaNamespaceKeyFunc(new)if err nil {queue.Add(key)}},DeleteFunc: func(obj interface{}) {key, err : cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err nil {queue.Add(key)}},}, cache.Indexers{})controller : Controller{informer: informer,indexer: indexer,queue: queue,}go controller.Run(1, stopCh)
}
编码消费队列的数据
这里先写好业务代码就是知道某个对象发生变化时具体的业务逻辑是什么如下所示
// syncToStdout 这是业务逻辑代码被调用意味着key对应的对象有变化(新增或者修改)
func (c *Controller) syncToStdout(key string) error {// 从本地缓存中取出完整的对象obj, exists, err : c.indexer.GetByKey(key)if err ! nil {klog.Errorf(Fetching object with key %s from store failed with %v, key, err)return err}// 如果不存在就表示这是个删除事件if !exists {fmt.Printf(Pod %s does not exist anymore\n, key)} else {// 这里无视了obj具体是什么类型的对象(deployment、pod这些都有可能)// 用meta.Accessor转换出metav1.Object对象后就能获取该对象的所有meta信息objMeta, err : meta.Accessor(obj)if err ! nil {klog.Errorf(get meta accessor error, [%s], failed with %v, key, err)return err}// 取得资源的所有属性labels : objMeta.GetLabels()if labels nil {klog.Infof(name [%s], namespace [%s], label is empty, objMeta.GetName(), objMeta.GetNamespace())return nil}// 遍历每个属性打印出来for key, value : range labels {klog.Infof(name [%s], namespace [%s], key [%s], value [%s],objMeta.GetName(),objMeta.GetNamespace(),key,value)}}return nil
}接下里要写的是从消费队列中的数据即取数据然后调用上面的syncToStdout这里由两个方法组成runWorker负责建立一个无限循环不断调用processNextItem方法
// processNextItem 不间断从队列中取得数据并处理
func (c *Controller) processNextItem() bool {// 注意队列里面不是对象而是key这是个阻塞队列会一直等待key, quit : c.queue.Get()if quit {return false}// Tell the queue that we are done with processing this key. This unblocks the key for other workers// This allows safe parallel processing because two pods with the same key are never processed in// parallel.defer c.queue.Done(key)// 注意这里的syncToStdout应该是业务代码处理对象变化的事件err : c.syncToStdout(key.(string))// 如果前面的业务逻辑遇到了错误就在此处理c.handleErr(err, key)// 外面的调用逻辑是返回true就继续调用processNextItem方法return true
}// runWorker 这是个无限循环不断地从队列取出数据处理
func (c *Controller) runWorker() {for c.processNextItem() {}
}编码异常处理
还有个handleErr方法在业务消费队列数据失败时的处理逻辑可见这里的做法是将key重新放入队列让业务逻辑再消费一次这就是失败重试逻辑这样的重复次数被限定在5次以内超过了就不再放入队列中了
// handleErr 如果前面的业务逻辑执行出现错误就在此集中处理错误本例中主要是重试次数的控制
func (c *Controller) handleErr(err error, key interface{}) {if err nil {// Forget about the #AddRateLimited history of the key on every successful synchronization.// This ensures that future processing of updates for this key is not delayed because of// an outdated error history.c.queue.Forget(key)return}// 如果重试次数未超过5次就继续重试if c.queue.NumRequeues(key) 5 {klog.Infof(Error syncing pod %v: %v, key, err)// Re-enqueue the key rate limited. Based on the rate limiter on the// queue and the re-enqueue history, the key will be processed later again.c.queue.AddRateLimited(key)return}// 代码走到这里意味着有错误并且重试超过了5次应该立即丢弃c.queue.Forget(key)// 这种连续五次重试还未成功的错误交给全局处理逻辑runtime.HandleError(err)klog.Infof(Dropping pod %q out of the queue: %v, key, err)
}编码主程序
最后是main.go文件中的main方法作用是加载kubernetes的配置文件以及决定要监听哪些资源的变化这里通过调用两次CreateAndStartController方法对pod和service的变化建立了监听
package mainimport (flagpath/filepathv1 k8s.io/api/core/v1k8s.io/client-go/kubernetesk8s.io/client-go/tools/clientcmdk8s.io/client-go/util/homedirk8s.io/klog/v2
)const (NAMESPACE client-go-tutorials
)func main() {var kubeconfig *stringvar master string// 试图取到当前账号的家目录if home : homedir.HomeDir(); home ! {// 如果能取到就把家目录下的.kube/config作为默认配置文件kubeconfig flag.String(kubeconfig, filepath.Join(home, .kube, config), (optional) absolute path to the kubeconfig file)master } else {// 如果取不到就没有默认配置文件必须通过kubeconfig参数来指定flag.StringVar(kubeconfig, kubeconfig, , absolute path to the kubeconfig file)flag.StringVar(master, master, , master url)flag.Parse()}config, err : clientcmd.BuildConfigFromFlags(master, *kubeconfig)if err ! nil {klog.Fatal(err)}clientset, err : kubernetes.NewForConfig(config)if err ! nil {klog.Fatal(err)}stop : make(chan struct{})defer close(stop)CreateAndStartController(clientset.CoreV1().RESTClient(), v1.Pod{}, pods, NAMESPACE, stop)CreateAndStartController(clientset.CoreV1().RESTClient(), v1.Service{}, services, NAMESPACE, stop)select {}
}
运行程序验证效果
现在将程序运行起来用go build构建应用或者直接用IDE运行启动后可以看到输入如下可见程序符合预期将所有service和pod的label都在日志中打印出来了值得注意的是service现在还没有任何label这在日志中也有提示
Starting: /root/software/gopath/bin/dlv dap --listen127.0.0.1:33405 --log-dest3 from /root/github/blog_demos/tutorials/object-tutorials
DAP server listening at: 127.0.0.1:33405
Type dlv help for list of commands.
I0723 08:20:14.695662 2256666 controller.go:131] Starting Pod controller
I0723 08:20:14.696133 2256666 controller.go:131] Starting Pod controller
I0723 08:20:14.796419 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wpnt7], namespace [client-go-tutorials], key [business-service-type], value [web]
I0723 08:20:14.796620 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wpnt7], namespace [client-go-tutorials], key [language], value [c]
I0723 08:20:14.796669 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wpnt7], namespace [client-go-tutorials], key [pod-template-hash], value [78f6b696d9]
I0723 08:20:14.796704 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wpnt7], namespace [client-go-tutorials], key [type], value [front-end]
I0723 08:20:14.796737 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wpnt7], namespace [client-go-tutorials], key [app], value [nginx-app]
I0723 08:20:14.796792 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wp4qf], namespace [client-go-tutorials], key [app], value [nginx-app]
I0723 08:20:14.796831 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wp4qf], namespace [client-go-tutorials], key [business-service-type], value [web]
I0723 08:20:14.796865 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wp4qf], namespace [client-go-tutorials], key [language], value [c]
I0723 08:20:14.796901 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wp4qf], namespace [client-go-tutorials], key [pod-template-hash], value [78f6b696d9]
I0723 08:20:14.796960 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-wp4qf], namespace [client-go-tutorials], key [type], value [front-end]
I0723 08:20:14.797007 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-j98xj], namespace [client-go-tutorials], key [pod-template-hash], value [78f6b696d9]
I0723 08:20:14.797047 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-j98xj], namespace [client-go-tutorials], key [type], value [front-end]
I0723 08:20:14.797100 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-j98xj], namespace [client-go-tutorials], key [app], value [nginx-app]
I0723 08:20:14.797139 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-j98xj], namespace [client-go-tutorials], key [business-service-type], value [web]
I0723 08:20:14.797174 2256666 controller.go:88] name [nginx-deployment-78f6b696d9-j98xj], namespace [client-go-tutorials], key [language], value [c]
I0723 08:20:14.797346 2256666 controller.go:82] name [nginx-service], namespace [client-go-tutorials], label is empty现在修改资源对象试试首先修改service执行以下命令进入vi编辑模式
kubectl edit service nginx-service -n client-go-tutorial下图红框中是新增的内容 保存退出在程序的控制台可见以下日志输出证明service的变更事件都被会咱们的object-tutorials程序响应也能顺利取出service对象的属性并打印到日志中 接着修改一个pod的label新增内容如下图黄色箭头所示 保存后程序这边立即有日志输出会打印该pod的所有label 至此编码和验证都完成了结果符合预期meta.Accessor方法很好用拿到属性对象后所有资源的属性信息都能轻易获取
你不孤单欣宸原创一路相伴
Java系列Spring系列Docker系列kubernetes系列数据库中间件系列DevOps系列