1. Scheduling Framework
1.1 Framework diagram
Starting with version 1.15, Kubernetes introduced a pluggable scheduling framework. It defines a set of key points in a pod's scheduling cycle and lets you extend each of them with plugins, so scheduling policy can be adjusted without modifying existing code. (The flow diagram of the pod scheduling cycle is omitted here.)
The framework provides the following extension points: QueueSort, PreFilter, Filter, PostFilter, PreScore, Score, Reserve, Permit, PreBind, Bind, and PostBind. Each has a well-defined invocation time and purpose, described in detail below.
Plugins fall into two categories, in-tree and out-of-tree: the former are built in, the latter are external extensions. A single plugin may implement one or more extension points.
1.2 Data structures
The Framework interface defines a run function for each group of plugins; each function is invoked at its extension point. The interface is defined as follows:
type Framework interface {
	// Handle provides utilities for plugins
	Handle
	// QueueSortFunc returns the function used to sort pods in the scheduling queue
	QueueSortFunc() LessFunc
	// RunPreFilterPlugins runs the PreFilter plugins; if any returns non-Success, the scheduling cycle is aborted
	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
	// RunPostFilterPlugins runs the PostFilter plugins; they can be used for notification or to update cluster state
	RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
	// RunPreBindPlugins runs the PreBind plugins; if any returns non-Success, the Unreserve plugins are triggered and the pod goes back to the scheduling queue
	RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	// RunPostBindPlugins runs the PostBind plugins
	RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
	// RunReservePluginsReserve runs the Reserve method of the Reserve plugins; if any fails, the pod will not be scheduled
	RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	// RunReservePluginsUnreserve runs the Unreserve method of the Reserve plugins after a failed binding, in the reverse order of Reserve
	RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
	// RunPermitPlugins runs the Permit plugins; if any returns a status other than Success or Wait, scheduling of the pod fails
	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	// WaitOnPermit blocks until the pod is rejected or allowed to proceed
	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
	// RunBindPlugins runs the Bind plugins; each Bind plugin may choose whether to handle the given pod, and if none handles it, Skip is returned
	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
	// HasFilterPlugins reports whether at least one Filter plugin is configured
	HasFilterPlugins() bool
	// HasPostFilterPlugins reports whether at least one PostFilter plugin is configured
	HasPostFilterPlugins() bool
	// HasScorePlugins reports whether at least one Score plugin is configured
	HasScorePlugins() bool
	ListPlugins() *config.Plugins
	ProfileName() string
}
frameworkImpl is the concrete implementation of the Framework interface; it initializes and runs the scheduler plugins:
type frameworkImpl struct {
	// registry holds all available plugins and is used to enable and initialize
	// them; every plugin must be in the registry before the framework is initialized
	registry             Registry
	snapshotSharedLister framework.SharedLister
	// waitingPods holds the pods waiting in the Permit phase
	waitingPods *waitingPodsMap
	// scorePluginWeight stores the weight of each Score plugin
	scorePluginWeight map[string]int
	// plugins per extension point
	queueSortPlugins  []framework.QueueSortPlugin
	preFilterPlugins  []framework.PreFilterPlugin
	filterPlugins     []framework.FilterPlugin
	postFilterPlugins []framework.PostFilterPlugin
	preScorePlugins   []framework.PreScorePlugin
	scorePlugins      []framework.ScorePlugin
	reservePlugins    []framework.ReservePlugin
	preBindPlugins    []framework.PreBindPlugin
	bindPlugins       []framework.BindPlugin
	postBindPlugins   []framework.PostBindPlugin
	permitPlugins     []framework.PermitPlugin
	clientSet         clientset.Interface
	kubeConfig        *restclient.Config
	eventRecorder     events.EventRecorder
	// informerFactory provides shared informers for Kubernetes resources
	informerFactory informers.SharedInformerFactory
	metricsRecorder *metricsRecorder
	profileName     string
	extenders       []framework.Extender
	framework.PodNominator
	parallelizer parallelize.Parallelizer
}
1.3 Extension points
As introduced above, the framework provides a number of extension points; each is described below, and a sketch of a plugin combining two of them follows the list.
- QueueSort: sorts the pods in the scheduling queue and so decides which pod is scheduled first; only one QueueSort plugin may be enabled at a time.
- PreFilter: preprocessing at the start of the scheduling cycle; every plugin must return Success, otherwise scheduling of the pod is aborted.
- Filter: filters out nodes that cannot run the pod; a node is feasible only if every Filter plugin returns Success for it.
- PostFilter: runs after the Filter phase, and only when no feasible node was found for the pod.
- PreScore: preprocessing before node scoring; every plugin must return Success, otherwise scheduling of the pod is aborted.
- Score: computes a score for every node that passed filtering, used to rank the nodes; every plugin must return Success, otherwise scheduling of the pod is aborted.
- Reserve: runs after a node has been chosen for the pod but before binding; reserves resources for the pod, which improves scheduling throughput.
- Permit: blocks or delays binding the pod to its node.
- PreBind: runs before the pod is bound to the node; every plugin must return Success, otherwise scheduling of the pod is aborted.
- Bind: binds the pod to the node; Bind plugins are called in their configured order, and once one returns Success the rest are skipped.
- PostBind: runs after the pod has been successfully bound.
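Most of these hooks receive a CycleState, a per-pod, per-cycle key/value store that lets an early extension point hand data to a later one. Below is a minimal sketch of a plugin that combines PreFilter and Filter this way; the plugin name, state key, and gpu label are hypothetical, while the types and the Read/Write calls are the real pkg/scheduler/framework API:
package plugins

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

const stateKey framework.StateKey = "ExamplePlugin/precomputed"

// preComputed is the data handed from PreFilter to Filter; StateData only
// requires a Clone method.
type preComputed struct {
	wantsGPU bool
}

func (s *preComputed) Clone() framework.StateData { c := *s; return &c }

type ExamplePlugin struct{}

func (pl *ExamplePlugin) Name() string { return "ExamplePlugin" }

// PreFilter runs once per scheduling cycle: do the expensive work here and
// stash the result in the CycleState.
func (pl *ExamplePlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	state.Write(stateKey, &preComputed{wantsGPU: p.Labels["gpu"] == "true"})
	return nil, nil
}

func (pl *ExamplePlugin) PreFilterExtensions() framework.PreFilterExtensions { return nil }

// Filter runs once per candidate node: read the precomputed state instead of
// recomputing it for every node.
func (pl *ExamplePlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	data, err := state.Read(stateKey)
	if err != nil {
		return framework.AsStatus(err)
	}
	if data.(*preComputed).wantsGPU && nodeInfo.Node().Labels["gpu"] != "true" {
		return framework.NewStatus(framework.Unschedulable, "node has no GPU")
	}
	return nil
}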
2. Scheduler
2.1 Data structures
The scheduler's data structure is as follows:
type Scheduler struct {
	// Cache (implemented by cacheImpl) manages assumed pods: pods for which a
	// node has been selected but that are not yet bound
	Cache internalcache.Cache
	// Extenders let external processes (e.g. an HTTP service) influence scheduling
	Extenders []framework.Extender
	// NextPod blocks until the next pod to schedule is available
	NextPod func() *framework.QueuedPodInfo
	// Error is called when a scheduling error occurs
	Error func(*framework.QueuedPodInfo, error)
	// SchedulePod schedules the given pod onto a node
	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
	// closing StopEverything stops the scheduler
	StopEverything <-chan struct{}
	// SchedulingQueue holds the pods waiting to be scheduled; implemented by PriorityQueue
	SchedulingQueue internalqueue.SchedulingQueue
	// Profiles maps scheduler name to the corresponding framework
	Profiles profile.Map
	...
}
(Diagram of the relationships among these structures omitted.)
2.2 Scheduling flow
The call chain is as follows:
scheduler.go/main()
|---server.go/NewSchedulerCommand()
|---options.go/NewOptions()
|---server.go/runCommand()
|---server.go/Setup() //initialize the scheduler
| |---options.go/Options.Config() //create the scheduler configuration
| |---options.go/Options.ApplyTo()
| |---scheduler.go/New() //create the scheduler
| |---scheme.go/Scheme.Default() //apply defaults to the KubeSchedulerConfiguration object, pkg/scheduler/apis/config/v1beta3/defaults.go
| |---framework/plugins/registry.go/NewInTreeRegistry() //register the default in-tree plugins: node affinity, node resources, taint toleration, etc.
| |---metrics.go/Register() //register metrics
| |---profile.go/NewMap() //build the frameworks, keyed as {schedulerName: framework}
| |---scheduling_queue.go/NewSchedulingQueue() //create the scheduling queue; later wired into the scheduler's NextPod via scheduling_queue.go/MakeNextPodFunc()
| |---scheduling_queue.go/NewPriorityQueue() //create the priority queue
| |---scheduler.go/newScheduler() //create the Scheduler object
| |---eventhandlers.go/addAllEventHandlers() //register resource event handlers (Service, Pod, Node, ...); pods are added to the activeQ from these handlers
|---server.go/Run()
|---client-go/informers/factory.go/sharedInformerFactory.Start() //start the informers; the default resources include: v1.Node, v1.PersistentVolumeClaim, v1.CSINode, v1.PodDisruptionBudget, v1.ReplicationController, v1.StatefulSet, v1.CSIDriver, v1.CSIStorageCapacity, v1.Pod, v1.PersistentVolume, v1.StorageClass, v1.Namespace, v1.Service, v1.ReplicaSet
|---scheduler.go/Scheduler.Run() //run the scheduler
|---scheduling_queue.go/PriorityQueue.Run() //start the queue's background goroutines
| |---scheduling_queue.go/PriorityQueue.flushBackoffQCompleted() //every 1s, move all pods whose backoff has completed from backoffQ to activeQ
| |---scheduling_queue.go/PriorityQueue.addToActiveQ()
| |---scheduling_queue.go/PriorityQueue.flushUnschedulablePodsLeftover() //every 30s, move pods from unschedulablePods to backoffQ or activeQ
| |---scheduling_queue.go/PriorityQueue.movePodsToActiveOrBackoffQueue()
|---schedule_one.go/Scheduler.scheduleOne() //one full scheduling pass for a single pod
|---schedule_one.go/Scheduler.NextPod() //pop a pod from the scheduling queue
|---schedule_one.go/Scheduler.frameworkForPod() //look up the framework by the pod's SchedulerName
|---schedule_one.go/Scheduler.schedulePod() //schedule the pod onto a node
| |---schedule_one.go/Scheduler.findNodesThatFitPod() //find feasible nodes
| |---framework.go/frameworkImpl.RunPreFilterPlugins() //run the PreFilter plugins
| |---schedule_one.go/prioritizeNodes() //score the nodes
| |---framework.go/frameworkImpl.RunPreScorePlugins() //run the PreScore plugins
| |---framework.go/frameworkImpl.RunScorePlugins() //run the Score plugins
| |---schedule_one.go/selectHost() //pick the highest-scoring node
|---framework.go/frameworkImpl.RunPostFilterPlugins() //on scheduling failure, run the PostFilter plugins; may preempt so the pod can be scheduled in a later cycle
|---schedule_one.go/Scheduler.assume() //node selected: set pod.Spec.NodeName and add the pod to the cache, so the actual bind can run asynchronously and improve throughput
|---framework.go/frameworkImpl.RunReservePluginsReserve() //run the Reserve plugins; reserve resources for the pod in the cache
|---framework.go/frameworkImpl.RunPermitPlugins() //run the Permit plugins to allow, reject, or delay binding
|---framework.go/frameworkImpl.WaitOnPermit() //wait for delayed pods to be allowed or rejected
|---framework.go/frameworkImpl.RunPreBindPlugins() //run the PreBind plugins, e.g. wait for the PV controller to finish volume binding
|---schedule_one.go/Scheduler.bind() //bind the pod to the node
|---schedule_one.go/Scheduler.extendersBinding() //let extenders perform the binding
|---framework.go/frameworkImpl.RunBindPlugins() //run the Bind plugins
|---schedule_one.go/Scheduler.finishBinding() //finish binding and update the cache
|---framework.go/frameworkImpl.RunPostBindPlugins() //run the PostBind plugins for post-bind housekeeping
3. Scheduling Cache
3.1 Data structures
The scheduling cache tracks pod state and node information. The cache interface:
type Cache interface {
	// NodeCount returns the number of nodes (used by tests)
	NodeCount() int
	// PodCount returns the number of pods (used by tests)
	PodCount() (int, error)
	// AssumePod adds a pod that has a node selected but is not yet bound; called before bind
	AssumePod(pod *v1.Pod) error
	// FinishBinding lets the assumed pod start its expiration countdown
	FinishBinding(pod *v1.Pod) error
	// ForgetPod removes an assumed pod from the cache
	ForgetPod(pod *v1.Pod) error
	// AddPod confirms a pod if it is assumed; otherwise it (re-)adds the pod to the
	// cache (e.g. the assumed pod expired, or its node information no longer matches)
	AddPod(pod *v1.Pod) error
	// UpdatePod removes the old pod's information and adds the new pod's
	UpdatePod(oldPod, newPod *v1.Pod) error
	// RemovePod deletes the pod; its information is removed from the node it was assigned to
	RemovePod(pod *v1.Pod) error
	// GetPod returns the pod from the cache
	GetPod(pod *v1.Pod) (*v1.Pod, error)
	// IsAssumedPod reports whether the pod is assumed and not expired
	IsAssumedPod(pod *v1.Pod) (bool, error)
	// AddNode adds overall information about a node to the cache
	AddNode(node *v1.Node) *framework.NodeInfo
	// UpdateNode updates overall information about a node in the cache
	UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo
	// RemoveNode removes overall information about a node
	RemoveNode(node *v1.Node) error
	// UpdateSnapshot updates the passed snapshot to the cache's current contents
	UpdateSnapshot(nodeSnapshot *Snapshot) error
	// Dump produces a dump of the cache, for debugging
	Dump() *Dump
}
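To see how these calls fit together: the scheduler assumes the pod first, binds asynchronously, and then either forgets the pod (bind failed) or finishes binding so the expiration timer starts and the informer's Add event can confirm it. A condensed, hypothetical sketch (not the actual scheduler code; bind stands in for the real binding step, and the snippet is written as if it lived inside the scheduler package tree):
import (
	v1 "k8s.io/api/core/v1"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// scheduleAndBind sketches the cache side of one scheduling attempt.
func scheduleAndBind(cache internalcache.Cache, pod *v1.Pod, nodeName string, bind func(*v1.Pod) error) {
	// Optimistically record the decision so the next scheduling cycle already
	// sees this pod's resource usage on the chosen node.
	assumed := pod.DeepCopy()
	assumed.Spec.NodeName = nodeName
	_ = cache.AssumePod(assumed)

	go func() {
		if err := bind(assumed); err != nil {
			// Bind failed: drop the assumed pod so its resources free up.
			_ = cache.ForgetPod(assumed)
			return
		}
		// Bind succeeded: start the expiration countdown; the informer's Add
		// event will confirm the pod via cache.AddPod, and if that event never
		// arrives, the assumed pod eventually expires.
		_ = cache.FinishBinding(assumed)
	}()
}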
The cache implementation:
type cacheImpl struct {
	...
	// assumedPods holds the UIDs of assumed pods
	assumedPods sets.String
	// podStates maps pod UID to pod state
	podStates map[string]*podState
	// nodes maps node name to node information
	nodes map[string]*nodeInfoListItem
	// headNode points to the most recently updated node info; it is the head of a linked list
	headNode *nodeInfoListItem
	// nodeTree is a tree-like topology of the nodes
	nodeTree *nodeTree
	// imageStates maps image name to image info (size and the nodes the image exists on)
	imageStates map[string]*imageState
}
3.2 Cache state machine
A pod in the cache is in one of five states: Initial, Assumed, Added, Expired, or Deleted:
- Initial: the initial state; the pod is not actually in the cache.
- Assumed: the pod has a node selected but is not yet bound.
- Added: the pod has been added (confirmed) via an informer Add event; expired pods and pods whose node changed are re-added to the cache this way.
- Expired: the assumed pod expired before being confirmed; it is not actually in the cache.
- Deleted: the pod was deleted; it is not actually in the cache.
4. Scheduling Plugins
Plugins come in two flavors: out-of-tree and in-tree. In-tree plugins are registered by NewInTreeRegistry; out-of-tree plugins are registered through the WithFrameworkOutOfTreeRegistry option (section 6 builds one end to end).
4.1 Plugin interfaces
The Plugin interface:
type Plugin interface {
	Name() string
}
The QueueSortPlugin interface:
// QueueSortPlugin sorts the pods in the scheduling queue; only one such plugin
// may be enabled at a time.
type QueueSortPlugin interface {
	Plugin
	// Less is the comparison function used to order the scheduling queue.
	Less(*QueuedPodInfo, *QueuedPodInfo) bool
}
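For a concrete Less, the built-in PrioritySort plugin (section 4.2) orders the queue by pod priority, falling back to arrival time. A simplified sketch of the upstream logic; podPriority is a local helper standing in for the k8s.io/component-helpers function:
package plugins

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type PrioritySort struct{}

func (pl *PrioritySort) Name() string { return "PrioritySort" }

// Less puts the higher-priority pod first; ties go to the pod queued earlier.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1, p2 := podPriority(pInfo1.Pod), podPriority(pInfo2.Pod)
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

// podPriority treats a nil Spec.Priority as priority 0.
func podPriority(pod *v1.Pod) int32 {
	if pod.Spec.Priority != nil {
		return *pod.Spec.Priority
	}
	return 0
}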
The PreFilterPlugin interface:
// PreFilterExtensions lets a plugin evaluate the effect of incremental changes
// to the cycle state (used, for example, by preemption).
type PreFilterExtensions interface {
	// AddPod evaluates the effect of adding a pod to the given node.
	AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
	// RemovePod evaluates the effect of removing a pod from the given node.
	RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}
// PreFilterPlugin is called at the beginning of the scheduling cycle.
type PreFilterPlugin interface {
	Plugin
	// PreFilter runs at the start of the scheduling cycle; every PreFilter plugin
	// must return Success, otherwise the pod is rejected.
	PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
	// PreFilterExtensions returns a PreFilterExtensions interface (or nil), called
	// after PreFilter to update the state incrementally.
	PreFilterExtensions() PreFilterExtensions
}
The FilterPlugin interface:
// FilterPlugin runs at the Filter extension point to rule out nodes that cannot
// run the pod; any non-Success result excludes the node.
type FilterPlugin interface {
	Plugin
	// Filter: the pod can run on a node only if every Filter plugin returns Success for it.
	Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}
The PostFilterPlugin interface:
// PostFilterPlugin is an informational extension point, called when no feasible
// node was found for the pod.
type PostFilterPlugin interface {
	Plugin
	// PostFilter tries to make the pod schedulable in a future scheduling cycle.
	PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}
The PreScorePlugin interface:
// PreScorePlugin is an informational extension point that receives all nodes that
// passed filtering; useful for updating internal state or producing logs/metrics.
type PreScorePlugin interface {
	Plugin
	// PreScore: every plugin must return Success, otherwise the pod is rejected.
	PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}
The ScorePlugin interface:
// ScoreExtensions extends node scoring.
type ScoreExtensions interface {
	// NormalizeScore normalizes the node scores; on success, scores is updated in place.
	NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}
// ScorePlugin computes node scores.
type ScorePlugin interface {
	Plugin
	// Score computes a score for each node that passed filtering; the scores rank
	// the nodes. Every plugin must return Success, otherwise the pod is rejected.
	Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
	// ScoreExtensions returns a ScoreExtensions interface (or nil).
	ScoreExtensions() ScoreExtensions
}
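A toy plugin showing both halves of the scoring contract: Score returns a raw per-node value (here, the node's current pod count, fetched through the snapshot lister), and NormalizeScore rescales and inverts the raw values onto the 0..MaxNodeScore range so the emptiest node wins. The FewestPods plugin is hypothetical; the Handle and lister calls are the real framework API:
package plugins

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type FewestPods struct {
	handle framework.Handle
}

func (pl *FewestPods) Name() string { return "FewestPods" }

// Score returns the raw pod count of the node; NormalizeScore below inverts it.
func (pl *FewestPods) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(err)
	}
	return int64(len(nodeInfo.Pods)), nil
}

func (pl *FewestPods) ScoreExtensions() framework.ScoreExtensions { return pl }

// NormalizeScore maps raw counts into [0, MaxNodeScore], giving the highest
// score to the node with the fewest pods.
func (pl *FewestPods) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	var max int64
	for _, s := range scores {
		if s.Score > max {
			max = s.Score
		}
	}
	for i := range scores {
		if max == 0 {
			scores[i].Score = framework.MaxNodeScore
			continue
		}
		scores[i].Score = framework.MaxNodeScore - scores[i].Score*framework.MaxNodeScore/max
	}
	return nil
}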
The ReservePlugin interface:
// ReservePlugin is an informational extension point that reserves resources for a pod.
type ReservePlugin interface {
	Plugin
	// Reserve is called after the scheduler cache is updated; if it fails, the
	// scheduler calls Unreserve on all Reserve plugins.
	Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
	// Unreserve is called when Reserve (or a later phase) fails. It must be
	// idempotent, because some plugins' Reserve may never have been called.
	Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
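The idempotency requirement on Unreserve is easy to miss. A minimal sketch (hypothetical plugin keeping per-node reservations in plugin-local state): releasing a reservation that was never taken must be a harmless no-op, because the framework calls Unreserve on every Reserve plugin even if only some of them ran:
package plugins

import (
	"context"
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type SlotReserver struct {
	mu    sync.Mutex
	slots map[string]sets.String // node name -> UIDs of pods holding a slot
}

func (pl *SlotReserver) Name() string { return "SlotReserver" }

// Reserve claims a slot for the pod on the chosen node.
func (pl *SlotReserver) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	pl.mu.Lock()
	defer pl.mu.Unlock()
	if pl.slots[nodeName] == nil {
		pl.slots[nodeName] = sets.NewString()
	}
	pl.slots[nodeName].Insert(string(p.UID))
	return nil
}

// Unreserve releases the slot. Deleting a UID that was never inserted is a
// no-op, which is exactly the idempotency the interface demands.
func (pl *SlotReserver) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
	pl.mu.Lock()
	defer pl.mu.Unlock()
	if s, ok := pl.slots[nodeName]; ok {
		s.Delete(string(p.UID))
	}
}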
The PermitPlugin interface:
// PermitPlugin is called before the pod is bound to a node.
type PermitPlugin interface {
	Plugin
	// Permit is called before binding (and before the PreBind plugins) to block
	// or delay the binding of a pod. It returns Success, Wait, or a rejection;
	// the wait only takes effect if no other plugin rejects the pod.
	Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}
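A sketch of a delaying Permit plugin (the ApprovalGate plugin and its needs-approval label are hypothetical): pods carrying the label are parked in the framework's waitingPods map for up to 30 seconds, and some external component is expected to release them through the waiting-pod API before the timeout rejects them:
package plugins

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type ApprovalGate struct {
	handle framework.Handle
}

func (pl *ApprovalGate) Name() string { return "ApprovalGate" }

// Permit admits unlabeled pods immediately and parks labeled ones for up to 30s.
func (pl *ApprovalGate) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	if p.Labels["needs-approval"] != "true" {
		return nil, 0
	}
	return framework.NewStatus(framework.Wait), 30 * time.Second
}

// approve shows how another goroutine (an informer callback, a webhook, ...)
// would release a waiting pod before the timeout fires.
func (pl *ApprovalGate) approve(uid types.UID) {
	if wp := pl.handle.GetWaitingPod(uid); wp != nil {
		wp.Allow(pl.Name())
	}
}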
The PreBindPlugin interface:
// PreBindPlugin is called before the pod is bound to a node.
type PreBindPlugin interface {
	Plugin
	// PreBind: every PreBind plugin must return Success, otherwise the pod is
	// rejected and will not be bound.
	PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
The BindPlugin interface:
// BindPlugin binds the pod to a node.
type BindPlugin interface {
	Plugin
	// Bind plugins are called in their configured order. A plugin may decline to
	// handle a pod by returning Skip; once one plugin handles the binding, the
	// remaining plugins are skipped. If a plugin returns Error, the pod is
	// rejected and not bound.
	Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
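The built-in DefaultBinder (section 4.2) is essentially the whole contract in a dozen lines: it posts a Binding object for the pod through the clientset and converts any error into a Status. A condensed sketch of the upstream code:
package plugins

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type DefaultBinder struct {
	handle framework.Handle
}

func (b DefaultBinder) Name() string { return "DefaultBinder" }

// Bind creates a v1.Binding that points the pod at the chosen node.
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}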
The PostBindPlugin interface:
// PostBindPlugin is an informational extension point, called after a pod is
// successfully bound to a node.
type PostBindPlugin interface {
	Plugin
	// PostBind is called after a successful binding, e.g. for cleaning up state.
	PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}
4.2 Built-in plugins
(1) SelectorSpread
Spreads the pods managed by the same controller across topology domains.
// Extension points implemented: Score (with NormalizeScore), PreScore
type SelectorSpread struct {
sharedLister framework.SharedLister
services corelisters.ServiceLister
replicationControllers corelisters.ReplicationControllerLister
replicaSets appslisters.ReplicaSetLister
statefulSets appslisters.StatefulSetLister
}
(2) ImageLocality
Favors nodes that already have the pod's container images.
// Extension points implemented: Score
type ImageLocality struct {
handle framework.Handle
}
(3) TaintToleration
Checks whether the pod tolerates the node's taints.
// Extension points implemented: Filter, PreScore, Score (with NormalizeScore)
type TaintToleration struct {
handle framework.Handle
}
(4) NodeName
Checks whether the node name specified by the pod matches the current node.
// Extension points implemented: Filter
type NodeName struct {}
(5) NodePorts
Checks whether the node has the ports requested by the pod available.
// Extension points implemented: PreFilter, Filter
type NodePorts struct{}
(6) NodeAffinity
Checks whether the pod's node selector and node affinity match the node's labels.
// Extension points implemented: PreFilter, Filter, PreScore, Score (with NormalizeScore)
type NodeAffinity struct {
handle framework.Handle
addedNodeSelector *nodeaffinity.NodeSelector
addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms
}
(7) PodTopologySpread
Ensures the pod's topologySpreadConstraints are satisfied.
// Extension points implemented: PreFilter, Filter, PreScore, Score (with NormalizeScore)
type PodTopologySpread struct {
systemDefaulted bool
parallelizer parallelize.Parallelizer
defaultConstraints []v1.TopologySpreadConstraint
sharedLister framework.SharedLister
services corelisters.ServiceLister
replicationCtrls corelisters.ReplicationControllerLister
replicaSets appslisters.ReplicaSetLister
statefulSets appslisters.StatefulSetLister
enableMinDomainsInPodTopologySpread bool
}
(8) NodeUnschedulable
Filters out nodes with node.Spec.Unschedulable=true, unless the pod tolerates the corresponding taint.
// Extension points implemented: Filter
type NodeUnschedulable struct {}
(9) Fit
Checks whether the node has enough resources for the pod.
// Extension points implemented: PreFilter, Filter, Score
type Fit struct {
ignoredResources sets.String
ignoredResourceGroups sets.String
handle framework.Handle
resourceAllocationScorer
}
(10) BalancedAllocation
Balances resource utilization: for each resource it computes the ratio of the requested amount to the node's allocatable amount, and scores nodes higher the closer those ratios are to each other.
// Extension points implemented: Score
type BalancedAllocation struct {
handle framework.Handle
resourceAllocationScorer
}
(11) VolumeBinding
Binds pod volumes: it builds a cache in the Filter phase that is then used in the Reserve and PreBind phases.
// Extension points implemented: PreFilter, Filter, Score, Reserve, PreBind, Unreserve
type VolumeBinding struct {
Binder SchedulerVolumeBinder
PVCLister corelisters.PersistentVolumeClaimLister
scorer volumeCapacityScorer
fts feature.Features
}
(12) VolumeRestrictions
Checks the pod's volume restrictions.
// Extension points implemented: PreFilter, Filter
type VolumeRestrictions struct {
parallelizer parallelize.Parallelizer
pvcLister corelisters.PersistentVolumeClaimLister
nodeInfoLister framework.SharedLister
enableReadWriteOncePod bool
}
(13) VolumeZone
Checks that the pod's volumes satisfy the node's zone constraints.
// Extension points implemented: Filter
type VolumeZone struct {
pvLister corelisters.PersistentVolumeLister
pvcLister corelisters.PersistentVolumeClaimLister
scLister storagelisters.StorageClassLister
}
(14) CSILimits
Checks the node's volume attach limits.
// Extension points implemented: Filter
type CSILimits struct {
csiNodeLister storagelisters.CSINodeLister
pvLister corelisters.PersistentVolumeLister
pvcLister corelisters.PersistentVolumeClaimLister
scLister storagelisters.StorageClassLister
randomVolumeIDPrefix string
translator InTreeToCSITranslator
}
(15) InterPodAffinity
Checks inter-pod affinity and anti-affinity.
// Extension points implemented: PreFilter, Filter, PreScore, Score (with NormalizeScore)
type InterPodAffinity struct {
parallelizer parallelize.Parallelizer
args config.InterPodAffinityArgs
sharedLister framework.SharedLister
nsLister listersv1.NamespaceLister
}
(16) PrioritySort
Sorts pods by spec.Priority.
// Extension points implemented: QueueSort
type PrioritySort struct{}
(17) DefaultBinder
Binds the pod to the node using the Kubernetes client.
// Extension points implemented: Bind
type DefaultBinder struct {
handle framework.Handle
}
(18) DefaultPreemption
Implements the default pod preemption.
// Extension points implemented: PostFilter
type DefaultPreemption struct {
fh framework.Handle
args config.DefaultPreemptionArgs
podLister corelisters.PodLister
pdbLister policylisters.PodDisruptionBudgetLister
}
5. Scheduling Queue
5.1 Data structures
The scheduling queue interface, which stores the pods waiting to be scheduled:
type SchedulingQueue interface {
	// Add puts a new pod into the active queue
	Add(pod *v1.Pod) error
	// Activate moves the given pods into the active queue if they are pending
	Activate(pods map[string]*v1.Pod)
	// AddUnschedulableIfNotPresent re-queues an unschedulable pod into the backoff
	// queue or the unschedulable set, depending on the current move request cycle
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current scheduling cycle number
	SchedulingCycle() int64
	// Pop blocks until a pod is available, then removes and returns the queue head
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	// MoveAllToActiveOrBackoffQueue moves unschedulable pods back when a cluster
	// event may have made them schedulable
	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	// PendingPods returns all pods waiting in the queue
	PendingPods() []*v1.Pod
	Close()
	Run()
}
PriorityQueue implements the SchedulingQueue interface:
type PriorityQueue struct {
	framework.PodNominator
	stop  chan struct{}
	clock util.Clock
	// podInitialBackoffDuration is a pod's initial backoff duration
	podInitialBackoffDuration time.Duration
	// podMaxBackoffDuration is a pod's maximum backoff duration
	podMaxBackoffDuration time.Duration
	// podMaxInUnschedulablePodsDuration is the maximum time a pod may stay in unschedulablePods
	podMaxInUnschedulablePodsDuration time.Duration
	lock sync.RWMutex
	cond sync.Cond
	// activeQ is a priority queue (heap) of the pods waiting to be scheduled; the
	// head has the highest priority, compared with the Framework's QueueSortFunc
	activeQ *heap.Heap
	// podBackoffQ is a priority queue of backed-off pods (fed from
	// unschedulablePods); a pod moves to activeQ once its backoff completes
	podBackoffQ *heap.Heap
	// unschedulablePods holds pods that could not be scheduled
	unschedulablePods *UnschedulablePods
	// schedulingCycle is a sequence number incremented every time a pod is popped
	schedulingCycle int64
	// moveRequestCycle records schedulingCycle at the time of the latest request to
	// move pods from the unschedulable set to the backoff or active queue
	moveRequestCycle int64
	clusterEventMap map[framework.ClusterEvent]sets.String
	closed bool
	nsLister listersv1.NamespaceLister
}
5.2 Flow
The scheduler runs two periodic background tasks that move pods from the backoff queue and the unschedulable set back into the scheduling queue (the backoff computation is sketched below):
flushBackoffQCompleted: runs every 1s and moves every pod whose backoff has completed from podBackoffQ to activeQ.
flushUnschedulablePodsLeftover: runs every 30s and moves pods that have stayed in unschedulablePods longer than a threshold (5 min by default) to podBackoffQ or activeQ.
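How long a pod waits in podBackoffQ grows exponentially with its failed attempts, starting at podInitialBackoffDuration (1s by default) and capped at podMaxBackoffDuration (10s by default). A simplified sketch of PriorityQueue.calculateBackoffDuration, a method on the PriorityQueue shown in 5.1, mirroring the upstream logic:
// calculateBackoffDuration doubles the backoff per failed attempt: 1s, 2s, 4s,
// ... capped at podMaxBackoffDuration. The subtraction guards against overflow.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		if duration > p.podMaxBackoffDuration-duration {
			return p.podMaxBackoffDuration
		}
		duration += duration
	}
	return duration
}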
6. A Custom Scheduler
Let's now build a custom scheduler that selects the target node from a hintNode label on the pod.
(1) Scheduler program
main.go starts the scheduler and registers the custom plugin MyPlugin:
package main

import (
	"fmt"
	"os"

	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	"myscheduler/pkg/plugins"
)

func main() {
	command := app.NewSchedulerCommand(
		app.WithPlugin("MyPlugin", plugins.NewMyPlugin),
	)
	if err := command.Execute(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
myplugin.go implements the Filter extension point: it admits only the node named by the pod's hintNode label:
package plugins

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type MyPlugin struct{}

// Ensure MyPlugin implements the FilterPlugin interface.
var _ framework.FilterPlugin = &MyPlugin{}

func (plugin *MyPlugin) Name() string {
	return "MyPlugin"
}

func (plugin *MyPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	fmt.Println("pod labels:", pod.Labels)
	// Pods without a hintNode label are rejected on every node and stay Pending.
	hintNode, ok := pod.Labels["hintNode"]
	if !ok {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable)
	}
	if nodeInfo.Node().Name == hintNode {
		return framework.NewStatus(framework.Success)
	}
	return framework.NewStatus(framework.UnschedulableAndUnresolvable)
}

func NewMyPlugin(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
	return &MyPlugin{}, nil
}
(2) Building the image
Dockerfile:
FROM golang:1.24.6-alpine AS builder
RUN set -eux && sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
COPY . /data/app
WORKDIR /data/app
RUN mkdir -p /data/app/bin && \
export GOPROXY=https://goproxy.cn,direct && \
go mod tidy && \
go build -ldflags="-s -w" -o /data/app/bin/main ./cmd/main.go
FROM alpine:3.16
RUN set -eux && sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories
RUN apk add tzdata && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone \
&& apk del tzdata
WORKDIR /data/app
COPY --from=builder /data/app/bin /data/app
(3) Deploying the scheduler
clusterrole.yaml creates the scheduler's ClusterRole and grants it the required permissions:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: myscheduler-role
rules:
- apiGroups:
- ""
resources:
- namespaces
verbs:
- list
- get
- watch
- apiGroups:
- ""
resources:
- endpoints
- events
verbs:
- create
- get
- update
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- watch
- update
- apiGroups:
- ""
resources:
- bindings
- pods/binding
verbs:
- create
- apiGroups:
- ""
resources:
- pods/status
verbs:
- patch
- update
- apiGroups:
- ""
resources:
- replicationcontrollers
- services
verbs:
- get
- list
- watch
- apiGroups:
- apps
- extensions
resources:
- replicasets
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
- persistentvolumes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- list
- watch
- apiGroups:
- "storage.k8s.io"
resources:
- storageclasses
- csinodes
- csidrivers
- csistoragecapacities
verbs:
- get
- list
- watch
- apiGroups:
- "coordination.k8s.io"
resources:
- leases
verbs:
- create
- get
- list
- update
- apiGroups:
- "events.k8s.io"
resources:
- events
verbs:
- create
- patch
- update
serviceaccount.yaml creates the scheduler's ServiceAccount:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: myscheduler-account
  namespace: kube-system
clusterrolebinding.yaml binds the ServiceAccount to the ClusterRole:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: myscheduler-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: myscheduler-role
subjects:
- kind: ServiceAccount
name: myscheduler-account
namespace: kube-system
configmap.yaml creates the scheduler's configuration file, whose kind is KubeSchedulerConfiguration; the multiPoint entry enables MyPlugin at every extension point it implements, which here is just Filter:
apiVersion: v1
kind: ConfigMap
metadata:
name: myscheduler-configmap
namespace: kube-system
data:
scheduler.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1beta3
kind: KubeSchedulerConfiguration
parallelism: 16
leaderElection:
leaderElect: true
resourceLock: leases
resourceNamespace: kube-system
resourceName: myscheduler
profiles:
- schedulerName: myscheduler
plugins:
multiPoint:
enabled:
- name: MyPlugin
deployment.yaml creates the scheduler Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: myscheduler
namespace: kube-system
labels:
app: myscheduler
spec:
replicas: 1
selector:
matchLabels:
app: myscheduler
template:
metadata:
labels:
app: myscheduler
spec:
serviceAccountName: myscheduler-account
priorityClassName: system-cluster-critical
volumes:
- name: config
configMap:
name: myscheduler-configmap
containers:
- name: myscheduler
image: docker.io/library/myscheduler:1.0
imagePullPolicy: IfNotPresent
args:
- ./main
- --config=/data/app/config/scheduler.yaml
resources:
requests:
cpu: "50m"
volumeMounts:
- name: config
mountPath: /data/app/config
The scheduler starts up successfully (startup log screenshot omitted).
(4) Testing and verification
nginx.yaml deploys a test nginx service whose pod template sets schedulerName: myscheduler and the label hintNode: worker1:
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
namespace: default
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
hintNode: worker1
spec:
schedulerName: myscheduler
containers:
- name: nginx
image: docker.io/library/nginx:1.25
imagePullPolicy: IfNotPresent
ports:
- containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
name: nginx
namespace: default
spec:
type: NodePort
selector:
app: nginx
ports:
- port: 80
targetPort: 80
nodePort: 30200
The nginx pod is scheduled onto the designated node, worker1 (kubectl output screenshot omitted).