编程技术分享

  • 首页
  1. 首页
  2. K8S
  3. 正文

K8S Controller开发

2024年7月22日 2176点热度 6人点赞 0条评论

1、背景

K8S在云原生领域已经成为事实性的任务编排调度标准,在实际开发过程中我们经常会接触到它(除非你仍采用原始的裸机部署应用),随着深入的使用,当K8S提供的原生能力无法满足需求时,就需要进行二次扩展,比如Controller、调度等,本文接下来会介绍Controller原理及如何扩展。

2、Controller原理

K8S采用声明式API方式定义各种资源的预期状态,然后通过一个Reconcile(调谐)过程不断修正状态以达到预期,这个过程就是通过Controller来完成。K8S提供了很多内置Controller,包括:Deployment Controller、ReplicaSet Controller、StatefulSet Controller、Job Controller、Service Controller等。整个Reconcile过程如下:

3、List Watch机制

通过上面介绍,Controller的关键是需要监控资源的状态及变化,K8S中采用List Watch机制来实现,List用于同步当前资源状态,Watch用于监听资源状态变化。可以通过以下方式来初步了解K8S的List Watch。
启动API Server代理:kubectl proxy --port 8080
监听default namespace中pod变化:curl http://localhost:8080/api/v1/namespaces/default/pods?watch=true
结果如下:
通过tcpdump抓包可以看到API Server是采用http chunked分块传输以及long-polling方式实现watch机制:

3.1 long-polling

看到上面,如果我们要开发像API Server一样具备watch机制的服务器要如何实现,具体代码如下:
package main

import (
        "net/http"
        "fmt"
        "time"
)

func poll(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Header().Set("Connection", "keep-alive")
        w.Header().Set("Cache-Control", "no-cache")

        flusher, ok := w.(http.Flusher)
        if !ok {
                http.Error(w, "Streaming error", http.StatusInternalServerError)
                return
        }
        for i:=0;;i++ {
                fmt.Fprintf(w, "data: %d\n", i)
                flusher.Flush()
                time.Sleep(1*time.Second)
        }
}

func main() {
        http.HandleFunc("/poll", poll)
        if err := http.ListenAndServe(":8080", nil); err != nil {
                fmt.Println("start server error:", err)
        }
}

3.2 http watch

通过前面curl请求API Server可以看到只需简单封装下API接口即可实现Controller需要的watch功能,具体代码如下:
package main

import (
        "encoding/json"
        "fmt"
        "net/http"
        "time"
)

const apiServer = "http://localhost:8080"

type Pod struct {
        Metadata struct {
                Name              string    `json:"name"`
                Namespace         string    `json:"namespace"`
                CreationTimestamp time.Time `json:"creationTimestamp"`
        } `json:"metadata"`
}

type Event struct {
        EventType string `json:"type"`
        Object    Pod    `json:"object"`
}

func main() {
        client := &http.Client{}
        req, err := http.NewRequest("GET", apiServer+"/api/v1/namespaces/default/pods?watch=true", nil)
        if err != nil {
                fmt.Println(err)
                return
        }
        resp, err := client.Do(req)
        if err != nil {
                fmt.Println(err)
                return
        }
        defer resp.Body.Close()

        var event Event
        decoder := json.NewDecoder(resp.Body)

        for {
                if err := decoder.Decode(&event); err != nil {
                        fmt.Println(err)
                        return
                }
                fmt.Printf("%s Pod %s \n", event.EventType, event.Object.Metadata.Name)
        }
}

3.3 client-go watch

为了简化HTTP调用,K8S提供了client-go库用于实现watch功能,具体代码如下:
package main

import (
        "context"
        "fmt"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/watch"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
)

func main() {
        // 创建Kubernetes API client
        config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if err != nil {
                fmt.Println(err)
                return
        }
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
                fmt.Println(err)
                return
        }

        // 监听pods变化
        watcher, err := clientset.CoreV1().Pods("argo").Watch(context.Background(), metav1.ListOptions{})
        if err != nil {
                fmt.Println(err)
                return
        }

        // 循环处理event
        for event := range watcher.ResultChan() {
                pod := event.Object.(*corev1.Pod)
                switch event.Type {
                case watch.Added:
                        fmt.Printf("Pod %s added\n", pod.Name)
                        // todo: 调谐逻辑
                case watch.Modified:
                        fmt.Printf("Pod %s modified\n", pod.Name)
                        // todo: 调谐逻辑
                case watch.Deleted:
                        fmt.Printf("Pod %s deleted\n", pod.Name)
                        // todo: 调谐逻辑
                }
        }
}

4、Informer机制

client-go库除了提供上面介绍的watch机制外,还提供了更高层级的Informer机制用于高效查询和监听资源变化,接下来介绍下Informer机制原理及使用。

4.1 Informer原理

client-go通过ListWatch方式封装了Http请求,并通过内部缓存机制减少API调用,使用Informer机制开发的自定义controller的执行流程如下:
主要执行流程如下:
  1. Reflector调用API Server接口拉取待处理资源对象的全量列表和监听变更事件,并将列表或变更事件添加到队列DeltaFIFO中;
  2. 另一协程定期从事件队列中获取消息,然后根据消息类型分发到自定义处理函数ResourceEventHandler,另一方面会将获取的消息进行缓存用于查询对象。

4.1.1 结构定义

主要结构定义如下:
//实现ListWatch
type Reflector struct {
    ...
    expectedTypereflect.Type //期望处理的类型,如v1.Pod
    storeStore //消息队列,用于存储watch获取到的变更对象,实际类型为DeltaFIFO
    listerWatcherListerWatcher //用于执行ListWatch机制的函数
    ...
}

//DeltaFIFO用于存储变更对象,是消费生产队列,Reflector是生产者,消费者调用Pop函数
type DeltaFIFO struct {
    ...
    items map[string]Deltas //所有变更对象,键为资源对象的key,默认为namespace+对象名,值为变更消息列表
    queue []string //按照FIFO顺序的资源对象key列表
    keyFunc KeyFunc //获取资源对象的key函数,默认为namespace+对象名
    knownObjects KeyListerGetter //对象缓存区Indexer,实例化cache类
    ...
}

//Indexer的具体实现类
type cache struct {
    cacheStorage ThreadSafeStore //线程安全缓存区,实例化threadSafeMap类
    keyFunc KeyFunc //获取资源对象的key函数
}

//ThreadSafeStore的具体实现类
type threadSafeMap struct {
    lock sync.RWMutex
    items map[string]interface{} //对象缓存区,键为对象key,值为对象
    index *storeIndex //
}
主要结构之间的引用关系如下:

4.1.2 源码分析

创建Informer、Indexer对象流程如下:
cache.NewIndexerInformer()
|---tools/cache/controller.go/NewIndexerInformer()
    |---tools/cache/store.go/NewIndexer() //创建对象缓存区indexer,实际实例化的是cache对象
    |---tools/cache/controller.go/newInformer() //创建controller对象
        |---tools/cache/delta_fifo.go/NewDeltaFIFOWithOptions() //创建消息队列DeltaFIFO对象
        |---tools/cache/controller.go/New() //实例化controller对象
Informer执行流程如下:
informer.Run() //启动informer运行
|---tools/cache/controller.go/controller.Run()
    |---tools/cache/reflector.go/NewReflectorWithOptions() //实例化reflector进行ListWatch处理
    |---tools/cache/reflector.go/Reflector.Run() //启动协程拉取和监听资源,并往DeltaFIFO队列中增加变更消息,相当于生产者
    |   |---tools/cache/reflector.go/Reflector.ListAndWatch()
    |       |---tools/cache/reflector.go/Reflector.list() //获取全量列表
    |       |   |---tools/cache/reflector.go/Reflector.syncWith() //将列表同步到DeltaFIFO队列和对象缓存区indexer
    |       |---tools/cache/reflector.go/Reflector.watch() //监听变更事件,事件类型:watch.Added、watch.Modified、watch.Deleted
    |           |---tools/cache/reflector.go/watchHandler() //事件处理函数,将对应消息加入DeltaFIFO队列
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Add()
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Update()
    |               |---tools/cache/delta_fifo.go/DeltaFIFO.Delete()
    |---tools/cache/controller.go/controller.processLoop() //循环处理DeltaFIFO消息队列,相当于消费者
        |---tools/cache/delta_fifo.go/DeltaFIFO.Pop() //从DeltaFIFO中pop元素进行处理
            |---tools/cache/controller.go/processDeltas() //处理变更消息,根据消息类型,更新缓存区数据及调用传入的外部处理函数

4.2 Informer使用

使用Informer实现controller控制default命名空间的pod:
package main

import (
    "fmt"
    "time"

    "k8s.io/klog/v2"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

// 自定义controller
type Controller struct {
    indexer  cache.Indexer
    queue    workqueue.RateLimitingInterface
    informer cache.Controller
}

// 新建controller
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
    return &Controller{
        informer: informer,
        indexer:  indexer,
        queue:    queue,
    }
}

// 处理队列中下一key
func (c *Controller) processNextItem() bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)
    //处理key
    err := c.syncToStdout(key.(string))
    //处理错误
    c.handleErr(err, key)
    return true
}

// 输出到stdout
func (c *Controller) syncToStdout(key string) error {
    //通过key查询缓冲区中资源对象
    obj, exists, err := c.indexer.GetByKey(key)
    if err != nil {
        klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }

    if !exists {
        fmt.Printf("Pod %s does not exist anymore\n", key)
    } else {
        fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
    }
    return nil
}

// 处理错误
func (c *Controller) handleErr(err error, key interface{}) {
    if err == nil {
        c.queue.Forget(key)
        return
    }
    //判断该key是否需要重试
    if c.queue.NumRequeues(key) < 5 {
        klog.Infof("Error syncing pod %v: %v", key, err)
        //重入队列
        c.queue.AddRateLimited(key)
        return
    }
    //达到最大重试次数,错误
    c.queue.Forget(key)
    runtime.HandleError(err)
    klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}

// 运行controller
func (c *Controller) Run(workers int, stopCh chan struct{}) {
    defer runtime.HandleCrash()
    defer c.queue.ShutDown()
    klog.Info("Starting Pod controller")
    //启动Informer ListWatch
    go c.informer.Run(stopCh)
    //等待Informer完成同步
    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }
    //启动controller多实例数
    for i := 0; i < workers; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    <-stopCh
    klog.Info("Stopping Pod controller")
}

func (c *Controller) runWorker() {
    for c.processNextItem() {
    }
}

func main() {
    //创建配置
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        klog.Fatal(err)
    }

    //创建clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatal(err)
    }

    //创建pod ListWatch
    podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

    //创建用户区消息队列
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    //创建indexer、informer对象,indexer用于缓存资源对象,informer用于监听对象变更消息
    indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(old interface{}, new interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    }, cache.Indexers{})

    controller := NewController(queue, indexer, informer)

    //启动controller
    stop := make(chan struct{})
    defer close(stop)
    go controller.Run(1, stop)

    select {}
}

5、自定义controller

client-go中只提供了内置资源类型相关的API接口,因此默认的Informer机制只适用于内置资源,针对自定义资源则需要通过code-generator工具生成对应的Informer需要的框架代码。
使用code-generator生成框架代码步骤:

(1)创建一个项目democontroller

mkdir democontroller && cd democontroller
go mod init democontroller

(2)定义CRD结构

创建CRD group为democontroller,版本为v1alpha1
mkdir -p pkg/apis/democontroller/v1alpha1 && cd pkg/apis/democontroller/v1alpha1
创建doc.go,为整个package中所有结构生成DeepCopy方法
// +k8s:deepcopy-gen=package
// +groupName=democontroller

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1 // import "demo-controller/pkg/apis/samplecontroller/v1alpha1"
创建types.go,定义CRD对应的go结构
package v1alpha1

import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Demo is a specification for a Demo resource
type Demo struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`

        Spec   DemoSpec   `json:"spec"`
        Status DemoStatus `json:"status"`
}

// DemoSpec is the spec for a Demo resource
type DemoSpec struct {
        DeploymentName string `json:"deploymentName"`
        Replicas       *int32 `json:"replicas"`
}

// DemoStatus is the status for a Demo resource
type DemoStatus struct {
        AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// DemoList is a list of Demo resources
type DemoList struct {
        metav1.TypeMeta `json:",inline"`
        metav1.ListMeta `json:"metadata"`

        Items []Demo `json:"items"`
}
创建register.go,将上面定义的go结构注册到Scheme
package v1alpha1

import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects
// 注册自己的自定义资源
var SchemeGroupVersion = schema.GroupVersion{Group: "democontroller", Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
        return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
        return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
        SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
        AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
        //注意,添加了Demo/Demolist 两个资源到scheme
        scheme.AddKnownTypes(SchemeGroupVersion,
                &Demo{},
                &DemoList{},
        )
        metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
        return nil
}
(3)创建代码生成脚本
创建脚本目录
mkdir hack && cd hack
创建boilerplate.go.txt,用于提供代码声明模板
/*
Copyright 2024 jemuelmiao

Licensed under the Apache License
*/
创建tools.go,引入依赖库
//go:build tools
// +build tools

// This package imports things required by build scripts, to force `go mod` to see them as dependencies
package tools

import _ "k8s.io/code-generator"
创建代码生成脚本update-codegen.sh
#!/usr/bin/env bash

set -o errexit
set -o nounset
set -o pipefail

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${SCRIPT_ROOT}/vendor/k8s.io/code-generator

source "${CODEGEN_PKG}/kube_codegen.sh"

THIS_PKG="democontroller"

kube::codegen::gen_helpers \
    --boilerplate "${SCRIPT_ROOT}/hack/boilerplate.go.txt" \
    "${SCRIPT_ROOT}/pkg/apis"

kube::codegen::gen_client \
    --with-watch \
    --output-dir "${SCRIPT_ROOT}/pkg/generated" \
    --output-pkg "${THIS_PKG}/pkg/generated" \
    "${SCRIPT_ROOT}/pkg/apis"
(4)生成框架代码
go mod tidy
go mod vendor
chmod -R 777 vendor
cd hack && chmod 755 update-codegen.sh && ./update-codegen.sh
框架代码生成前:
框架代码生成后:
(5)创建main.go验证
创建cmd/main.go
package main

import (
        "democontroller/pkg/generated/clientset/versioned"
        "democontroller/pkg/generated/clientset/versioned/typed/democontroller/v1alpha1"
        "democontroller/pkg/generated/informers/externalversions"
        "fmt"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/clientcmd"
        "time"
        "context"
)

func main() {
    	config, e := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
        if e != nil {
                panic(e.Error())
        }
    	//使用v1alpha1
        client, e := v1alpha1.NewForConfig(config)
        if e != nil {
                panic(e.Error())
        }
        demoList, e := client.Demos("test").List(context.TODO(), metav1.ListOptions{})
        fmt.Println(demoList, e)

    	//使用versioned
        clientset, e := versioned.NewForConfig(config)
        factory := externalversions.NewSharedInformerFactory(clientset, 30*time.Second)
        demo, e := factory.Democontroller().V1alpha1().Demos().Lister().Demos("test").Get("test")
        if e != nil {
                panic(e.Error())
        }
        fmt.Println(demo, e)
}
(6)编译运行
go mod tidy && go mod vendor
go build cmd/main.go

6、kubebuilder

上面通过code-generator生成自定义controller稍显繁琐,有些工具如kubebuilder可以很方便的生成框架脚手架代码,简化了自定义controller的开发。
使用kubebuilder生成脚手架代码步骤:
(1)前提
  • go version v1.20.0+
  • docker version 17.03+
  • kubectl version v1.11.3+
  • Access to a Kubernetes v1.11.3+ cluster
(2)安装kubebuilder
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/
(3)创建项目
mkdir guestbook && cd guestbook
kubebuilder init --project-name guestbook --domain com --repo com/guestbook
  • repo:go module名
  • domain:是自定义CRD group后缀
  • project-name:项目名,用于label名或kustomization.yaml中名称前缀等
(4)创建API
kubebuilder create api --group webapp --version v1 --kind Guestbook
  • group:CRD group,上面创建的apiVersion为webapp.com/v1
(5)修改CRD go结构
生成的CRD对应的go结构在文件api/v1/guestbook_types.go中,根据需要调整该结构内容
(6)生成CRD yaml
每次修改CRD go结构后,执行make manifests重新生成CRD yaml文件,该文件在config/crd/bases/webapp.com_guestbooks.yaml。
(7)安装CRD到k8s集群
  • 安装CRD:make install
  • 卸载CRD:make uninstall
(8)部署controller
  • 构建镜像:make docker-build
  • 推送镜像:make docker-push
  • 部署镜像:make deploy
  • 卸载镜像:make undeploy
kubebuilder生成的项目结构如下:
构建镜像如下:

 

标签: 暂无
最后更新:2024年7月25日

jemuel

这个人很懒,什么都没留下

点赞
下一篇 >
文章目录
  • 1、背景
  • 2、Controller原理
  • 3、List Watch机制
    • 3.1 long-polling
    • 3.2 http watch
    • 3.3 client-go watch
  • 4、Informer机制
    • 4.1 Informer原理
      • 4.1.1 结构定义
      • 4.1.2 源码分析
    • 4.2 Informer使用
  • 5、自定义controller
  • 6、kubebuilder
最新 热点 随机
最新 热点 随机
K8S源码分析系列2—远程调试K8S组件 Volcano源码分析系列—调度篇 K8S源码分析系列1—搭建K8S调试集群 K8S Controller开发 6.5840 Lab 1: MapReduce MongoDB源码分析系列1——编译环境搭建
K8S源码分析系列2—远程调试K8S组件
K8S源码分析系列2—远程调试K8S组件 6.5840 Lab 1: MapReduce K8S Controller开发 MongoDB源码分析系列1——编译环境搭建 GraphQL介绍及使用 Golang优先级调度

COPYRIGHT © 2021 www.miaozhouguang.com. ALL RIGHTS RESERVED.

THEME KRATOS MADE BY VTROIS

粤ICP备2022006024号

粤公网安备 44030602006568号