1. Background
Kubernetes (K8S) has become the de facto standard for workload orchestration and scheduling in the cloud-native world, and most of us work with it daily (unless you still deploy applications the old way, directly on bare metal). As usage deepens, the native capabilities K8S provides eventually fall short of real requirements, and you need to extend it: custom Controllers, custom scheduling, and so on. This article walks through how Controllers work and how to extend them.
2. How Controllers Work
K8S uses declarative APIs to describe the desired state of each resource, and then runs a Reconcile loop that continuously corrects the actual state until it matches the desired state. This loop is carried out by Controllers. K8S ships with many built-in controllers, including the Deployment Controller, ReplicaSet Controller, StatefulSet Controller, Job Controller, and Service Controller. The overall Reconcile process works as follows:
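In sketch form, the loop looks like this; observe, desired, and converge are placeholders standing in for real cluster interactions, not client-go APIs:

package main

import (
	"fmt"
	"time"
)

// observe, desired and converge are placeholder functions for illustration.
func observe() int { return 2 } // e.g. replicas currently running
func desired() int { return 3 } // e.g. replicas declared in the spec

func converge(have, want int) {
	fmt.Printf("scaling from %d to %d replicas\n", have, want)
}

func main() {
	// The reconcile loop: observe actual state, compare with desired state,
	// act to close the gap, repeat.
	for {
		if have, want := observe(), desired(); have != want {
			converge(have, want)
		}
		time.Sleep(time.Second) // real controllers are event-driven rather than polling
	}
}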
3. The List-Watch Mechanism
As the previous section shows, the key requirement for a Controller is monitoring the state of resources and their changes. K8S implements this with the List-Watch mechanism: List synchronizes the current state of resources, and Watch listens for subsequent state changes. You can get a first feel for K8S List-Watch as follows.
Start an API Server proxy: kubectl proxy --port 8080
Watch Pod changes in the default namespace: curl "http://localhost:8080/api/v1/namespaces/default/pods?watch=true"
Each change then arrives as a JSON event of the form {"type":"ADDED","object":{...}}, as shown below:
Capturing the traffic with tcpdump shows that the API Server implements the watch mechanism with HTTP chunked transfer encoding combined with long-polling:
3.1 Long-polling
Given the above, how would we build a server that offers a watch mechanism like the API Server's? A minimal implementation:
package main

import (
	"fmt"
	"net/http"
	"time"
)

// poll streams an incrementing counter to the client. Each write is flushed
// immediately, so the client sees data as it is produced over a single
// long-lived connection, which is the essence of long-polling/streaming.
func poll(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Cache-Control", "no-cache")
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}
	for i := 0; ; i++ {
		fmt.Fprintf(w, "data: %d\n", i)
		flusher.Flush()
		time.Sleep(1 * time.Second)
	}
}

func main() {
	http.HandleFunc("/poll", poll)
	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Println("start server error:", err)
	}
}
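Running this server and requesting /poll with curl (-N disables client-side buffering) should show one chunk arriving per second, roughly:

curl -N http://localhost:8080/poll
data: 0
data: 1
data: 2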
3.2 HTTP watch
The earlier curl request against the API Server shows that a thin wrapper around the HTTP API is all a Controller needs for watch functionality. With kubectl proxy still running on port 8080, the code looks like this:
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

const apiServer = "http://localhost:8080"

// Pod maps just the metadata fields we care about from the API response.
type Pod struct {
	Metadata struct {
		Name              string    `json:"name"`
		Namespace         string    `json:"namespace"`
		CreationTimestamp time.Time `json:"creationTimestamp"`
	} `json:"metadata"`
}

// Event is one watch event: a type (ADDED/MODIFIED/DELETED) plus the object.
type Event struct {
	EventType string `json:"type"`
	Object    Pod    `json:"object"`
}

func main() {
	client := &http.Client{}
	req, err := http.NewRequest("GET", apiServer+"/api/v1/namespaces/default/pods?watch=true", nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	// The response body is a long-lived chunked stream; decode one JSON
	// event at a time as chunks arrive.
	var event Event
	decoder := json.NewDecoder(resp.Body)
	for {
		if err := decoder.Decode(&event); err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("%s Pod %s \n", event.EventType, event.Object.Metadata.Name)
	}
}
3.3 client-go watch
To avoid hand-rolling these HTTP calls, K8S provides the client-go library; implementing watch with it looks like this:
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build the Kubernetes API client from a kubeconfig file.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		fmt.Println(err)
		return
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Watch for Pod changes in the argo namespace.
	watcher, err := clientset.CoreV1().Pods("argo").Watch(context.Background(), metav1.ListOptions{})
	if err != nil {
		fmt.Println(err)
		return
	}
	// Process events from the watch channel.
	for event := range watcher.ResultChan() {
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			// e.g. a watch.Error event carries a *metav1.Status, not a Pod
			continue
		}
		switch event.Type {
		case watch.Added:
			fmt.Printf("Pod %s added\n", pod.Name)
			// TODO: reconcile logic
		case watch.Modified:
			fmt.Printf("Pod %s modified\n", pod.Name)
			// TODO: reconcile logic
		case watch.Deleted:
			fmt.Printf("Pod %s deleted\n", pod.Name)
			// TODO: reconcile logic
		}
	}
}
4. The Informer Mechanism
Beyond the watch mechanism shown above, client-go also provides the higher-level Informer mechanism for efficient querying and change notification. This section covers how Informer works and how to use it.
4.1 How Informer Works
client-go wraps the HTTP requests behind a ListWatch abstraction and uses an internal cache to reduce API calls. The execution flow of a custom controller built on the Informer mechanism is shown below:
The main steps are:
- The Reflector calls the API Server to pull the full list of the target resource objects and to watch for change events, and appends the list items and change events to the DeltaFIFO queue;
- Another goroutine continuously takes messages off the queue, dispatches them by message type to the user-supplied handler functions (ResourceEventHandler), and also caches the received objects so they can be queried later.
4.1.1 Data Structures
The main structures are defined as follows:
// Reflector implements ListWatch.
type Reflector struct {
	...
	expectedType  reflect.Type  // the type to process, e.g. v1.Pod
	store         Store         // the queue storing changed objects obtained via watch; the concrete type is DeltaFIFO
	listerWatcher ListerWatcher // the functions that perform List and Watch
	...
}

// DeltaFIFO stores changed objects. It is a producer-consumer queue: the
// Reflector is the producer, and consumers call Pop.
type DeltaFIFO struct {
	...
	items        map[string]Deltas // all changed objects, keyed by object key (namespace+name by default), value is the list of change messages
	queue        []string          // object keys in FIFO order
	keyFunc      KeyFunc           // computes an object's key, namespace+name by default
	knownObjects KeyListerGetter   // the Indexer object cache; instantiated as the cache type
	...
}

// cache is the concrete implementation of Indexer.
type cache struct {
	cacheStorage ThreadSafeStore // thread-safe store; instantiated as the threadSafeMap type
	keyFunc      KeyFunc         // computes an object's key
}

// threadSafeMap is the concrete implementation of ThreadSafeStore.
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{} // the object cache, keyed by object key
	index *storeIndex            // indexes over the cached objects
}
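To make the Indexer and threadSafeMap concrete, here is a small self-contained sketch that stores a Pod in an Indexer and queries it through a custom index function; the byNode index name is invented for this example:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// An Indexer is the cache/threadSafeMap pair described above: objects are
	// stored under a key (namespace/name) and can also be looked up through
	// custom index functions.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"byNode": func(obj interface{}) ([]string, error) {
			return []string{obj.(*corev1.Pod).Spec.NodeName}, nil
		},
	})
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec:       corev1.PodSpec{NodeName: "node-1"},
	}
	_ = indexer.Add(pod) // stored under the key "default/demo"

	// Query through the index instead of calling the API Server.
	pods, _ := indexer.ByIndex("byNode", "node-1")
	fmt.Println(len(pods)) // 1
}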
The references between these structures are shown below:
4.1.2 Source Code Walkthrough
The call flow for creating the Informer and Indexer objects:
cache.NewIndexerInformer()
|---tools/cache/controller.go/NewIndexerInformer()
    |---tools/cache/store.go/NewIndexer()                        // create the indexer object cache; the concrete type is cache
    |---tools/cache/controller.go/newInformer()                  // create the controller object
        |---tools/cache/delta_fifo.go/NewDeltaFIFOWithOptions()  // create the DeltaFIFO message queue
        |---tools/cache/controller.go/New()                      // instantiate the controller object
The Informer execution flow:
informer.Run()                                                      // start the informer
|---tools/cache/controller.go/controller.Run()
    |---tools/cache/reflector.go/NewReflectorWithOptions()          // instantiate the reflector that performs ListWatch
    |---tools/cache/reflector.go/Reflector.Run()                    // goroutine that lists and watches resources and pushes change messages into the DeltaFIFO queue (the producer)
    |   |---tools/cache/reflector.go/Reflector.ListAndWatch()
    |       |---tools/cache/reflector.go/Reflector.list()           // fetch the full list
    |       |   |---tools/cache/reflector.go/Reflector.syncWith()   // sync the list into the DeltaFIFO queue and the indexer object cache
    |       |---tools/cache/reflector.go/Reflector.watch()          // watch for change events: watch.Added, watch.Modified, watch.Deleted
    |           |---tools/cache/reflector.go/watchHandler()         // event handler; enqueues the corresponding message into DeltaFIFO
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Add()
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Update()
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Delete()
    |---tools/cache/controller.go/controller.processLoop()          // loop that drains the DeltaFIFO message queue (the consumer)
        |---tools/cache/delta_fifo.go/DeltaFIFO.Pop()               // pop an element from DeltaFIFO for processing
            |---tools/cache/controller.go/processDeltas()           // handle the change message: update the cache by event type and call the user-supplied handler
4.2 Using Informer
The following controller, built with Informer, manages Pods in the default namespace:
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

// Controller is our custom controller.
type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
}

// NewController creates a new Controller.
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

// processNextItem handles the next key in the queue.
func (c *Controller) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)
	// process the key
	err := c.syncToStdout(key.(string))
	// handle any error that occurred during processing
	c.handleErr(err, key)
	return true
}

// syncToStdout is the reconcile function: it prints Pod state to stdout.
func (c *Controller) syncToStdout(key string) error {
	// look up the resource object in the cache by key
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}
	if !exists {
		fmt.Printf("Pod %s does not exist anymore\n", key)
	} else {
		fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	}
	return nil
}

// handleErr decides whether a key should be retried after an error.
func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		c.queue.Forget(key)
		return
	}
	// retry the key if it has not exceeded the retry budget
	if c.queue.NumRequeues(key) < 5 {
		klog.Infof("Error syncing pod %v: %v", key, err)
		// re-enqueue the key with rate limiting
		c.queue.AddRateLimited(key)
		return
	}
	// too many retries: give up on this key
	c.queue.Forget(key)
	runtime.HandleError(err)
	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// Run starts the controller.
func (c *Controller) Run(workers int, stopCh chan struct{}) {
	defer runtime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Info("Starting Pod controller")
	// start the informer's ListWatch loop
	go c.informer.Run(stopCh)
	// wait for the informer caches to sync
	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}
	// start the worker goroutines
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
	klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func main() {
	// build the client config from kubeconfig
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		klog.Fatal(err)
	}
	// create the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}
	// create the Pod ListWatch
	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
	// create the rate-limited work queue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	// create the indexer and informer: the indexer caches resource objects,
	// the informer watches for object change events
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})
	controller := NewController(queue, indexer, informer)
	// start the controller
	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(1, stop)
	select {}
}
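As an aside, many controllers can use client-go's higher-level SharedInformerFactory instead of wiring NewIndexerInformer by hand; it shares one cache and one watch connection per resource type among all consumers. A minimal sketch, assuming the same kubeconfig path as above:

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// One factory per clientset; informers for each resource type are shared.
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("added:", obj.(*corev1.Pod).Name) },
		DeleteFunc: func(obj interface{}) { fmt.Println("deleted") },
	})
	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)            // starts all requested informers
	factory.WaitForCacheSync(stop) // blocks until the initial List completes
	<-stop
}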
5. Custom Controllers
client-go only ships API interfaces for the built-in resource types, so the default Informer machinery only works for built-in resources. For custom resources, you need to generate the corresponding Informer scaffolding code with the code-generator tool.
Steps for generating the scaffolding code with code-generator:
(1) Create a project named democontroller
mkdir democontroller && cd democontroller
go mod init democontroller
(2) Define the CRD types
Create a CRD with group democontroller and version v1alpha1:
mkdir -p pkg/apis/democontroller/v1alpha1 && cd pkg/apis/democontroller/v1alpha1
Create doc.go, which enables DeepCopy generation for all types in the package:
// +k8s:deepcopy-gen=package
// +groupName=democontroller

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1 // import "democontroller/pkg/apis/democontroller/v1alpha1"
Create types.go, defining the Go types that back the CRD:
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Demo is a specification for a Demo resource
type Demo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              DemoSpec   `json:"spec"`
	Status            DemoStatus `json:"status"`
}

// DemoSpec is the spec for a Demo resource
type DemoSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}

// DemoStatus is the status for a Demo resource
type DemoStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// DemoList is a list of Demo resources
type DemoList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`
	Items           []Demo `json:"items"`
}
Create register.go, which registers the types above into the Scheme:
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects
// (this declares our custom resource group and version)
var SchemeGroupVersion = schema.GroupVersion{Group: "democontroller", Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	// note: both Demo and DemoList are added to the scheme
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Demo{},
		&DemoList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
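A quick way to see what this registration buys: once AddToScheme has run, the scheme can map the Go type back to its GroupVersionKind. A minimal sketch, assuming the package above is importable as democontroller/pkg/apis/democontroller/v1alpha1:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"

	v1alpha1 "democontroller/pkg/apis/democontroller/v1alpha1"
)

func main() {
	scheme := runtime.NewScheme()
	if err := v1alpha1.AddToScheme(scheme); err != nil {
		panic(err)
	}
	// The scheme now knows how to map Go types to GroupVersionKinds.
	gvks, _, _ := scheme.ObjectKinds(&v1alpha1.Demo{})
	fmt.Println(gvks) // [democontroller/v1alpha1, Kind=Demo]
}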
(3) Create the code-generation scripts
Create the scripts directory:
mkdir hack && cd hack
Create boilerplate.go.txt, which provides the license header template for generated code:
/*
Copyright 2024 jemuelmiao
Licensed under the Apache License
*/
Create tools.go to pin the required dependency:
//go:build tools
// +build tools

// This package imports things required by build scripts, to force
// `go mod` to see them as dependencies.
package tools

import _ "k8s.io/code-generator"
Create the code-generation script update-codegen.sh:
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${SCRIPT_ROOT}/vendor/k8s.io/code-generator
source "${CODEGEN_PKG}/kube_codegen.sh"
THIS_PKG="democontroller"
kube::codegen::gen_helpers \
--boilerplate "${SCRIPT_ROOT}/hack/boilerplate.go.txt" \
"${SCRIPT_ROOT}/pkg/apis"
kube::codegen::gen_client \
--with-watch \
--output-dir "${SCRIPT_ROOT}/pkg/generated" \
--output-pkg "${THIS_PKG}/pkg/generated" \
"${SCRIPT_ROOT}/pkg/apis"
(4) Generate the scaffolding code
go mod tidy
go mod vendor
chmod -R 777 vendor
cd hack && chmod 755 update-codegen.sh && ./update-codegen.sh
Project layout before generation:
Project layout after generation:
(5) Create main.go to verify
Create cmd/main.go:
package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	"democontroller/pkg/generated/clientset/versioned"
	"democontroller/pkg/generated/clientset/versioned/typed/democontroller/v1alpha1"
	"democontroller/pkg/generated/informers/externalversions"
)

func main() {
	config, e := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if e != nil {
		panic(e.Error())
	}
	// use the typed v1alpha1 client directly
	client, e := v1alpha1.NewForConfig(config)
	if e != nil {
		panic(e.Error())
	}
	demoList, e := client.Demos("test").List(context.TODO(), metav1.ListOptions{})
	fmt.Println(demoList, e)
	// use the versioned clientset with a shared informer factory
	clientset, e := versioned.NewForConfig(config)
	if e != nil {
		panic(e.Error())
	}
	factory := externalversions.NewSharedInformerFactory(clientset, 30*time.Second)
	// the lister reads from the informer cache, so start the factory and
	// wait for the initial sync before querying it
	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)
	demo, e := factory.Democontroller().V1alpha1().Demos().Lister().Demos("test").Get("test")
	if e != nil {
		panic(e.Error())
	}
	fmt.Println(demo, e)
}
(6) Build and run
go mod tidy && go mod vendor go build cmd/main.go
6. kubebuilder
Generating a custom controller with code-generator, as above, is fairly tedious. Tools such as kubebuilder can generate the project scaffolding much more conveniently, simplifying custom controller development.
Steps for generating scaffolding code with kubebuilder:
(1) Prerequisites
- go version v1.20.0+
- docker version 17.03+
- kubectl version v1.11.3+
- Access to a Kubernetes v1.11.3+ cluster
(2) Install kubebuilder
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/
(3) Create the project
mkdir guestbook && cd guestbook
kubebuilder init --project-name guestbook --domain com --repo com/guestbook
- repo: the Go module name
- domain: the suffix of the custom CRD group
- project-name: the project name, used for label values, the name prefix in kustomization.yaml, and so on
(4) Create the API
kubebuilder create api --group webapp --version v1 --kind Guestbook
- group: the CRD group; combined with the domain above, the resulting apiVersion is webapp.com/v1
(5) Edit the CRD Go types
The generated Go types for the CRD live in api/v1/guestbook_types.go; adjust them to your needs, for example as sketched below.
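A hypothetical example: adding a Size field to the generated spec (the field and its meaning are invented for illustration; only GuestbookSpec is shown, the rest of the generated file stays as scaffolded):

package v1

// GuestbookSpec defines the desired state of Guestbook.
type GuestbookSpec struct {
	// Size is a hypothetical example field: the desired number of guestbook instances.
	Size int32 `json:"size,omitempty"`
}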
(6) Generate the CRD yaml
After every change to the CRD Go types, run make manifests to regenerate the CRD yaml file, which is written to config/crd/bases/webapp.com_guestbooks.yaml.
(7) Install the CRD into the k8s cluster
- Install the CRD: make install (a quick check follows below)
- Uninstall the CRD: make uninstall
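After make install, a quick sanity check that the CRD was registered (the CRD name is the plural resource plus the group):

kubectl get crds | grep guestbooks.webapp.com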
(8) Build and deploy the controller
- Build the image: make docker-build
- Push the image: make docker-push
- Deploy the controller: make deploy (see the Reconcile sketch below)
- Undeploy the controller: make undeploy
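What make deploy actually runs is the manager binary with your Reconcile method, which kubebuilder scaffolds in internal/controller/guestbook_controller.go (controllers/ in older versions). A minimal sketch of a filled-in Reconcile; the body is illustrative, and the import path follows the --repo flag used above:

package controller

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	logf "sigs.k8s.io/controller-runtime/pkg/log"

	webappv1 "com/guestbook/api/v1"
)

// GuestbookReconciler reconciles a Guestbook object.
type GuestbookReconciler struct {
	client.Client
}

// Reconcile is called for every change to a Guestbook object (and on resyncs).
func (r *GuestbookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := logf.FromContext(ctx)

	// Fetch the Guestbook that triggered this reconcile.
	var guestbook webappv1.Guestbook
	if err := r.Get(ctx, req.NamespacedName, &guestbook); err != nil {
		// Deleted in the meantime: nothing left to do.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Illustrative reconcile step: compare spec with cluster state and act on the diff.
	log.Info("reconciling", "name", guestbook.Name)
	return ctrl.Result{}, nil
}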
The project structure generated by kubebuilder looks like this:
The image build output looks like this: