This article was first published on my personal blog; you are welcome to visit!

The Kubernetes version referenced in this analysis is

Introduction to the Scheduling Queue
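The official introduction to the Kubernetes scheduling queue, Scheduling queue in kube-scheduler, is well worth reading. The overall architecture is shown in the figure below. In short, the Kubernetes scheduling queue consists of three queues: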
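- ActiveQ (a heap): at the start of every scheduling cycle, one Pod is taken from here for a scheduling attempt. Every newly submitted Pod that does not set .spec.nodeName is sent here. It also receives Pods flushed out of UnschedulableQ and BackoffQ. By default it is ordered by priority, with higher-priority Pods in front.
- UnschedulableQ (a map): stores Pods that failed to schedule. They wait there for events that may make them schedulable, such as resource updates or the successful scheduling of related Pods, and are then retried.
- BackoffQ (a heap): a queue in which Pods temporarily back off. By default it is ordered by when each Pod's backoff ends, with the Pod whose backoff ends soonest in front. To keep Pods from being retried too frequently, each Pod records how many scheduling attempts it has made. Its backoff time grows exponentially with each failed attempt until it reaches a maximum. The official documentation's example gives a Pod that has failed 3 times a target backoff timeout of curTime + 2s^3 (8s). A Pod enters BackoffQ in two situations:
  - When Pods are moved out of UnschedulableQ (for example by the periodic flush of Pods that have stayed there too long), the queue checks whether each Pod has already backed off long enough. Pods that have not are put into BackoffQ to back off a while longer.
  - Suppose a resource-change event happens asynchronously while a Pod is being scheduled, and the attempt then fails. This is the case p.moveRequestCycle >= podSchedulingCycle. Here schedulingCycle is the current scheduling cycle, incremented by 1 every time a Pod is popped from ActiveQ. moveRequestCycle is the value schedulingCycle had when the event was handled. Such a Pod is not put into UnschedulableQ; it goes directly into BackoffQ.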
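The scheduling queue runs two periodic flushing goroutines in the background that are responsible for moving Pods toward the active queue. The related code is covered in detail later.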
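- **flushUnschedulablePodsLeftover**: runs every 30 seconds and moves Pods out of UnschedulableQ, so that unschedulable Pods that were not moved by any event get another chance to be retried.
- **flushBackoffQCompleted**: runs every second and moves Pods in BackoffQ that have backed off long enough into ActiveQ.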
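A move request triggers an event that moves Pods from UnschedulableQ to ActiveQ or BackoffQ. Many kinds of cluster events can trigger a move request, including changes to Pods, Nodes, Services, PVs, PVCs, StorageClasses and CSI nodes. For example, when a Pod is scheduled, Pods in UnschedulableQ that were unschedulable because of affinity requirements involving that Pod are moved out. Likewise, when a new Node joins the cluster, Pods that could not be scheduled because of insufficient resources are moved out.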
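Not every event moves every Pod. The queue only moves a Pod if one of the plugins that rejected it is interested in the event; this is what the clusterEventMap field and the podMatchesEvent method discussed later are for. Below is a minimal sketch of that idea under simplified assumptions. The clusterEvent type, the interestedPlugins registration, and the plugin names PluginA and PluginB are all hypothetical. The real podMatchesEvent also handles wildcard events and other details.

```go
package main

import "fmt"

// clusterEvent is a hypothetical, simplified stand-in for framework.ClusterEvent.
type clusterEvent struct {
	Resource   string // e.g. "Node", "Pod"
	ActionType string // e.g. "Add", "Update"
}

// interestedPlugins maps each event to the plugins that registered interest in it.
// PluginA and PluginB are hypothetical plugin names used only for illustration.
var interestedPlugins = map[clusterEvent][]string{
	{Resource: "Node", ActionType: "Add"}: {"PluginA"},
	{Resource: "Pod", ActionType: "Add"}:  {"PluginA", "PluginB"},
}

// podMatchesEvent reports whether any plugin that rejected the pod is
// interested in the event, i.e. whether the event might make the pod schedulable.
func podMatchesEvent(rejectedBy map[string]bool, ev clusterEvent) bool {
	for _, plugin := range interestedPlugins[ev] {
		if rejectedBy[plugin] {
			return true
		}
	}
	return false
}

func main() {
	// A pod that was rejected only by PluginB.
	rejectedBy := map[string]bool{"PluginB": true}
	fmt.Println(podMatchesEvent(rejectedBy, clusterEvent{"Node", "Add"})) // false: stays put
	fmt.Println(podMatchesEvent(rejectedBy, clusterEvent{"Pod", "Add"}))  // true: gets moved
}
```

Filtering by event avoids waking up pods whose failure the event cannot possibly fix.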
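Scheduling Queue Source Code Analysis

Queue Initialization

The Scheduler's scheduling queue, SchedulingQueue, has the type internalqueue.SchedulingQueue. This interface is defined in pkg/scheduler/internal/queue/scheduling_queue.go:92 as follows.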
```go
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
	// The passed-in pods are originally compiled from plugins that want to activate Pods,
	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
	Activate(pods map[string]*v1.Pod)
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() ([]*v1.Pod, string)
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// Run starts the goroutines managing the queue.
	Run()
}
```

The code above defines the methods the queue must provide for adding, deleting, updating, retrieving and running its elements. Its standard implementation, PriorityQueue, is in pkg/scheduler/internal/queue/scheduling_queue.go:145. First, let's look at its fields:
```go
// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues and a additional data structure, namely: activeQ,
// backoffQ and unschedulablePods.
// - activeQ holds pods that are being considered for scheduling.
// - backoffQ holds pods that moved from unschedulablePods and will move to
// activeQ when their backoff periods complete.
// - unschedulablePods holds pods that were already attempted for scheduling and
// are currently determined to be unschedulable.
type PriorityQueue struct {
	*nominator

	stop  chan struct{}
	clock clock.Clock

	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration
	// the maximum time a pod can stay in the unschedulablePods.
	podMaxInUnschedulablePodsDuration time.Duration

	cond sync.Cond

	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *heap.Heap
	// unschedulablePods holds pods that have been tried and determined unschedulable.
	unschedulablePods *UnschedulablePods
	// schedulingCycle represents sequence number of scheduling cycle and is incremented
	// when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of scheduling cycle when we
	// received a move request. Unschedulable pods in and before this scheduling
	// cycle will be put back to activeQueue if we were trying to schedule them
	// when we received move request.
	moveRequestCycle int64

	clusterEventMap map[framework.ClusterEvent]sets.String
	// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
	preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool

	nsLister listersv1.NamespaceLister

	metricsRecorder metrics.MetricAsyncRecorder
	// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
	pluginMetricsSamplePercent int
}
```

The constructor that creates this queue is in pkg/scheduler/internal/queue/scheduling_queue.go:291:
```go
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	if options.podLister == nil {
		options.podLister = informerFactory.Core().V1().Pods().Lister()
	}
	for _, opt := range opts {
		opt(&options)
	}

	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

	pq := &PriorityQueue{
		nominator:                         newPodNominator(options.podLister),
		clock:                             options.clock,
		stop:                              make(chan struct{}),
		podInitialBackoffDuration:         options.podInitialBackoffDuration,
		podMaxBackoffDuration:             options.podMaxBackoffDuration,
		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
		moveRequestCycle:                  -1,
		clusterEventMap:                   options.clusterEventMap,
		preEnqueuePluginMap:               options.preEnqueuePluginMap,
		metricsRecorder:                   options.metricsRecorder,
		pluginMetricsSamplePercent:        options.pluginMetricsSamplePercent,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

	return pq
}
```

As you can see, it contains many of the concepts introduced above, including activeQ, unschedulablePods, podBackoffQ, schedulingCycle and moveRequestCycle.
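QueuedPodInfo

QueuedPodInfo is a key data structure that has already appeared several times; it is the basic element stored in the queues. It is defined in pkg/scheduler/framework/types.go:98 and includes the PodInfo, the time it was added, the number of attempts, and more: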
```go
// QueuedPodInfo is a Pod wrapper with additional information related to
// the pod's status in the scheduling queue, such as the timestamp when
// it's added to the queue.
type QueuedPodInfo struct {
	*PodInfo
	// The time pod added to the scheduling queue.
	Timestamp time.Time
	// Number of schedule attempts before successfully scheduled.
	// It's used to record the # attempts metric.
	Attempts int
	// The time when the pod is added to the queue for the first time. The pod may be added
	// back to the queue multiple times before it's successfully scheduled.
	// It shouldn't be updated once initialized. It's used to record the e2e scheduling
	// latency for a pod.
	InitialAttemptTimestamp time.Time
	// If a Pod failed in a scheduling cycle, record the plugin names it failed by.
	UnschedulablePlugins sets.String
	// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
	Gated bool
}
```

PodInfo is defined in pkg/scheduler/framework/types.go:131:
```go
// PodInfo is a wrapper to a Pod with additional pre-computed information to
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).
type PodInfo struct {
	Pod                        *v1.Pod
	RequiredAffinityTerms      []AffinityTerm
	RequiredAntiAffinityTerms  []AffinityTerm
	PreferredAffinityTerms     []WeightedAffinityTerm
	PreferredAntiAffinityTerms []WeightedAffinityTerm
}
```

Pod is defined in staging/src/k8s.io/api/core/v1/types.go:4202:
```go
// Pod is a collection of containers that can run on a host. This resource is created
// by clients and scheduled onto hosts.
type Pod struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the pod.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Spec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the pod.
	// This data may not be up to date.
	// Populated by the system.
	// Read-only.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
	// +optional
	Status PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
```

ActiveQ Source Code

As the initialization code shows, ActiveQ is a heap. The related definitions are in pkg/scheduler/internal/heap/heap.go:
```go
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
	// data stores objects and has a queue that keeps their ordering according
	// to the heap invariant.
	data *data
	// metricRecorder updates the counter when elements of a heap get added or
	// removed, and it does nothing if it's nil
	metricRecorder metrics.MetricRecorder
}

// data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type data struct {
	// items is a map from key of the objects to the objects and their index.
	// We depend on the property that items in the map are in the queue and vice versa.
	items map[string]*heapItem
	// queue implements a heap data structure and keeps the order of elements
	// according to the heap invariant. The queue keeps the keys of objects stored
	// in items.
	queue []string

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc
	// lessFunc is used to compare two objects in the heap.
	lessFunc lessFunc
}
```

In other words, the heap itself is the queue slice of keys, kept in heap order. The items map lets any object be looked up by its key, together with its current index in that slice.
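The default ActiveQ ordering is defined in pkg/scheduler/framework/plugins/queuesort/priority_sort.go:42. Pods are sorted by priority, and Pods with equal priority are sorted by the time they were added to the queue, earlier first.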
```go
// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodQueueInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := corev1helpers.PodPriority(pInfo1.Pod)
	p2 := corev1helpers.PodPriority(pInfo2.Pod)
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
```

UnschedulableQ Source Code

UnschedulableQ is initialized in pkg/scheduler/internal/queue/scheduling_queue.go:998:
```go
// newUnschedulablePods initializes a new object of UnschedulablePods.
func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods {
	return &UnschedulablePods{
		podInfoMap:            make(map[string]*framework.QueuedPodInfo),
		keyFunc:               util.GetPodFullName,
		unschedulableRecorder: unschedulableRecorder,
		gatedRecorder:         gatedRecorder,
	}
}
```

Its definition is in pkg/scheduler/internal/queue/scheduling_queue.go:939:
```go
// UnschedulablePods holds pods that cannot be scheduled. This data structure
// is used to implement unschedulablePods.
type UnschedulablePods struct {
	// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.
	podInfoMap map[string]*framework.QueuedPodInfo
	keyFunc    func(*v1.Pod) string
	// unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap
	// get added or removed, and it does nothing if it's nil.
	unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
```

Unlike the other two queues, it is not wrapped in a heap. Pods are stored directly in a map keyed by the pod's full name.
BackoffQ Source Code

BackoffQ is also a heap. It differs from ActiveQ in its comparison function, defined in pkg/scheduler/internal/queue/scheduling_queue.go:888:
```go
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}
```

getBackoffTime, defined in pkg/scheduler/internal/queue/scheduling_queue.go:911, computes the time at which the pod's backoff completes:
```go
// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}
```

So the heap places the pod whose backoff completes earliest at the front.

Next, let's look at how the backoff duration is calculated, in pkg/scheduler/internal/queue/scheduling_queue.go:
```go
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		// Use subtraction instead of addition or multiplication to avoid overflow.
		if duration > p.podMaxBackoffDuration-duration {
			return p.podMaxBackoffDuration
		}
		duration += duration
	}
	return duration
}
```

In other words, the first backoff is p.podInitialBackoffDuration, and each subsequent attempt doubles the previous backoff. A pod with n attempts therefore waits podInitialBackoffDuration × 2^(n-1). Once the current value exceeds p.podMaxBackoffDuration/2, doubling it would exceed the maximum, so the backoff is set to p.podMaxBackoffDuration instead.
Popping a Pod to Schedule

The code is in pkg/scheduler/internal/queue/scheduling_queue.go:593:
```go
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, nil
}
```

If activeQ has no pods waiting to be scheduled, Pop blocks in p.cond.Wait(). Otherwise it pops a QueuedPodInfo from activeQ, increments that QueuedPodInfo's Attempts by 1, and increments the queue-wide schedulingCycle.
Adding a New Pod to the Queue

The code is in pkg/scheduler/internal/queue/scheduling_queue.go:398:
```go
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	pInfo := p.newQueuedPodInfo(pod)
	gated := pInfo.Gated
	if added, err := p.addToActiveQ(pInfo); !added {
		return err
	}
	if p.unschedulablePods.get(pod) != nil {
		klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
		p.unschedulablePods.delete(pod, gated)
	}
	// Delete pod from backoffQ if it is backing off
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
	}
	klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName)
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
	p.cond.Broadcast()

	return nil
}
```

Add first converts the pod into a QueuedPodInfo and adds it to activeQ. It then checks whether the pod is also present in the other queues and removes it from them if so. After recording some logs and metrics, it calls p.cond.Broadcast() to wake up any caller blocked in p.cond.Wait(), as mentioned above.
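Handling Pods That Return to the Queue After a Failed Scheduling Attempt

When a pod fails to schedule, AddUnschedulableIfNotPresent is called to handle it. The code is in pkg/scheduler/internal/queue/scheduling_queue.go.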
```go
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in unschedulablePods. But if there has been a recent move
// request, then the pod is put in podBackoffQ.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
	if p.unschedulablePods.get(pod) != nil {
		return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
	}

	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
	}
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
	}

	// Refresh the timestamp since the pod is re-added.
	pInfo.Timestamp = p.clock.Now()

	// If a move request has been received, move it to the BackoffQ, otherwise move
	// it to unschedulablePods.
	for plugin := range pInfo.UnschedulablePlugins {
		metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
	}
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)
		}
		klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", backoffQName)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		p.unschedulablePods.addOrUpdate(pInfo)
		klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
	}

	p.addNominatedPodUnlocked(pInfo.PodInfo, nil)
	return nil
}
```

The function first checks whether the pod is already in any of the queues and returns an error if it is. It then compares moveRequestCycle with podSchedulingCycle. If p.moveRequestCycle >= podSchedulingCycle, a move request was handled while this pod was being scheduled. The cluster may therefore have changed in a way that now lets the pod be scheduled, so it is put into backoffQ. Otherwise it is added to unschedulableQ as usual.
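The decision boils down to comparing two counters. The sketch below makes the branch explicit and walks through how the counters evolve. destinationForFailedPod is a hypothetical name. The simulation reflects what the code above shows: moveRequestCycle starts at -1, schedulingCycle is incremented on each Pop, and moveRequestCycle is set to the current schedulingCycle when a move request is handled.

```go
package main

import "fmt"

// destinationForFailedPod mirrors the branch in AddUnschedulableIfNotPresent:
// if a move request was handled during or after the cycle in which the pod
// was being scheduled, the pod goes straight to backoffQ; otherwise it waits
// in unschedulablePods for a relevant event.
func destinationForFailedPod(moveRequestCycle, podSchedulingCycle int64) string {
	if moveRequestCycle >= podSchedulingCycle {
		return "backoffQ"
	}
	return "unschedulablePods"
}

func main() {
	moveRequestCycle := int64(-1) // initial value set by NewPriorityQueue
	schedulingCycle := int64(0)

	schedulingCycle++ // Pop pod A: cycle 1
	podACycle := schedulingCycle
	// Pod A fails and no event arrived in the meantime.
	fmt.Println(destinationForFailedPod(moveRequestCycle, podACycle)) // unschedulablePods

	schedulingCycle++ // Pop pod B: cycle 2
	podBCycle := schedulingCycle
	moveRequestCycle = schedulingCycle // a cluster event is handled while B is being scheduled
	fmt.Println(destinationForFailedPod(moveRequestCycle, podBCycle)) // backoffQ
}
```

This is how the queue avoids parking a pod in unschedulableQ right after the very event that could have helped it.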
flushBackoffQCompleted

When the queue runs, it starts two goroutines that continually check backoffQ and unschedulableQ, so that eligible pods are moved out promptly. The code is in pkg/scheduler/internal/queue/scheduling_queue.go:333:
```go
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
```

flushBackoffQCompleted runs once per second until p.stop is closed. It is defined in pkg/scheduler/internal/queue/scheduling_queue.go:537 as follows:
```go
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	activated := false
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			break
		}
		pInfo := rawPodInfo.(*framework.QueuedPodInfo)
		pod := pInfo.Pod
		if p.isPodBackingoff(pInfo) {
			break
		}
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
			break
		}
		if err := p.activeQ.Add(pInfo); err != nil {
			klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
		} else {
			klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)
			metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
			activated = true
		}
	}

	if activated {
		p.cond.Broadcast()
	}
}
```

Starting from the head of backoffQ, the loop checks whether the pod has finished its backoff and, if so, moves it into activeQ. It stops when the head pod is still backing off or the queue is empty. Because backoffQ is ordered by backoff completion time, a head pod that is still backing off means every other pod in the queue is too.
flushUnschedulablePodsLeftover

flushUnschedulablePodsLeftover runs every 30s. Its code is in pkg/scheduler/internal/queue/scheduling_queue.go:572:
```go
// flushUnschedulablePodsLeftover moves pods which stay in unschedulablePods
// longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ.
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulablePods.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}
```

It iterates over all pods in unschedulableQ and moves out every pod that has stayed there longer than p.podMaxInUnschedulablePodsDuration. Whether a pod goes to activeQ or backoffQ is decided by movePodsToActiveOrBackoffQueue, in pkg/scheduler/internal/queue/scheduling_queue.go:771:
```go
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
	activated := false
	for _, pInfo := range podInfoList {
		// If the event doesn't help making the Pod schedulable, continue.
		// Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
		// either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
		// In that case, it's desired to move it anyways.
		if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
			continue
		}
		pod := pInfo.Pod
		if p.isPodBackingoff(pInfo) {
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
			} else {
				klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
				p.unschedulablePods.delete(pod, pInfo.Gated)
			}
		} else {
			gated := pInfo.Gated
			if added, _ := p.addToActiveQ(pInfo); added {
				klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
				activated = true
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
				p.unschedulablePods.delete(pod, gated)
			}
		}
	}
	p.moveRequestCycle = p.schedulingCycle
	if activated {
		p.cond.Broadcast()
	}
}
```

This function is called not only from flushUnschedulablePodsLeftover but also whenever other move requests are handled; here the move request is simply UnschedulableTimeout. The routing is easy to read from the code:
- A pod whose UnschedulablePlugins is non-empty but that does not match the event is skipped and stays in unschedulableQ.
- Otherwise, a pod that is still backing off goes to backoffQ, and a pod whose backoff has completed goes to activeQ.

As elsewhere, if any pod is moved into activeQ, p.cond.Broadcast() is called. Note also that moveRequestCycle is set to schedulingCycle here; this is the one place where moveRequestCycle is updated.
Scheduling Queue Summary

Taking all of these details into account, the figure below summarizes how the scheduling queue works.