1. Background
Kubernetes has become the de facto standard for workload orchestration and scheduling in the cloud-native world, and most of us work with it daily (unless you still deploy applications directly on bare metal). As usage deepens, the built-in capabilities eventually stop being enough and you need to extend them, for example with custom controllers or schedulers. This article explains how controllers work and how to extend them.
2. How Controllers Work
Kubernetes uses a declarative API: you declare the desired state of each resource, and a Reconcile loop continuously corrects the actual state until it matches. That loop is what a controller implements. Kubernetes ships with many built-in controllers, including the Deployment Controller, ReplicaSet Controller, StatefulSet Controller, Job Controller, and Service Controller. The overall reconcile flow looks like this:
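As a minimal illustration of the pattern (this is not real controller-runtime code; the `state` type and the scale actions are hypothetical stand-ins), a reconcile loop compares desired and observed state and acts to close the gap:

```go
package main

import (
	"fmt"
	"time"
)

// state is a hypothetical stand-in for a resource's state, e.g. replica count.
type state struct{ replicas int }

// reconcile compares desired vs. observed state and takes one corrective step.
func reconcile(desired, observed *state) {
	switch {
	case observed.replicas < desired.replicas:
		observed.replicas++ // e.g. create a missing Pod
		fmt.Println("scale up ->", observed.replicas)
	case observed.replicas > desired.replicas:
		observed.replicas-- // e.g. delete an excess Pod
		fmt.Println("scale down ->", observed.replicas)
	default:
		fmt.Println("in sync")
	}
}

func main() {
	desired := &state{replicas: 3}
	observed := &state{replicas: 0}
	for i := 0; i < 5; i++ { // a real controller loops until stopped
		reconcile(desired, observed)
		time.Sleep(200 * time.Millisecond)
	}
}
```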
3. The List-Watch Mechanism
As described above, the key requirement for a controller is to observe resource state and its changes. Kubernetes implements this with the List-Watch mechanism: List synchronizes the current state of resources, and Watch subscribes to subsequent changes. You can get a first feel for List-Watch as follows.
Start an API Server proxy: `kubectl proxy --port 8080`
Watch Pod changes in the default namespace: `curl "http://localhost:8080/api/v1/namespaces/default/pods?watch=true"`
The response is an endless stream of JSON watch events, one per line; each event carries a type (ADDED, MODIFIED, DELETED) plus the full object.
Capturing the traffic with tcpdump shows that the API Server implements the watch with HTTP chunked transfer encoding, i.e. a long-polling style response that never completes.
3.1 long-polling
Given the above, how would we build a server with the same kind of watch capability as the API Server? A minimal implementation:
package main import ( "net/http" "fmt" "time" ) func poll(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Connection", "keep-alive") w.Header().Set("Cache-Control", "no-cache") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming error", http.StatusInternalServerError) return } for i:=0;;i++ { fmt.Fprintf(w, "data: %d\n", i) flusher.Flush() time.Sleep(1*time.Second) } } func main() { http.HandleFunc("/poll", poll) if err := http.ListenAndServe(":8080", nil); err != nil { fmt.Println("start server error:", err) } }
3.2 http watch
The earlier curl call against the API Server shows that a thin wrapper over the HTTP API is enough to give a controller the watch capability it needs:
package main import ( "encoding/json" "fmt" "net/http" "time" ) const apiServer = "http://localhost:8080" type Pod struct { Metadata struct { Name string `json:"name"` Namespace string `json:"namespace"` CreationTimestamp time.Time `json:"creationTimestamp"` } `json:"metadata"` } type Event struct { EventType string `json:"type"` Object Pod `json:"object"` } func main() { client := &http.Client{} req, err := http.NewRequest("GET", apiServer+"/api/v1/namespaces/default/pods?watch=true", nil) if err != nil { fmt.Println(err) return } resp, err := client.Do(req) if err != nil { fmt.Println(err) return } defer resp.Body.Close() var event Event decoder := json.NewDecoder(resp.Body) for { if err := decoder.Decode(&event); err != nil { fmt.Println(err) return } fmt.Printf("%s Pod %s \n", event.EventType, event.Object.Metadata.Name) } }
3.3 client-go watch
To avoid hand-rolling HTTP calls, Kubernetes provides the client-go library, which exposes watch directly:
package main import ( "context" "fmt" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) func main() { // 创建Kubernetes API client config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { fmt.Println(err) return } clientset, err := kubernetes.NewForConfig(config) if err != nil { fmt.Println(err) return } // 监听pods变化 watcher, err := clientset.CoreV1().Pods("argo").Watch(context.Background(), metav1.ListOptions{}) if err != nil { fmt.Println(err) return } // 循环处理event for event := range watcher.ResultChan() { pod := event.Object.(*corev1.Pod) switch event.Type { case watch.Added: fmt.Printf("Pod %s added\n", pod.Name) // todo: 调谐逻辑 case watch.Modified: fmt.Printf("Pod %s modified\n", pod.Name) // todo: 调谐逻辑 case watch.Deleted: fmt.Printf("Pod %s deleted\n", pod.Name) // todo: 调谐逻辑 } } }
4. The Informer Mechanism
Besides the raw watch shown above, client-go offers the higher-level Informer mechanism for efficiently querying and watching resources. This section covers how Informers work and how to use them.
4.1 How Informers Work
client-go wraps the HTTP requests in a ListWatch abstraction and adds an internal caching layer that cuts down on API calls. The main execution flow of a custom controller built on the Informer mechanism is as follows (a code sketch follows the list):
- The Reflector calls API Server endpoints to fetch the full list of the target resource and to watch for change events, pushing both into the DeltaFIFO queue;
- A separate goroutine continuously pops events from the queue, dispatches them by type to the user-supplied ResourceEventHandler, and also writes the objects into a local cache for later lookups.
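All of that plumbing is normally driven by just a few client-go calls. A minimal sketch using the SharedInformerFactory (the kubeconfig path `/root/.kube/config` matches the earlier examples; error handling is pared down):

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// The factory owns the Reflector/DeltaFIFO/Indexer plumbing described above;
	// 30s is the resync period at which cached objects are re-delivered.
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("added:", obj.(*corev1.Pod).Name) },
		UpdateFunc: func(old, new interface{}) { fmt.Println("updated:", new.(*corev1.Pod).Name) },
		DeleteFunc: func(obj interface{}) { fmt.Println("deleted") },
	})

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)            // starts the Reflector goroutines
	factory.WaitForCacheSync(stopCh) // waits for the initial List to land in the cache
	select {}
}
```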
4.1.1 Structure Definitions
The key structures are defined as follows:
```go
// Reflector implements the ListWatch logic.
type Reflector struct {
	...
	expectedType  reflect.Type  // the type to process, e.g. v1.Pod
	store         Store         // queue for changes obtained by watch; the concrete type is DeltaFIFO
	listerWatcher ListerWatcher // the functions that perform List and Watch
	...
}

// DeltaFIFO stores change events; it is a producer/consumer queue:
// the Reflector is the producer, consumers call Pop.
type DeltaFIFO struct {
	...
	items        map[string]Deltas // all changed objects, keyed by object key (namespace/name by default); value is the list of change events
	queue        []string          // object keys in FIFO order
	keyFunc      KeyFunc           // computes an object's key, namespace/name by default
	knownObjects KeyListerGetter   // the object cache (Indexer), instantiated as the cache type
	...
}

// cache is the concrete implementation of Indexer.
type cache struct {
	cacheStorage ThreadSafeStore // thread-safe store, instantiated as threadSafeMap
	keyFunc      KeyFunc         // computes an object's key
}

// threadSafeMap is the concrete implementation of ThreadSafeStore.
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{} // the object cache, keyed by object key
	index *storeIndex            // indexes over the cache
}
```
These structures reference each other as follows: the Reflector's store points at the DeltaFIFO; the DeltaFIFO's knownObjects points at the Indexer (the cache type), which in turn wraps the thread-safe threadSafeMap.
4.1.2 Source Code Walkthrough
Creating the Informer and Indexer objects:
```
cache.NewIndexerInformer()
|---tools/cache/controller.go/NewIndexerInformer()
    |---tools/cache/store.go/NewIndexer()       // create the object cache (indexer); the concrete type is cache
    |---tools/cache/controller.go/newInformer() // create the controller object
        |---tools/cache/delta_fifo.go/NewDeltaFIFOWithOptions() // create the DeltaFIFO event queue
        |---tools/cache/controller.go/New()     // instantiate the controller
```
The Informer's execution flow:
```
informer.Run() // start the informer
|---tools/cache/controller.go/controller.Run()
    |---tools/cache/reflector.go/NewReflectorWithOptions() // instantiate the Reflector that performs ListWatch
    |---tools/cache/reflector.go/Reflector.Run() // goroutine that lists/watches resources and feeds change events into DeltaFIFO (the producer)
    |   |---tools/cache/reflector.go/Reflector.ListAndWatch()
    |       |---tools/cache/reflector.go/Reflector.list() // fetch the full list
    |       |   |---tools/cache/reflector.go/Reflector.syncWith() // sync the list into the DeltaFIFO queue and the indexer cache
    |       |---tools/cache/reflector.go/Reflector.watch() // watch change events: watch.Added, watch.Modified, watch.Deleted
    |           |---tools/cache/reflector.go/watchHandler() // event handler; enqueues the corresponding message into DeltaFIFO
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Add()
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Update()
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Delete()
    |---tools/cache/controller.go/controller.processLoop() // loop that drains the DeltaFIFO queue (the consumer)
        |---tools/cache/delta_fifo.go/DeltaFIFO.Pop() // pop one element for processing
            |---tools/cache/controller.go/processDeltas() // by event type, update the cache and invoke the user-supplied handler
```
4.2 Using Informers
A controller built on an Informer that manages Pods in the default namespace:
package main import ( "fmt" "time" "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" ) // 自定义controller type Controller struct { indexer cache.Indexer queue workqueue.RateLimitingInterface informer cache.Controller } // 新建controller func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller { return &Controller{ informer: informer, indexer: indexer, queue: queue, } } // 处理队列中下一key func (c *Controller) processNextItem() bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) //处理key err := c.syncToStdout(key.(string)) //处理错误 c.handleErr(err, key) return true } // 输出到stdout func (c *Controller) syncToStdout(key string) error { //通过key查询缓冲区中资源对象 obj, exists, err := c.indexer.GetByKey(key) if err != nil { klog.Errorf("Fetching object with key %s from store failed with %v", key, err) return err } if !exists { fmt.Printf("Pod %s does not exist anymore\n", key) } else { fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName()) } return nil } // 处理错误 func (c *Controller) handleErr(err error, key interface{}) { if err == nil { c.queue.Forget(key) return } //判断该key是否需要重试 if c.queue.NumRequeues(key) < 5 { klog.Infof("Error syncing pod %v: %v", key, err) //重入队列 c.queue.AddRateLimited(key) return } //达到最大重试次数,错误 c.queue.Forget(key) runtime.HandleError(err) klog.Infof("Dropping pod %q out of the queue: %v", key, err) } // 运行controller func (c *Controller) Run(workers int, stopCh chan struct{}) { defer runtime.HandleCrash() defer c.queue.ShutDown() klog.Info("Starting Pod controller") //启动Informer ListWatch go c.informer.Run(stopCh) //等待Informer完成同步 if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } //启动controller多实例数 for i := 0; i < workers; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } <-stopCh klog.Info("Stopping Pod controller") } func (c *Controller) runWorker() { for c.processNextItem() { } } func main() { //创建配置 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { klog.Fatal(err) } //创建clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Fatal(err) } //创建pod ListWatch podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) //创建用户区消息队列 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) //创建indexer、informer对象,indexer用于缓存资源对象,informer用于监听对象变更消息 indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, UpdateFunc: func(old interface{}, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { queue.Add(key) } }, DeleteFunc: func(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, }, cache.Indexers{}) controller := NewController(queue, indexer, informer) //启动controller stop := make(chan struct{}) defer close(stop) go controller.Run(1, stop) select {} }
5. Custom Controllers
client-go only provides API interfaces for the built-in resource types, so the default Informer mechanism only works for built-in resources. For custom resources, the code-generator tool must be used to generate the scaffolding code that Informers require.
Steps to generate the scaffolding with code-generator:
(1) Create a project named democontroller
```bash
mkdir democontroller && cd democontroller
go mod init democontroller
```
(2) Define the CRD structures
Create the CRD group democontroller with version v1alpha1:
```bash
mkdir -p pkg/apis/democontroller/v1alpha1 && cd pkg/apis/democontroller/v1alpha1
```
Create doc.go, which asks the generator to produce DeepCopy methods for every structure in the package:
```go
// +k8s:deepcopy-gen=package
// +groupName=democontroller

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1 // import "democontroller/pkg/apis/democontroller/v1alpha1"
```
Create types.go, defining the Go structures for the CRD:
```go
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Demo is a specification for a Demo resource
type Demo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   DemoSpec   `json:"spec"`
	Status DemoStatus `json:"status"`
}

// DemoSpec is the spec for a Demo resource
type DemoSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}

// DemoStatus is the status for a Demo resource
type DemoStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DemoList is a list of Demo resources
type DemoList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`

	Items []Demo `json:"items"`
}
```
Create register.go, registering the structures above with the Scheme:
```go
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: "democontroller", Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	// Note: both the Demo and DemoList types are added to the scheme.
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Demo{},
		&DemoList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
```
(3) Create the code-generation scripts
Create the script directory:
```bash
mkdir hack && cd hack
```
Create boilerplate.go.txt, the license header template for generated code:
```
/*
Copyright 2024 jemuelmiao
Licensed under the Apache License
*/
```
Create tools.go to pull in the code-generator dependency:
```go
//go:build tools
// +build tools

// This package imports things required by build scripts, to force `go mod`
// to see them as dependencies.
package tools

import _ "k8s.io/code-generator"
```
Create the generation script update-codegen.sh:
```bash
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${SCRIPT_ROOT}/vendor/k8s.io/code-generator

source "${CODEGEN_PKG}/kube_codegen.sh"

THIS_PKG="democontroller"

# Generate helpers such as zz_generated.deepcopy.go.
kube::codegen::gen_helpers \
    --boilerplate "${SCRIPT_ROOT}/hack/boilerplate.go.txt" \
    "${SCRIPT_ROOT}/pkg/apis"

# Generate the typed clientset, listers, and informers under pkg/generated.
kube::codegen::gen_client \
    --with-watch \
    --output-dir "${SCRIPT_ROOT}/pkg/generated" \
    --output-pkg "${THIS_PKG}/pkg/generated" \
    "${SCRIPT_ROOT}/pkg/apis"
```
(4) Generate the scaffolding
```bash
go mod tidy
go mod vendor
chmod -R 777 vendor
cd hack && chmod 755 update-codegen.sh && ./update-codegen.sh
```
Before running the script, the package tree contains only the hand-written files created above. After it completes, the deepcopy helpers and the pkg/generated client code appear.
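As a rough sketch of the result (the exact layout depends on the code-generator version), the generated artifacts land like this:

```
democontroller/
├── hack/
│   ├── boilerplate.go.txt
│   ├── tools.go
│   └── update-codegen.sh
└── pkg/
    ├── apis/democontroller/v1alpha1/
    │   ├── doc.go
    │   ├── types.go
    │   ├── register.go
    │   └── zz_generated.deepcopy.go         # generated DeepCopy methods
    └── generated/
        ├── clientset/versioned/             # typed clientset for Demo
        ├── informers/externalversions/      # generated Demo informers
        └── listers/democontroller/v1alpha1/ # generated Demo listers
```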
(5) Create main.go to verify
Create cmd/main.go:
package main import ( "democontroller/pkg/generated/clientset/versioned" "democontroller/pkg/generated/clientset/versioned/typed/democontroller/v1alpha1" "democontroller/pkg/generated/informers/externalversions" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" "time" "context" ) func main() { config, e := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if e != nil { panic(e.Error()) } //使用v1alpha1 client, e := v1alpha1.NewForConfig(config) if e != nil { panic(e.Error()) } demoList, e := client.Demos("test").List(context.TODO(), metav1.ListOptions{}) fmt.Println(demoList, e) //使用versioned clientset, e := versioned.NewForConfig(config) factory := externalversions.NewSharedInformerFactory(clientset, 30*time.Second) demo, e := factory.Democontroller().V1alpha1().Demos().Lister().Demos("test").Get("test") if e != nil { panic(e.Error()) } fmt.Println(demo, e) }
(6) Build and run
```bash
go mod tidy && go mod vendor
go build cmd/main.go
```
6. kubebuilder
Building a custom controller with code-generator as above is fairly laborious. Tools such as kubebuilder generate the full project scaffolding for you, greatly simplifying custom controller development.
Steps to scaffold a project with kubebuilder:
(1) Prerequisites
- go version v1.20.0+
- docker version 17.03+
- kubectl version v1.11.3+
- Access to a Kubernetes v1.11.3+ cluster
(2) Install kubebuilder
```bash
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/
```
(3) Create the project
```bash
mkdir guestbook && cd guestbook
kubebuilder init --project-name guestbook --domain com --repo com/guestbook
```
- repo: the Go module name
- domain: the suffix for custom CRD groups
- project-name: the project name, used in labels and as the name prefix in kustomization.yaml
(4) Create an API
```bash
kubebuilder create api --group webapp --version v1 --kind Guestbook
```
- group: the CRD group; combined with the domain above, the apiVersion created here becomes webapp.com/v1
(5) Edit the CRD Go structures
The generated Go structures for the CRD live in api/v1/guestbook_types.go; adjust them as needed, as in the sketch below.
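For instance, a minimal sketch of extending the generated spec (the Size field is hypothetical, added for this example; Foo is the placeholder kubebuilder scaffolds by default):

```go
// GuestbookSpec defines the desired state of Guestbook.
type GuestbookSpec struct {
	// Foo is the placeholder field kubebuilder scaffolds by default.
	Foo string `json:"foo,omitempty"`

	// Size is a hypothetical field added for this example; the marker below
	// becomes an OpenAPI validation rule in the generated CRD YAML.
	// +kubebuilder:validation:Minimum=1
	Size int32 `json:"size,omitempty"`
}
```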
(6) Generate the CRD YAML
After every change to the CRD Go structures, run make manifests to regenerate the CRD YAML, which lands in config/crd/bases/webapp.com_guestbooks.yaml.
(7) Install the CRD into the cluster
- Install the CRD: make install
- Uninstall the CRD: make uninstall
(8) Deploy the controller
- Build the image: make docker-build
- Push the image: make docker-push
- Deploy the controller: make deploy
- Remove the deployment: make undeploy
kubebuilder generates a project structure roughly like the sketch below.
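A rough sketch of the layout for recent kubebuilder versions (details vary by version):

```
guestbook/
├── Dockerfile
├── Makefile
├── PROJECT
├── cmd/main.go                  # manager entrypoint
├── api/v1/
│   ├── guestbook_types.go       # CRD Go types
│   └── zz_generated.deepcopy.go
├── internal/controller/
│   └── guestbook_controller.go  # Reconcile() lives here
└── config/                      # kustomize manifests (CRD, RBAC, manager)
```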
The controller image is built from the generated Dockerfile via make docker-build, and is what make deploy then runs inside the cluster.