当前位置: 首页 > news >正文

wordpress前台视频上传长沙seo

wordpress前台视频上传,长沙seo,互联网保险经纪公司排名,wordpress 登录后页面k8s informer的list-watch机制剖析 1、list-watch场景#xff1a; client-go中的reflector模块首先会list apiserver获取某个资源的全量信息#xff0c;然后根据list到的rv来watch资源的增量信息。希望使用client-go编写的控制器组件在与apiserver发生连接异常时#xff0c…k8s informer的list-watch机制剖析 1、list-watch场景 client-go中的reflector模块首先会list apiserver获取某个资源的全量信息然后根据list到的rv来watch资源的增量信息。希望使用client-go编写的控制器组件在与apiserver发生连接异常时尽量的re-watch资源而不是re-list 2、list-watch主要做的三件事 informer的list-watch逻辑主要做三个事情 1、List部分逻辑设置分页参数执行list方法将list结果同步进DeltaFIFO队列中其实是调用store中的Replace方法。 2、定时同步定时同步以协程的方式运行使用定时器实现定期同步Store中的Resync操作。 3、Watch部分逻辑在for循环里执行watch函数获取resultchan监听resultchan中数据并处理 过程细节剖析 1、第一次list资源会设置资源版本号为空旧版会设为0拉完后就更新资源版本后面watch的时候只要关心比这个资源版本大的资源。list的时候会把ListWatch对象包裹在pager对象里 这个对象的作用是控制分页查询比如资源对象太多时为了防止过大的网络IOpager可以通过控制url的limit和continue参数来指定一次请求获取的资源数量。 2、watch的时候会开启一个死循环ListerWatcher会返回要一个watch对象及其内部的一条channel没有数据时则一直阻塞监听channel只要有新资源变化就会停止阻塞然后就根据事件类型往DeltaFIFO里面更新数据最后会更新最新资源版本。 3、每次向apiserver发起watch请求如果大概8分钟内都没有任何事件则apiserver会主动断开连接断开连接则会关闭watch对象的channel Reflector监听channel结束然后会再次构建watch对象并发起watch请求。 4、ListAndWatch()会被Run()调用。Run()里面把ListAndWatch()包裹在了一个重试函数wait.Until()里面ListAndWatch()正常情况下是死循环一旦ListAndWatch()发送错误就会返回wait.Until()在指定时间后又会重新执行ListAndWatch() 。这一步也叫所谓的ReList。再一次list资源时会尝试传入一个上次list到或最新watch到的资源版本但并不保证可以成功list比如watch到的Pod的资源版本和PodList的资源版本没有任何关联Pod的更新不代表PodList的更新这里只是尝试一下而已如果list失败了就把url参数resourceVersion置为空这样就能拉最新的列表。 概括 通过list机制来获取全量资源然后使用那个resourceversion并通过watch模式来增量更新后续每次watch到新的变化后除了更新cache还会更新resourceversion并用新的resourceversion去watch 3、list-watch源码剖析 // ListAndWatch 函数首先列出所有的对象并在调用的时候获得资源版本然后使用该资源版本来进行 watch 操作。 // 如果 ListAndWatch 没有初始化 watch 成功就会返回错误。 func (r *Reflector) ListAndWatch(stopCh -chan struct{}) error {klog.V(3).Infof(Listing and watching %v from %s, r.expectedTypeName, r.name)var resourceVersion stringoptions : metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}// 1.List部分逻辑设置分页参数执行list方法将list结果同步进DeltaFIFO队列中if err : func() error {initTrace : trace.New(Reflector ListAndWatch, trace.Field{name, r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar paginatedResult boolvar err errorlistCh : make(chan struct{}, 1)panicCh : make(chan interface{}, 1)go func() {defer func() {if r : recover(); r ! nil {panicCh - r}}()// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first// list request will return the full response.// 如果listerWatcher支持则尝试以块的形式收集列表如果不支持则收集第一个列表请求将返回完整响应pager : pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))switch {case r.WatchListPageSize ! 0:pager.PageSize r.WatchListPageSizecase r.paginatedResult:// We got a paginated result initially. Assume this resource and server honor// paging requests (i.e. watch cache is probably disabled) and leave the default// pager size set.case options.ResourceVersion ! options.ResourceVersion ! 0:// User didnt explicitly request pagination.//// With ResourceVersion ! , we have a possibility to list from watch cache,// but we do that (for ResourceVersion ! 0) only if Limit is unset.// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly// switch off pagination to force listing from watch cache (if enabled).// With the existing semantic of RV (result is at least as fresh as provided RV),// this is correct and doesnt lead to going back in time.//// We also dont turn off pagination for ResourceVersion0, since watch cache// is ignoring Limit in that case anyway, and if watch cache is not enabled// we dont introduce regression.pager.PageSize 0}// 如果过期或者不合法 resourceversion 则进行重试list, paginatedResult, err pager.List(context.Background(), options)if isExpiredError(err) || isTooLargeResourceVersionError(err) {r.setIsLastSyncResourceVersionUnavailable(true)// Retry immediately if the resource version used to list is unavailable.// The pager already falls back to full list if paginated list calls fail due to an Expired error on// continuation pages, but the pager might not be enabled, the full list might fail because the// resource version it is listing at is expired or the cache may not yet be synced to the provided// resource version. So we need to fallback to resourceVersion in all to recover and ensure// the reflector makes forward progress.list, paginatedResult, err pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})}close(listCh)}()select {case -stopCh:return nilcase r : -panicCh:panic(r)case -listCh:}if err ! nil {return fmt.Errorf(failed to list %v: %v, r.expectedTypeName, err)}// We check if the list was paginated and if so set the paginatedResult based on that.// However, we want to do that only for the initial list (which is the only case// when we set ResourceVersion0). The reasoning behind it is that later, in some// situations we may force listing directly from etcd (by setting ResourceVersion)// which will return paginated result, even if watch cache is enabled. However, in// that case, we still want to prefer sending requests to watch cache if possible.//// Paginated result returned for request with ResourceVersion0 mean that watch// cache is disabled and there are a lot of objects of a given type. In such case,// there is no need to prefer listing from watch cache.if options.ResourceVersion 0 paginatedResult {r.paginatedResult true}r.setIsLastSyncResourceVersionUnavailable(false) // list was successfulinitTrace.Step(Objects listed)// listMetaInterface, err : meta.ListAccessor(list)if err ! nil {return fmt.Errorf(unable to understand list result %#v: %v, list, err)}// 获取资源版本号resourceVersion listMetaInterface.GetResourceVersion()initTrace.Step(Resource version extracted)// 将资源对象转换为资源列表讲runtime.Object 对象转换为[]runtime.Object对象items, err : meta.ExtractList(list)if err ! nil {return fmt.Errorf(unable to understand list result %#v (%v), list, err)}initTrace.Step(Objects extracted)// 将资源对象列表中的资源和版本号存储在store中if err : r.syncWith(items, resourceVersion); err ! nil {return fmt.Errorf(unable to sync list result: %v, err)}initTrace.Step(SyncWith done)// 更新resourceVersion r.setLastSyncResourceVersion(resourceVersion)initTrace.Step(Resource version updated)return nil}(); err ! nil {return err}// 2.定时同步定时同步以协程的方式运行使用定时器实现定期同步resyncerrc : make(chan error, 1)cancelCh : make(chan struct{})defer close(cancelCh)go func() {resyncCh, cleanup : r.resyncChan()defer func() {cleanup() // Call the last one written into cleanup}()for {select {case -resyncCh:case -stopCh:returncase -cancelCh:return}// 如果ShouldResync 为nil或者调用返回true则执行Store中的Resync操作if r.ShouldResync nil || r.ShouldResync() {klog.V(4).Infof(%s: forcing resync, r.name)// 将indexer的数据和deltafifo进行同步if err : r.store.Resync(); err ! nil {resyncerrc - errreturn}}cleanup()resyncCh, cleanup r.resyncChan()}}()// 3.在for循环里执行watch函数获取resultchan监听resultchan中数据并处理for {// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect {case -stopCh:return nildefault:}timeoutSeconds : int64(minWatchTimeout.Seconds() * (rand.Float64() 1.0))options metav1.ListOptions{ResourceVersion: resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window.TimeoutSeconds: timeoutSeconds,// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.// Reflector doesnt assume bookmarks are returned at all (if the server do not support// watch bookmarks, it will ignore this field).AllowWatchBookmarks: true,}// start the clock before sending the request, since some proxies wont flush headers until after the first watch event is sentstart : r.clock.Now()w, err : r.listerWatcher.Watch(options)if err ! nil {// If this is connection refused error, it means that most likely apiserver is not responsive.// It doesnt make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If thats the case begin exponentially backing off and resend watch request.// 如果这是“连接被拒绝”错误则意味着 apiserver 很可能没有响应。// 重新列出所有对象是没有意义的因为我们很可能能够重新启动// 看我们结束的地方。// 如果是这种情况开始指数级后退并重新发送监视请求if utilnet.IsConnectionRefused(err) {-r.initConnBackoffManager.Backoff().C()continue}return err}if err : r.watchHandler(start, w, resourceVersion, resyncerrc, stopCh); err ! nil {if err ! errorStopRequested {switch {case isExpiredError(err):// Dont set LastSyncResourceVersionUnavailable - LIST call with ResourceVersionRV already// has a semantic that it returns data at least as fresh as provided RV.// So first try to LIST with setting RV to resource version of last observed object.klog.V(4).Infof(%s: watch of %v closed with: %v, r.name, r.expectedTypeName, err)default:klog.Warningf(%s: watch of %v ended with: %v, r.name, r.expectedTypeName, err)}}return nil}} }4.4 LastSyncResourceVersion获取上一次同步的资源版本func (r *Reflector) LastSyncResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()return r.lastSyncResourceVersion }4.5 resyncChan返回一个定时通道和清理函数清理函数就是停止计时器。这边的定时重新同步是使用定时器实现的。func (r *Reflector) resyncChan() (-chan time.Time, func() bool) {if r.resyncPeriod 0 {return neverExitWatch, func() bool { return false }}// The cleanup function is required: imagine the scenario where watches// always fail so we end up listing frequently. Then, if we dont// manually stop the timer, we could end up with many timers active// concurrently.t : r.clock.NewTimer(r.resyncPeriod)return t.C(), t.Stop } 4.6 syncWith将从apiserver list的资源对象结果同步进DeltaFIFO队列中调用队列的Replace方法实现。func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {found : make([]interface{}, 0, len(items))for _, item : range items {found append(found, item)}return r.store.Replace(found, resourceVersion) }4.7 watchHandlerwatch的处理接收watch的接口作为参数watch接口对外方法是Stop和Resultchan,前者关闭结果通道后者获取通道。func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh -chan struct{}) error {eventCount : 0// Stopping the watcher should be idempotent and if we return from this function theres no way// were coming back in with the same watch interface.defer w.Stop()loop:for {select {case -stopCh:return errorStopRequestedcase err : -errc:return errcase event, ok : -w.ResultChan():if !ok {break loop}if event.Type watch.Error {return apierrors.FromObject(event.Object)}if r.expectedType ! nil {if e, a : r.expectedType, reflect.TypeOf(event.Object); e ! a {utilruntime.HandleError(fmt.Errorf(%s: expected type %v, but watch event object had type %v, r.name, e, a))continue}}// 判断期待的类型和监听到的事件类型是否一致if r.expectedGVK ! nil {if e, a : *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e ! a {utilruntime.HandleError(fmt.Errorf(%s: expected gvk %v, but watch event object had gvk %v, r.name, e, a))continue}}// 获取事件对象meta, err : meta.Accessor(event.Object)if err ! nil {utilruntime.HandleError(fmt.Errorf(%s: unable to understand watch event %#v, r.name, event))continue}newResourceVersion : meta.GetResourceVersion()// 对事件类型进行判断并进行对应操作switch event.Type {case watch.Added:err : r.store.Add(event.Object)if err ! nil {utilruntime.HandleError(fmt.Errorf(%s: unable to add watch event object (%#v) to store: %v, r.name, event.Object, err))}case watch.Modified:err : r.store.Update(event.Object)if err ! nil {utilruntime.HandleError(fmt.Errorf(%s: unable to update watch event object (%#v) to store: %v, r.name, event.Object, err))}case watch.Deleted:// TODO: Will any consumers need access to the last known// state, which is passed in event.Object? If so, may need// to change this.err : r.store.Delete(event.Object)if err ! nil {utilruntime.HandleError(fmt.Errorf(%s: unable to delete watch event object (%#v) from store: %v, r.name, event.Object, err))}case watch.Bookmark:// 表示监听已在此处同步只需更新// A Bookmark means watch has synced here, just update the resourceVersiondefault:utilruntime.HandleError(fmt.Errorf(%s: unable to understand watch event %#v, r.name, event))}*resourceVersion newResourceVersion// 更新 resource version 版本, 下次使用该 resourceVersion 来 watch 监听.r.setLastSyncResourceVersion(newResourceVersion)if rvu, ok : r.store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(newResourceVersion)}eventCount}}watchDuration : r.clock.Since(start)// 如果 watch 退出小于 一秒, 另外一条事件也没拿到, 则打条错误日志if watchDuration 1*time.Second eventCount 0 {return fmt.Errorf(very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received, r.name)}klog.V(4).Infof(%s: Watch close - %v total %v items received, r.name, r.expectedTypeName, eventCount)return nil }4.8 relistResourceVersionrelistResourceVersion 函数获得反射器 relist 的资源版本如果资源版本非 0 则表示根据资源版本号继续获取当传输过程中遇到网络故障或者其他原因导致中断下次再连接时会根据资源版本号继续传输未完成的部分。 可以使本地缓存中的数据与Etcd集群中的数据保持一致该函数实现如下所示// 如果最后一次relist的结果是HTTP 410Gone状态码则返回这样relist将通过quorum读取etcd中可用的最新资源版本。 // 返回使用 lastSyncResourceVersion这样反射器就不会使用在relist结果或watch事件中watch到的资源版本更老的资源版本进行relist了 // 当 r.lastSyncResourceVersion 为 时这里为 0当使用 r.lastSyncResourceVersion 失败时这里为 // 区别是 会直接请求到 etcd获取一个最新的版本而 0 访问的是 cache // 第一次使用0出错了使用否则用lastSyncResourceVersion // 注意第一次不会直接全量list etcd是全量list apiserver func (r *Reflector) relistResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()if r.isLastSyncResourceVersionUnavailable {// 因为反射器会进行分页List请求如果 lastSyncResourceVersion 过期了所有的分页列表请求就都会跳过 watch 缓存// 所以设置 ResourceVersion然后再次 List重新建立反射器到最新的可用资源版本从 etcd 中读取保持一致性。return }if r.lastSyncResourceVersion {// 反射器执行的初始 List 操作的时候使用0作为资源版本。return 0}return r.lastSyncResourceVersion }4.9 setLastSyncResourceVersion用于存储已被Reflector处理的最新资源对象的ResourceVersionr.setLastSyncResourceVersion方法用于更新该值。 lastSyncResourceVersion属性为Reflector struct的一个属性 func (r *Reflector) setLastSyncResourceVersion(v string) {r.lastSyncResourceVersionMutex.Lock()defer r.lastSyncResourceVersionMutex.Unlock()r.lastSyncResourceVersion v }// setIsLastSyncResourceVersionUnavailable 设置是否返回具有lastSyncResourceVersion 的最后一个列表或监视请求“过期”或“资源版本太大”错误。 func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {r.lastSyncResourceVersionMutex.Lock()defer r.lastSyncResourceVersionMutex.Unlock()r.isLastSyncResourceVersionUnavailable isUnavailable }
http://www.hkea.cn/news/14423958/

相关文章:

  • wordpress中英文插件seo营销专员
  • c2c网站 多钱东莞做网站需要避免这些因素
  • iis网站批量导入莞城发布最新通告
  • 成都科技网站建设费淘宝天猫优惠券网站建设费用
  • 北京上云科技网站建设视觉设计网
  • 怎么样做网站代cms 导航网站
  • 网站排名易下拉系统12306网站建设投标书
  • 曹县网站开发微商城是怎么做的
  • 青岛微网站开发网站开发实例百度云
  • 营销网站建设苏州动漫网站html
  • 网站设计和管理容易吗最早做网页的公司
  • 静态网站开发预期效果wordpress wp_insert_attachment
  • 视频网站怎么做服务器文章内容网站系统
  • 网站制作西安网站设计师是什么部门
  • 网上有什么做兼职的网站中国中小企业信息网
  • 禹城市网站建设平谷区网站建设
  • 免费招聘网站哪个好东莞沙田最新消息
  • 架设仿冒网站挂马深圳 高端网站建设宝安
  • 网站建设合同内容网站开发续签
  • 百度创建网站吗网站建设方案书制作流程
  • 怎么在自己电脑上建网站深圳市官网网站建设
  • 网站静态页面生成网站友情链接怎么弄
  • 二级网站开发 一级关系福州网签查询系统
  • 网站开发工具概述与比较网站开发人员需求
  • 什么叫宣传类网站重庆市建设工程信息网官网造价
  • 悠悠我心的个人网站素材建设网站的步骤知乎
  • 中航建设集团有限公司网站wordpress 获取分类列表
  • 网站商城app 建设方案建设项目工程信息
  • 沈阳seo自然优化排名外贸网站优化怎么做
  • 南宁市做网站的公司斐讯k2做网站