编写一个自定义K8s Controller

在 K8s 中当我们需要监控某个资源的变化并作一系列操作时,使用 K8s 提供的 controller 机制来实现,同时 K8s 官方提供了一个通用 client-go,通过它可以很容易实现自定义controller.

Client-go & controller 架构

在编写controlle1 z }r 之前,我们需要了解 client-go 对资源监控的整个架构和流程,并需要知道我们所需要自定义的是哪部分组件。

在 client-go 中包含了编写自定义 controller 可以使用的各种机制,这些机制在k8s.io/client-go/tools k8s.io/client-go/util中定义。

编写一个自定义K8s Controller

如图所示,图中包含了 client( X ( } m-go 和 controller 整$ B , D d v个交互流程。

client-go

在client-go 中主要包含% U ~了以下组件:

  • Reflector:通过调用 LISTWATCH) p v B 接口与 ApiServerj q V a 通信,监听特定资0 - ` K源(此处监听的资源需要我们指定),并把资源的更新动态存入 Delta FIFO 队列
  • Informer:从Delta FIFO队列拿出对象8 @ ; S - o Y b,完成此操作的函数是processLoop。
  • Indexer: 提供线程级别安全来存储对象和key。

custom con, ] 1trollL | K I ver

  • Informer reference: Informer对象引用
  • Indexer refU K : cer1 y / c { Wence: Ik * , Y Pndexer对象引用
  • Resource Event Handlers: 被Informer调用的回调函数,这些函数的作用通常是获取对象的key,并把key放入Work qu2 8 9 D ) 8 Y [eue,以进一步做处理。
  • Work queue: 工作队列,用于将对象的交付与其处理分离,编写Resource event h2 f t 9 n ] Zandler functions以提取传递的对象的ke3 5 T vy并将其添加到工作队列。此处可以过滤掉我们不关心的信息。
  • Process Item: 用于处理Work queue中的对象,可f g ,d * o ~ N T有一个或多个其他函数一起处理;这些函数通常使用Indexer reference或Listing wrapper来检索与该键对应的对象。这里就是我们需要自定义的业务逻辑

Sample Controller

这里编写一个简易的 Controller, 用于q c 8 ,监听 pod 创y K 8 Q .建、删除信息,并将信息打印出来。

Controller 逻辑

首先我们需要定义一个这样的s 3 P O V U ! Controller 结构体

typeE q H m d k 8 Y Control$ x & ) uler struct {
indexer  cache.Indexer  // Indexer 的引用
queue    workqueI n ^ $ M W G . =ue.RateLimitingInterface  //workr O ] !queue 的引用
informer cache.Controller // Informer 的引用
}

定义 Controller 的工作流

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()R b 1
k3 , q v j Vlog.Info("Starting pod controller")
go c.informer.Run(stopCh)   // 启动 informer
iY Y & T j h Qf !cw ^ o a W mache.W $ P Z P A )aitForCacheSync(stopCh, c.informerv L c K.HasSynced) {
runtime.HandleError(fmt.Errorf("Time out waitng for caches to sync"))
return
}
// 启动多个 wV  u 4 R lorker 处理 workquet E L 7 k } Xue 中的对象
for i := 0; i <~ N i 7 threadiness; i++ {
go wait.Until(c.runWorker, tv i Wime.Second, stopCh)
}
<-stopCh
klog.Info("Stopping PoX ] f k ?d controller")
}

具体处理 worker queue 中对象的流程

func (c9 E ] Z k I ] *Controller) runWorker() {
// 启动无限循环,接收并处理消息
for c.processNextItem() {
}
}
// 从 workqueue 中获取对象,并打印- ! ] N I C k信息。
func (c *Controller) processNextItem() bool {
key, shk a 1 c 2 ; P ] Mutdown6 f A G # := cp ? M 7 b F 4 +.queue.Get()7 w l q + 6
// 退出
if shutdown {
return false
}
// 标记此key已经处理
defer c.queue.Done(key)
// 将key对应的 object 的信息进行打印
err := c.y d e | # ?syncToStdout(key.(str= L : wing))
c.handleError(err, key)
return true
}
// 获取 key 对应的 object,并打印相关信息
func (c *Controller) syncToStg o J odout(key string) error {
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Erz V U _ ; Drorf("Fetching object with key %s from store failed with %v", keV J ( Q v t U ^ }y, err)
return err
}
if !exists {
fmt.Printf("Pod %s does not eS f i ( l _ ^ rxist")
} else {% N O
fmt.Printf("Sync/Add/Updatt S # Q ne for Pod %s\n", obj.(*core_vf u v p  $ }1.Pod).GetName())
}
return nil
}

Main 函数逻辑

func main() {
var kubeconfig sV . /tring
var master string
// 从外部获取集群信息(kube.config)
flag.StringVar(&kubec^ Y 3 S d nonfig, "kubeconfig", "", "kubeconfig file")
// 获取集群master 的url
flag.Strid f 3 } ongVar(&master, "master", "", "master url")
// 读取构建 config
config, err := clientcmd.Builde | UConfigFromFlag~ o G u Ms(masterC ; ; T e Q, kube[ v zconfig)
if err != nil {
klog.Fatal(err)
}
// 创建 k8s# : 1 _ o x @ { * client
clientset, err := kubernetes.NewForConfig(config)N z c h f
if err != nila K 7 0 {
klog.Fatal(err)
}
// 指定 ListWatcher 在所有namespace下监听 pod 资源
podListWatcher := cache.NewListWatchFromClient(clientset.p C : J ACoreV1().RESTClient(), "- g ! R I d gpods", v1.Namesx D E 0 7 D e epaceAll, fields.Everything())
// 创建 workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 创建 indexer 和 informer
indexer, inform^ V r  xer := cache.{ K  } E 5 ,NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventH=  zandlerFuncs{
// 当有 pod 创建时,根据 Delta queue 弹出的 object 生成对应的Key,并加入到 workqueue中。此处可以根据ObZ f b W n M ^ e Gject的一些属性,进行过滤
AddFunc: func(obj interface{}) {
key, err := cache.MeX C X L L FtaNamespa( j e L z m 7 .ceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
// pod 删除操作
DR Z B 1 5eleteFunc: func(obj interface{}) {
// DeletionHandlingMetaNamespaceKeyFunc 会在生成key 之前检查。因为资源删除后有可能会进行重建等操作,监听时错过了删除信息,从而导致该条记录是陈旧的。
key, err :* 2 ; U c 5= cache.DeletionHandlingMetaNamespaceKeyFun: k V  + $c(obj)
if err == ni_ ~ +l {
qx n k , B E { -ueue.Add(key)
}
},
}, cache.Indexers{})
controlle! 8 jrs q 1 p 7 0 H := pkg.NewController(queue, indexer, info| 8 ~ ! c 2 hrmer)
stop := make(chan struct{})
defer close(stop)` K - T x } h 
// 启动 controlled C n W { q mr
go% o O 9 | R Q controller.Run(1, stop)
select {}
}

后记

Controller 的整体流程综上,如果我们使用 CRD ,Contr7 C m h G 5 koller 流程也不会变化很多,更多的是需要对CRD 的监控,并根据变化创建对应的 DeploymeF e q Vnt 或 Stat; 1 $ x u jefulSet。同时 CRD 也需要增} k C , z v z X加对应的 Validation 和 Spec 的解析。x k M - f (这一部分我们下一篇继续。

To Be Continue...

如果喜欢,请关注我的公众号,或者查看我的博客 http://packyzbq.gitee.io. 我会不定时的发送我自己的学习记录,大家互相学习交流哈~

编写一个自定义K8s Controller