编程技术分享

  • 关于作者
  1. 首页
  2. Go
  3. 正文

K8S源码分析系列3—K8S调度器

2025年11月3日 27点热度 0人点赞 0条评论

1、调度框架

1.1 框架图

K8S从1.15版本开始引入可插拔架构的调度框架,定义了pod调度周期中的一些关键点,通过插件形式对这些关键点进行自定义扩展,从而达到不修改已有代码的情况下调整调度策略。如下为pod调度周期的流程图:

调度框架提供了一些扩展点,包括:QueueSort、PreFilter、Filter、PostFilter、PreScore、Score、Reserve、Permit、PreBind、Bind、PostBind,每个扩展点有明确的调用时机和作用,后面将会详细介绍。
插件分为in-tree、out-of-tree两类,前者为内置插件,后者为扩展插件,每个插件可以实现一个或多个扩展点。

1.2 数据结构

框架接口定义了一系列插件执行函数,每个函数在指定扩展点上调用,数据结构定义如下:
type Framework interface {
    //为插件提供一些工具
    Handle

    //返回函数用于调度队列中pod排序
    QueueSortFunc() LessFunc

    //执行PreFilter插件,如果任一插件返回非成功,调度循环被终止
    RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)

    //执行PostFilter插件,PostFilter插件可以用于通知,或者更新集群状态
    RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)

    //执行PreBind插件,如果任一插件返回非成功,将触发Unreserve插件,且pod重新放回调度队列
    RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    //执行PostBind插件
    RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

    //执行Reserve插件,如果任一插件返回失败,pod将不会被调度
    RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    //执行Unreserve插件,pod绑定失败后执行,按照reserve逆序执行
    RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)

    //执行Permit插件,如果有任一插件返回非成功或等待,调度失败
    RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    //pod阻塞等待,直到pod被拒绝或允许调度
    WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status

    //执行Bind插件,Bind插件可以选择是否处理给定pod,如果没有插件处理,则返回skip状态
    RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status

    //判断是否有至少一个Filter插件
    HasFilterPlugins() bool

    //判断是否有至少一个PostFilter插件
    HasPostFilterPlugins() bool

    //判断是否有至少一个Score插件
    HasScorePlugins() bool

    ListPlugins() *config.Plugins
    ProfileName() string
}
框架接口的具体实现类,负责初始化并运行调度器插件,数据结构定义如下:
type frameworkImpl struct {
    //包含所有可用插件,用于启用、初始化插件,在初始化framework之前所有插件必须在registry中
    registry Registry
    snapshotSharedLister framework.SharedLister
    //保存permit阶段等待的pod
    waitingPods *waitingPodsMap
    //存储每个ScorePlugin插件的权重值
    scorePluginWeight map[string]int
    //扩展点插件
    queueSortPlugins []framework.QueueSortPlugin
    preFilterPlugins []framework.PreFilterPlugin
    filterPlugins []framework.FilterPlugin
    postFilterPlugins []framework.PostFilterPlugin
    preScorePlugins []framework.PreScorePlugin
    scorePlugins []framework.ScorePlugin
    reservePlugins []framework.ReservePlugin
    preBindPlugins []framework.PreBindPlugin
    bindPlugins []framework.BindPlugin
    postBindPlugins []framework.PostBindPlugin
    permitPlugins []framework.PermitPlugin

    clientSet clientset.Interface
    kubeConfig *restclient.Config
    eventRecorder events.EventRecorder
    //为k8s资源提供Informer机制
    informerFactory informers.SharedInformerFactory
    metricsRecorder *metricsRecorder
    profileName string

    extenders []framework.Extender
    framework.PodNominator
    parallelizer parallelize.Parallelizer
}

1.3 扩展点

如上文介绍,调度框架提供了一些扩展点,每个扩展点详细说明如下:
  • QueueSort:用于调度队列中pod排序,决定优先调度哪个pod,同时只能启用一个插件。
  • PreFilter:在调度周期开始前进行预处理,所有插件必须返回成功,否则pod调度终止。
  • Filter:用于过滤不满足pod运行条件的node,对于每一个node,只有当所有filter插件都返回成功时,才满足运行条件。
  • PostFilter:在过滤阶段之后,仅仅当pod没找到可运行的node时执行。
  • PreScore:在计算node分值前进行预处理,所有插件必须返回成功,否则pod调度终止。
  • Score:计算每个通过filter的node分值,用于node排序,所有插件必须返回成功,否则pod调度终止。
  • Reserve:在pod分配node后,绑定node前执行,用于为pod预留资源,提高调度吞吐。
  • Permit:用于阻止或延迟pod绑定node。
  • PreBind:在pod绑定node前执行,所有插件必须返回成功,否则pod调度终止。
  • Bind:绑定pod到node,按照配置顺序调用,一旦一个插件返回成功,剩余插件会跳过。
  • PostBind:pod成功绑定后执行。

2、调度器

2.1 数据结构

调度器数据结构如下:
type Scheduler struct {
    //缓存实现为type cacheImpl struct,用于管理assumed pod,assumed pod是已选定node,但未执行绑定的pod
    Cache internalcache.Cache
    //用于扩展外部进程来控制调度流程,如http服务
    Extenders []framework.Extender
    //阻塞获取下一个待调度pod
    NextPod func() *framework.QueuedPodInfo
    //发生错误时调用
    Error func(*framework.QueuedPodInfo, error)
    //调度指定pod到某个node
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResule, error)
    //关闭用于停止调度器
    StopEverything <-chan struct{}
    //pod调度队列,实现为PriorityQueue
    SchedulingQueue internalqueue.SchedulingQueue
    //调度器对应framework,key为调度器名称,value为framework
    Profiles profile.Map
    ...
}
结构关系如下:

2.2 调度流程图

调用链流程如下:
scheduler.go/main()
|---server.go/NewSchedulerCommand()
    |---options.go/NewOptions()
    |---server.go/runCommand()
        |---server.go/Setup()  //初始化调度器
        |   |---options.go/Options.Config() //创建调度器配置
        |       |---options.go/Options.ApplyTo() 
        |   |---scheduler.go/New() //创建调度器
        |       |---scheme.go/Scheme.Default() //设置调度器配置对象KubeSchedulerConfiguration的默认值,pkg/scheduler/apis/config/v1beta3/defaults.go
        |       |---framework/plugins/registry.go/NewInTreeRegistry()  //设置默认in-tree插件,包括:节点亲和性、节点资源、污点容忍等内置插件
        |       |---metrics.go/Register()  //注册metric
        |       |---profile.go/NewMap()  //构建frameworks,格式为:{schedulerName: framework}
        |       |---scheduling_queue.go/NewSchedulingQueue() //创建调度队列,后面通过scheduling_queue.go/MakeNextPodFunc()将其设置为调度器的NextPod属性
        |           |---scheduling_queue.go/NewPriorityQueue() //创建优先级队列
        |       |---scheduler.go/newScheduler() //创建调度器对象
        |       |---eventhandlers.go/addAllEventHandlers()  //添加资源事件处理,如Service、Pod、Node等,pod则是在事件处理中加入待调度队列activeQ
        |---server.go/Run()
            |---client-go/informers/factory.go/sharedInformerFactory.Start() //启动资源事件处理,默认资源包括:v1.Node、v1.PersistentVolumeClaim、v1.CSINode、v1.PodDisruptionBudget、v1.ReplicationController、v1.StatefulSet、v1.CSIDriver、v1.VSIStorageCapacity、v1.Pod、v1.PersistentVolume、v1.StorageClass、v1.Namespace、v1.Service、v1.ReplicaSet
            |---scheduler.go/Scheduler.Run() //运行调度器
                |---scheduling_queue.go/PriorityQueue.Run() //启动pod调度队列协程
                |   |---scheduling_queue.go/PriorityQueue.flushBackoffQCompleted()  //1s执行一次,将backoffQ中完成backoff的所有pod移动到activeQ
                |       |---scheduling_queue.go/PriorityQueue.addToActiveQ()
                |   |---scheduling_queue.go/PriorityQueue.flushUnschedulablePodsLeftover()  //30s执行一次,将unschedulablePods移动到backoffQ或activeQ
                |       |---scheduling_queue.go/PriorityQueue.movePodsToActiveOrBackoffQueue()
                |---schedule_one.go/Scheduler.scheduleOne() //执行一次单pod的完整调度
                    |---schedule_one.go/Scheduler.NextPod() //从调度队列中获取pod
                    |---schedule_one.go/Scheduler.frameworkForPod() //根据pod的SchedulerName获取FrameWork
                    |---schedule_one.go/Scheduler.schedulePod() //调度pod到node
                    |   |---schedule_one.go/Scheduler.findNodesThatFitPod() //查找合适node
                    |       |---framework.go/frameworkImpl.RunPreFilterPlugins() //执行插件PreFilterPlugin
                    |   |---schedule_one.go/prioritizeNodes() //节点打分
                    |       |---framework.go/frameworkImpl.RunPreScorePlugins() //执行插件PreScorePlugin
                    |       |---framework.go/frameworkImpl.RunScorePlugins() //执行插件ScorePlugin
                    |   |---schedule_one.go/selectHost() //选择评分最高的节点
                    |---framework.go/frameworkImpl.RunPostFilterPlugins() //调度失败,执行插件PostFilterPlugin,可能执行抢占,下一个调度周期被调度
                    |---schedule_one.go/Scheduler.assume() //选择node成功,设置pod.nodeName并加入缓存,实际bind操作可以异步执行,提高调度效率
                    |---framework.go/frameworkImpl.RunReservePluginsReserve() //执行插件ReservePlugin,通过缓存给pod预留资源,提高调度效率
                    |---framework.go/frameworkImpl.RunPermitPlugins() //执行插件PermitPlugin,用于允许、阻止、延迟节点绑定
                    |---framework.go/frameworkImpl.WaitOnPermit() //等待延迟绑定的pod通过或拒绝
                    |---framework.go/frameworkImpl.RunPreBindPlugins() //执行插件PreBindPlugin,执行pod绑定前逻辑,如volume绑定前等待PV controller完成绑定操作
                    |---schedule_one.go/Scheduler.bind() //绑定pod到node
                        |---schedule_one.go/Scheduler.extendersBinding() //执行extender绑定操作
                        |---framework.go/frameworkImpl.RunBindPlugins() //执行插件BindPlugin
                        |---schedule_one.go/Scheduler.finishBinding() //完成绑定,更新缓存
                    |---framework.go/frameworkImpl.RunPostBindPlugins() //执行插件PostBindPlugin,执行pod绑定后处理

3、调度缓存

3.1 数据结构

调度缓存包括pod状态信息、node信息,缓存接口数据结构如下:
type Cache interface {
    //node数量,用于测试
    NodeCount() int
    //pod数量,用于测试
    PodCount() (int, error)
    //pod已选定node,尚未绑定,在bind前加入缓存
    AssumePod(pod *v1.Pod) error
    //缓存中assumed pod过期
    FinishBinding(pod *v1.Pod) error
    //从缓存中删除assumed pod
    ForgetPod(pod *v1.Pod) error
    //确保pod是assumed状态,否则会更新或重新添加pod到缓存(pod过期或node信息不匹配)
    AddPod(pod *v1.Pod) error
    //移除旧pod信息,增加新pod信息
    UpdatePod(oldPod, newPod *v1.Pod) error
    //删除pod,pod信息会从已选择的node上移除
    RemovePod(pod *v1.Pod) error
    //从缓存中获取pod信息
    GetPod(pod *v1.Pod) (*v1.Pod, error)
    //判断指定pod是否assumed状态,并且未过期
    IsAssumedPod(pod *v1.Pod) (bool, error)
    //增加node总体信息到缓存
    AddNode(node *v1.Node) *framework.NodeInfo
    //更新缓存node总体信息
    UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
    //移除node总体信息
    RemoveNode(node *v1.Node) error
    //更新缓存内容
    UpdateSnapshot(nodeSnapshot *Snapshot) error
    //输出缓存信息,用于debug
    Dump() *Dump
}
调度缓存实现数据结构如下:
type cacheImpl struct {
    ...
    //assumed pod uid列表
    assumedPods sets.String
    //pod状态信息,key为pod uid
    podStates map[string]*podState
    //node信息,key为nodeName
    nodes map[string]*nodeInfoListItem
    //指向nodes中最近更新的节点信息,链表头节点
    headNode *nodeInfoListItem
    //节点树形拓扑结构
    nodeTree *nodeTree
    //镜像信息,key为镜像名,value包含镜像大小、镜像存在于哪些节点
    imageStates map[string]*imageState
}

3.2 缓存状态机

缓存中pod状态分为:Initial、Assumed、Added、Expired、Deleted,各状态含义如下:
  • Initial:初始状态,非实际存在于缓存中。
  • Assumed:假定的调度状态,pod已选定node,但尚未绑定。
  • Added:add动作,过期的pod或node发生变化的pod重新更新缓存。
  • Expired:过期的pod,非实际存在于缓存中。
  • Deleted:pod被删除,非实际存在于缓存中。

4、调度插件

插件分为两类:out-of-tree plugin、in-tree plugin。in-tree插件通过NewInTreeRegistry构建,out-of-tree插件通过选项WithFrameworkOutOfTreeRegistry进行注册。

4.1 插件接口

Plugin接口
type Plugin interface {
    Name() string
}

QueueSortPlugin接口

//用于调度队列中pod排序,同时只能启用一个插件
type QueueSortPlugin interface {
    Plugin
    //比较函数,用于调度队列中pod排序
    Less(*QueuedPodInfo, *QueuedPodInfo) bool
}

PreFilterPlugin接口

//扩展接口,用于增量更新state
type PreFilterExtensions interface {
    //用于评估增加pod的影响
    AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
    //用于评估移除pod的影响
    RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}

//在调度周期开始前调用
type PreFilterPlugin interface {
    Plugin
    //在调度周期开始前调用,所有PreFilter必须返回成功,否则pod被拒绝调度
    PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
    //返回PreFilterExtensions接口,在PreFilter之后调用,可提供扩展插件增量更新信息
    PreFilterExtensions() PreFilterExtensions
}

FilterPlugin接口

//在filter扩展点执行,用于过滤出不能运行pod的节点,返回非Success结果将排除对应node
type FilterPlugin interface {
    Plugin
    //当所有filter插件返回Success时,pod能运行在指定节点上,否则排除节点
    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}

PostFilterPlugin接口

//通知类型扩展点,在pod不能被调度时调用
type PostFilterPlugin interface {
    Plugin
    //尝试让pod在之后的调度周期中能被调度
    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}

PreScorePlugin接口

//通知类型扩展点,传入所有通过filter的节点,用于更新内部状态或生成日志、指标
type PreScorePlugin interface {
    Plugin
    //所有插件必须返回Success,否则pod被拒绝
    PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}

ScorePlugin接口

//节点分值扩展插件
type ScoreExtensions interface {
    //用于标准化节点分值,执行成功后会更新scores
    NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}

//节点分值计算插件
type ScorePlugin interface {
    Plugin
    //计算每个通过filter的节点分值,分值用于节点排序,所有插件必须返回Success,否则pod被拒绝
    Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
    //返回ScoreExtensions接口
    ScoreExtensions() ScoreExtensions
}

ReservePlugin接口

//通知类型扩展点,为pod预留资源
type ReservePlugin interface {
    Plugin
    //当调度器cache更新后调用,如果此方法返回失败,调度器将调用所有Reserve插件的Unreserve方法
    Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
    //当Reserve调用失败时执行,此方法必须实现幂等性,可能部分插件的Reserve方法未被调用
    Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}

PremitPlugin接口

//在pod绑定节点前调用
type PremitPlugin interface {
    Plugin
    //在pod绑定节点、prebind插件前调用,用于阻止或延迟绑定pod,插件返回成功、等待、拒绝,仅当没有其他插件拒绝时才会执行插件等待
    Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}

PreBindPlugin接口

//在pod绑定节点前调用
type PreBindPlugin interface {
    Plugin
    //所有PreBind插件必须返回Success,否则pod将被拒绝,不会执行绑定
    PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}

BindPlugin接口

//绑定pod到节点
type BindPlugin interface {
    Plugin
    //绑定pod到节点,绑定函数会按照配置顺序调用,插件可以选择是否处理pod绑定,不处理需要返回Skip,处理了后续函数会跳过,如果插件返回Error,pod将被拒绝不会绑定
    Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}

PostBindPlugin接口

//通知类型扩展点,pod成功绑定节点后调用
type PostBindPlugin interface {
    Plugin
    //pod成功绑定节点后调用,用于状态清理等后处理
    PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}

4.2 内置插件

(1)SelectorSpread
用于将同一控制器管理的多个pod分散在不同拓扑域。
//插件实现的扩展点:Score(包含NormalizeScore)、PreScore
type SelectorSpread struct {
    sharedLister           framework.SharedLister
    services               corelisters.ServiceLister
    replicationControllers corelisters.ReplicationControllerLister
    replicaSets            appslisters.ReplicaSetLister
    statefulSets           appslisters.StatefulSetLister
}
(2)ImageLocality
用于将pod调度到已存在镜像的node上。
//插件实现的扩展点:Score
type ImageLocality struct {
    handle framework.Handle
}
(3)TaintToleration
检查pod是否容忍node上的taints。
//插件实现的扩展点:Filter、PreScore、Score(包含NormalizeScore)
type TaintToleration struct {
    handle framework.Handle
}
(4)NodeName
检查pod指定的node name是否匹配当前node。
//插件实现的扩展点:Filter
type NodeName struct {}
(5)NodePorts
检查node是否满足pod请求的port。
//插件实现的扩展点:PreFilter、Filter
type NodePorts struct{}
(6)NodeAffinity
检查pod的node selector是否匹配node label。
//插件实现的扩展点:PreFilter、Filter、PreScore、Score(包含NormalizeScore)
type NodeAffinity struct {
    handle              framework.Handle
    addedNodeSelector   *nodeaffinity.NodeSelector
    addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms
}
(7)PodTopologySpread
确保满足pod的topologySpreadConstraints条件。
type PodTopologySpread struct {
    systemDefaulted                     bool
    parallelizer                        parallelize.Parallelizer
    defaultConstraints                  []v1.TopologySpreadConstraint
    sharedLister                        framework.SharedLister
    services                            corelisters.ServiceLister
    replicationCtrls                    corelisters.ReplicationControllerLister
    replicaSets                         appslisters.ReplicaSetLister
    statefulSets                        appslisters.StatefulSetLister
    enableMinDomainsInPodTopologySpread bool
}
(8)NodeUnschedulable
过滤掉node.Spec.Unschedulable=true的节点,除非pod设置容忍污点。
//插件实现的扩展点:Filter
type NodeUnschedulable struct {}
(9)Fit
检查node是否有足够的资源。
//插件实现的扩展点:PreFilter、Filter、Score
type Fit struct {
    ignoredResources      sets.String
    ignoredResourceGroups sets.String
    handle                framework.Handle
    resourceAllocationScorer
}
(10)BalancedAllocation
用于平衡资源使用率,插件分别计算cpu、内存请求量和node上可分配量比率,并且基于该比值接近程度计算节点优先级。
//插件实现的扩展点:Score
type BalancedAllocation struct {
    handle framework.Handle
    resourceAllocationScorer
}
(11)VolumeBinding
插件用于绑定pod volume,在Filter阶段创建cache,并在Reserve、PreBind阶段使用。
//插件实现的扩展点:PreFilter、Filter、Score、Reserve、PreBind、Unreserve
type VolumeBinding struct {
    Binder    SchedulerVolumeBinder
    PVCLister corelisters.PersistentVolumeClaimLister
    scorer    volumeCapacityScorer
    fts       feature.Features
}
(12)VolumeRestrictions
插件检查volume的限制。
//插件实现的扩展点:PreFilter、Filter
type VolumeRestrictions struct {
    parallelizer           parallelize.Parallelizer
    pvcLister              corelisters.PersistentVolumeClaimLister
    nodeInfoLister         framework.SharedLister
    enableReadWriteOncePod bool
}
(13)VolumeZone
插件检查volume zone
//插件实现的扩展点:Filter
type VolumeZone struct {
    pvLister  corelisters.PersistentVolumeLister
    pvcLister corelisters.PersistentVolumeClaimLister
    scLister  storagelisters.StorageClassLister
}
(14)CSILimits
插件检查节点volume限制
//插件实现的扩展点:Filter
type CSILimits struct {
    csiNodeLister storagelisters.CSINodeLister
    pvLister      corelisters.PersistentVolumeLister
    pvcLister     corelisters.PersistentVolumeClaimLister
    scLister      storagelisters.StorageClassLister
    randomVolumeIDPrefix string
    translator InTreeToCSITranslator
}
(15)InterPodAffinity
插件检查pod亲和性。
type InterPodAffinity struct {
    parallelizer parallelize.Parallelizer
    args         config.InterPodAffinityArgs
    sharedLister framework.SharedLister
    nsLister     listersv1.NamespaceLister
}
(16)PrioritySort
插件根据pod的spec.Priority进行排序。
type PrioritySort struct{}
(17)DefaultBinder
插件使用k8s client绑定pod到node。
//插件实现扩展点:Bind
type DefaultBinder struct {
    handle framework.Handle
}
(18)DefaultPreemption
插件实现pod默认抢占。
//插件实现扩展点:PostFilter
type DefaultPreemption struct {
    fh        framework.Handle
    args      config.DefaultPreemptionArgs
    podLister corelisters.PodLister
    pdbLister policylisters.PodDisruptionBudgetLister
}

5、调度队列

5.1 数据结构

调度队列接口,存储等待调度的pod
type SchedulingQueue interface {
    Add(pod *v1.Pod) error
    Activate(pods map[string]*v1.Pod)
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    SchedulingCycle() int64
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    Close()
    Run()
}
优先级队列,实现调度队列接口
type PriorityQueue struct {
    framework.PodNominator
    stop chan struct{}
    clock util.Clock
    //pod初始化的backoff时长
    podInitialBackoffDuration time.Duration
    //pod最大backoff时长
    podMaxBackoffDuration time.Duration
    //pod在不可调度队列中最大时长
    podMaxInUnschedulablePodsDuration time.Duration
    lock sync.RWMutex
    cond sync.Cond
    //优先级调度队列,等待调度的pod,队列头优先级最高,优先级比较函数为Framework的QueueSortFunc
    activeQ *heap.Heap
    //优先级backoff队列,接收来自unschedulablePods的pod,完成退避时间后移到activeQ队列
    podBackoffQ *heap.Heap
    //不可调度队列
    unschedulablePods *UnschedulablePods
    //调度周期序列号,每次弹出pod会增加
    schedulingCycle int64
    //当收到将pod从unschedulable队列移动到backoff或active队列时,记录当前调度序列号
    moveRequestCycle int64
    clusterEventMap map[framework.ClusterEvent]sets.String
    closed bool
    nsLister listersv1.NamespaceLister
}

5.2 流程图

调度器后台会启动两个定时任务,将退避队列、不可调度队列的pod移动到调度队列:
flushBackoffQCompleted:每1秒执行一次,将完成退避时间的所有pod移动到activeQ中。
flushUnschedulablePodsLeftover:每30秒执行一次,将unschedulablePods中超过阈值时间(默认5min)的pod移动到podBackoffQ或activeQ。

6、自定义调度器

接下来我们开发一个自定义调度器,根据pod的label中使用hintNode来选择调度节点,代码结构如下:

(1)调度器程序
main.go,启动调度器,注册自定义插件MyPlugin
package main

import (
    "fmt"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
    "myscheduler/pkg/plugins"
    "os"
)

func main() {
    command := app.NewSchedulerCommand(
        app.WithPlugin("MyPlugin", plugins.NewMyPlugin),
    )
    if err := command.Execute(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
}

myplugin.go,在Filter扩展点使用pod label中设置的hintNode字段选择调度的节点

package plugins

import (
    "context"
    "fmt"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

type MyPlugin struct {
}

// 确保MyPlugin实现了FilterPlugin接口
var _ framework.FilterPlugin = &MyPlugin{}

func (plugin *MyPlugin) Name() string {
    return "MyPlugin"
}

func (plugin *MyPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    fmt.Println("pod labels:", pod.Labels)
    hintNode, ok := pod.Labels["hintNode"]
    if !ok {
        return framework.NewStatus(framework.UnschedulableAndUnresolvable)
    }
    if nodeInfo.Node().Name == hintNode {
        return framework.NewStatus(framework.Success)
    }
    return framework.NewStatus(framework.UnschedulableAndUnresolvable)
}

func NewMyPlugin(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
    return &MyPlugin{}, nil
}
(2)构建镜像
Dockerfile
FROM golang:1.24.6-alpine AS builder
RUN set -eux && sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories

COPY . /data/app
WORKDIR /data/app

RUN mkdir -p /data/app/bin && \
    export GOPROXY=https://goproxy.cn,direct && \
    go mod tidy && \
    go build -ldflags="-s -w" -o /data/app/bin/main ./cmd/main.go

FROM alpine:3.16
RUN set -eux && sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories

RUN apk add tzdata && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo "Asia/Shanghai" > /etc/timezone \
    && apk del tzdata

WORKDIR /data/app
COPY --from=builder /data/app/bin /data/app
(3)调度器部署
clusterrole.yaml,创建调度器角色,授予相应权限
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: myscheduler-role
rules:
  - apiGroups:
      - ""
    resources:
      - namespaces
    verbs:
      - list
      - get
      - watch
  - apiGroups:
      - ""
    resources:
      - endpoints
      - events
    verbs:
      - create
      - get
      - update
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - delete
      - get
      - list
      - watch
      - update
  - apiGroups:
      - ""
    resources:
      - bindings
      - pods/binding
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - pods/status
    verbs:
      - patch
      - update
  - apiGroups:
      - ""
    resources:
      - replicationcontrollers
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
      - extensions
    resources:
      - replicasets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - apps
    resources:
      - statefulsets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - policy
    resources:
      - poddisruptionbudgets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - persistentvolumeclaims
      - persistentvolumes
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "storage.k8s.io"
    resources:
      - storageclasses
      - csinodes
      - csidrivers
      - csistoragecapacities
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "coordination.k8s.io"
    resources:
      - leases
    verbs:
      - create
      - get
      - list
      - update
  - apiGroups:
      - "events.k8s.io"
    resources:
      - events
    verbs:
      - create
      - patch
      - update
serviceaccount.yaml,创建调度器账号
apiVersion: v1
kind: ServiceAccount
metadata:
  name: myscheduler-account
  namespace: kube-system
clusterrolebinding.yaml,绑定账号、角色
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: myscheduler-rolebinding
  namespace: kube-system
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: myscheduler-role
subjects:
  - kind: ServiceAccount
    name: myscheduler-account
    namespace: kube-system
configmap.yaml,创建调度器配置文件,kind为KubeSchedulerConfiguration
apiVersion: v1
kind: ConfigMap
metadata:
  name: myscheduler-configmap
  namespace: kube-system
data:
  scheduler.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1beta3
    kind: KubeSchedulerConfiguration
    parallelism: 16
    leaderElection:
      leaderElect: true
      resourceLock: leases
      resourceNamespace: kube-system
      resourceName: myscheduler
    profiles:
      - schedulerName: myscheduler
        plugins:
          multiPoint:
            enabled:
              - name: MyPlugin

deployment.yaml,创建调度器Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: myscheduler
  namespace: kube-system
  labels:
    app: myscheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: myscheduler
  template:
    metadata:
      labels:
        app: myscheduler
    spec:
      serviceAccountName: myscheduler-account
      priorityClassName: system-cluster-critical
      volumes:
        - name: config
          configMap:
            name: myscheduler-configmap
      containers:
        - name: myscheduler
          image: docker.io/library/myscheduler:1.0
          imagePullPolicy: IfNotPresent
          args:
            - ./main
            - --config=/data/app/config/scheduler.yaml
          resources:
            requests:
              cpu: "50m"
          volumeMounts:
            - name: config
              mountPath: /data/app/config
调度器成功启动:

(4)测试验证
nginx.yaml,启动测试nginx服务,label中使用hintNode为worker1
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  namespace: default
  labels:
    app: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
        hintNode: worker1
    spec:
      schedulerName: myscheduler
      containers:
        - name: nginx
          image: docker.io/library/nginx:1.25
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: nginx
  namespace: default
spec:
  type: NodePort
  selector:
    app: nginx
  ports:
    - port: 80
      targetPort: 80
      nodePort: 30200
nginx pod被调度到指定node:

标签: 暂无
最后更新:2025年11月3日

jemuel

这个人很懒,什么都没留下

点赞
< 上一篇
文章目录
  • 1、调度框架
    • 1.1 框架图
    • 1.2 数据结构
    • 1.3 扩展点
  • 2、调度器
    • 2.1 数据结构
    • 2.2 调度流程图
  • 3、调度缓存
    • 3.1 数据结构
    • 3.2 缓存状态机
  • 4、调度插件
    • 4.1 插件接口
    • 4.2 内置插件
  • 5、调度队列
    • 5.1 数据结构
    • 5.2 流程图
  • 6、自定义调度器
最新 热点 随机
最新 热点 随机
K8S源码分析系列3—K8S调度器 K8S源码分析系列2—远程调试K8S组件 Volcano源码分析系列—调度篇 K8S源码分析系列1—搭建K8S调试集群 K8S Controller开发 6.5840 Lab 1: MapReduce
K8S源码分析系列3—K8S调度器
Go调度模型 MySQL源码分析系列3——登录协议解析 K8S源码分析系列3—K8S调度器 GraphQL介绍及使用 Golang优先级调度 MongoDB源码分析系列1——编译环境搭建

COPYRIGHT © 2021 www.miaozhouguang.com. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS

粤ICP备2022006024号

粤公网安备 44030602006568号