K8s Informer原理解析,快速了解informer机制

k8s是典型的server-client架构。etcd存储集群的数据信息,apiserver作为统一的操作入口,任何对数据的操作都必须经过apiserver。

客户端通过ListAndWatch机制查询apiserver,而informer版本号规则模块则封装了List-watch。

《kubernetes源码剖析》一书中的informer机制架构图:

整个架构大体分为以下几个部分:

Index

tools/cache/thread_safe_store.go中,定义了实现了线程安全的存储接口ThreadSafeStore:

type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error) //传入indexName和obj,返回所有和obj有相同index key的obj
IndexKeys(indexName, indexKey string) ([]string, error) // 传入indexName和index key,返回index key指定的所有obj key
ListIndexFuncValues(name string) []string //获取indexName对应的index内的所有index key
ByIndex(indexName, indexKey string) ([]interface{}, error) //和IndexKeys方法类似,只是返回的是index key指定的所有obj
GetIndexers() Indexers //返回目前所有的indexers
AddIndexers(newIndexers Indexers) error //存储数据前调用,添加indexer
Resync() error // Resync is a no-op and is deprecated
}

结构体threadSafeMap将版本号英文缩写资源对象数据存储于一个内存中的map数据结构中:

type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{} //实际存所有资源对象的地方
indexers Indexers
indices Indices
}

每次的增、删、改、查操作数据恢复软件免费版都会都会加锁,以保证数据的一致性。k8s.io/client-go/tools/cache/store.go中,定义了存储接口Store其他综合收益

type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}

在tools/cache/index.gose短视频5线路线路中,定义了Indexer接口:

type Indexer interface {
Store // 继承接口Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}

还定义了一些数据结构:

type IndexFunc func(obj interface{}) ([]string, error) // 计算资源对象的index key的函数类型,值得注意的是,返回的是多个index key组成的列表
type Indexers map[string]IndexFunc // 计算索引的方法不止一个,通过给他们命名来加以区别,存储索引名(name)与索引方法(IndexFunc)的映射
type Indices map[string]Index // 索引名(name)与索引(index)的映射
type Index map[string]sets.String // 索引键(index key)与值(Obj Key)列表的映射

它们间的关系如图所示:


                                            K8s Informer原理解析,快速了解informer机制


                                            K8s Informer原理解析,快速了解informer机制

具体实现在tools/cache/store.go中的cache结构体:

type cache struct {   
cacheStorage ThreadSafeStore //cacheStorage是一个ThreadSafeStore类型的对象,实际使用的是threadSafeMap类型
keyFunc KeyFunc //用于计算资源对象的index key
}
type KeyFunc func(obj interface{}) (string, error)

cache结构体封装了threadSafeMap的很多方法,对外提供了Add、Update、Delete等方法;Indexer版本号命名规则接口中规定需色多多app下载安装华为市场要实现的那些方法都是调用的threadSafeMap的实现

通过c色多多荔枝榴莲向日葵ache版本号英文缩写.NewIndexer(keyFunc, Indexers)初始化Indexer对象数据

  1. keyFunc:k8s内部目前使用的自定义的indexFunc有PodPVCIndexFunc 、indexByPodNodeName 、MetaNamespaceIndexFunc
  2. 默认使用MetaNamespaceKeyFunc:根据资源对象计色多多app下载安装华为市场算出<namespace&版本号英文gt;/<name>格式的key,如果资源对象的<namespace>为空,则<nasedo个链接域名me>作为key
  3. Indexers:通过NewThreadSafeStore(indexers, Indices{})得到结构体内的cacheStorage

示例:

// 定义一个IndexFunc,功能为:根据Annotations的users字段返回index key
func UsersIndexFunc(obj interface{}) ([]string, error){
pod := obj.(*v1.Pod)
usersString := pod.Annotations["users"]
return strings.Split(usersString, ","), nil
}

func main() {
index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"byUser":UsersIndexFunc})

pod1 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"one",Annotations:map[string]string{"users":"ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"two",Annotations:map[string]string{"users":"bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta:metav1.ObjectMeta{Name:"three",Annotations:map[string]string{"users":"ernie,elmo"}}}

//添加3个Pod资源对象
index.Add(pod1)
index.Add(pod2)
index.Add(pod3)

//通过index.ByIndex函数(通过执行索引器函数得到索引结果)查询byUser索引器下匹配ernie字段的Pod列表
erniePods, err := index.ByIndex("byUser","ernie")
if err != nil{
panic(err)
}

for _, erniePods := range erniePods{
fmt.Println(erniePods.(*v1.Pod).Name)
}
}

DeltaFIFO其他应付款

tools/cache/delta_fifo.go中定义了Del数据分析师taFIFO。Delta代表变化, FIFO则是先入先出的队列。

DeltaFI版本号怎么看FO将接受来的资源数据废土event版本号规则,转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的数据库本地缓存。

DeltaType是string的别版本号是什么意思名,代表一种变化:

type DeltaType string

类型定义:

const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = “Replaced” // 替换,list出错时,会触发relist,此时会替换
Sync DeltaType = “Sync” // 周期性的同步,底层会当作一个update类型处理
)

Delta由变化类型+资源对象组成:

type Delta struct {
Type DeltaType
Object interface{}
}

Deltas是[]delta切片:

type Deltas []Delta

DeltaFIFO的定义:

type DeltaFIFO struct {
lock sync.RWMutex //读写锁
cond sync.Cond //条件变量
items map[string]Deltas //通过map数据结构的方式存储,value存储的是对象的Deltas数组
queue []string //存储资源对象的key,该key通过KeyOf(obj)函数计算得到
populated bool //通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
initialPopulationCount int //通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
keyFunc KeyFunc
knownObjects KeyListerGetter //indexer
closed bool
closedLock sync.Mutex
emitDeltaTypeReplaced bool
}


                                            K8s Informer原理解析,快速了解informer机制

向队列里添加元素:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj  interface{}) error {
id, err := f.KeyOf(obj) //获取obj key
if err != nil {
return KeyError{obj, err}
}
//向items中添加delta,并对操作进行去重,目前来看,只有连续两次操作都是删除操作的情况下,才可以合并,其他操作不会合并
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
//向queue和items中添加元素,添加以后,条件变量发出消息,通知可能正在阻塞的POP方法有事件进队列了
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// 冗余判断,其实是不会走到这个分支的,去重后的delta list长度怎么也不可能小于1
delete(f.items, id)
}
return nil
}

从队列里Pop元素:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{},  error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 { // 如果队列是空的,利用条件变量阻塞住,直到有新的delta
if f.IsClosed() { // 如果Close()被调用,则退出
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
// 如果处理失败了,调用addIfNotPresent:如果queue中没有则添加。本身刚刚从queue和items中取出对象,应该不会存在重复的对象,这里调用addIfNotPresent应该只是为了保险起见
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

Reflector

tools/cache/ref其他货币资金lector.go中定义了Reflector:

type Reflector struct {
name string
expectedTypeName string //被监控的资源的类型名
expectedType reflect.Type // 监控的对象类型
expectedGVK *schema.GroupVersionKind
store Store // 存储,就是Delta_FIFO,这里的Store类型实际是Delta_FIFO的父类
listerWatcher ListerWatcher // 用来进行list&watch的接口对象
backoffManager wait.BackoffManager
resyncPeriod time.Duration //重新同步的周期
ShouldResync func() bool //周期性的判断是否需要重新同步
clock clock.Clock //时钟对象,主要是为了给测试留后门,方便修改时间
……
}

同一类资源Inform数据分析er共享一个Reflector。Reflector通过ListAndWatch函数来se多多破解版修改教程Li版本号21h1什么意思stAndWatch apiserver来获取资源的数据。

其他综合收益取时需要基于版本号命名规则Resource系统运维工资一般多少Version(Etcd生成的全局唯一且递增的资源版本号色的拼音)。通过此序号,客户端可以知道目前与服务端信息同步的状态,每次只取大于等于本地序号的事件。好处是可以实现事件的全局唯一,实现”断点续传“功能色调,不用担心本地客户端偶尔出现的网络异常

ListAnd其他货币资金包括哪些内容watch是k8s统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性、性能等,为声明式风格的API奠定了良好的基础数据库,是k8s架构的精髓。

​List在Controller重启或Watch中断的情况下,调用资源的list API罗列资源对象以进行系统运维工程师全量更新,基于HTTP短链接实系统/运维

(1)r.listerWatcher.List用于获取资源下的所有对象的数据,例如,获取所有Pod的资源数据。获取资源数据是由opti其他ons的ResourceVersion控制的。如果ResourceVersi数据恢复on为0,则表示获取所有Pod的资源数据;如果Resourc系统运维工程师eVersion非0,则数据分析表示根据资源版本号继续获取。

(2)listMetaInterface.Get版本号规则Reso其他综合收益urceVersion用于获取资源版本号。

(3)meta.ExtractList用于数据漫游将资源数据(runtime.Object对象)转换成资源对象列表([]runtime系统运维工程师.Object对象)。

因为r.listerWatcher.List获取的是资源下的所有对象的数据,例如所有的Pod资源数据,所以它是一个资源列表。

(4)r.sync版本号规则With用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并会替换已存在的对象。

(5)r.setLastSyncResourceVersion用于设置最新的资源版本号。

Wa其他应收款是什么科目tch则在多次List之间进行,调用资源的watch API,基于当前的资源版本号监听资源变更(如Added、Updated、Deleted)事件。

通过在Http请求中带上watch=true,表示采用Http长连接持续监听apiserver发来的资源变更事件。

apiserver在response的HTTP Header中设置T数据透视表ransfer-Encoding的值为chunked,表示采用分块传输编码。每当有事件来数据恢复临,返回一个WatchEvent

​Reflector在获取新的资源数据后,调用的Add方法将资源对象的Delta记录存放到本地缓存DeltaFIFO中。

Controller其他

在tool/cache/色多多荔枝榴莲向日葵cont版本号规则roller.go中定义了Controller接口:

type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string}

controller结构体实现了此接口:

type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock}

config结构体中是所有配置:

type Config struct {
Queue //DeltaFIFO
ListerWatcher
Process ProcessFunc //从DeltaFIFO Pop调用时,调用的回调
ObjectType runtime.Object //期待处理的资源对象的类型
FullResyncPeriod time.Duration //全量resync的周期
ShouldResync ShouldResyncFunc //delta fifo周期性同步判断时使用
RetryOnError bool}

Con数据恢复软件免费版troller的processLoop方法会不断地调用的Pop方法从Delta队列中消费弹出delta记录(队列中没有数据时阻塞等待数据):

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}}

Pop方法须传入Process函数——用于接收并处理版本号英文缩写对象的回调方法,默认的Process函数是Informer模块中的HandleDeltas

informer

Kubernetes的其他组件版本号规则都是通过client-go的Informer机制与Kubernetes API Server进行通信的。

Informer版本号也被称为Shared Informer,它是可以共享使用的。

在clientgo的informer/fac版本号英文tory.go中,有接口定义:

type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
// 所有已知资源的shared informer
Admissionregistration() admissionregistration.Interface
Apps() apps.Interface
Auditregistration() auditregistration.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Settings() settings.Interface
Storage() storage.Interface}

sharedInse短视频5线路线路formerFactory结构体实现了此接其他应收款口:

type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool //用于追踪哪种informer被启动了,避免同一资源的Informer被实例化多次,运行过多相同的ListAndWatch}

新建一个sharedInf系统运维工资一般多少ormerFactory结构体:

sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)

第1个参数是用于与Kubernetes API Server交互的客户端;第2个参数用于设置多久进行一次resync(周期性的List操作),如果该参数为0,则禁用resync功能。

sharedInformerFactory结构体实现了所有已知资源的share其他d informer,例如在clientgo的informer/core/vi/pod.go中,定义了如下接口:

type PodInformer interface{
  Informer() cache.SharedIndexInformer
  Listen() v1.PodLister}

podInform数据漫游是什么意思er结构体实现了Informer其他应收款是什么科目方法和Listen方法:

func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) // 如果已经存在同类型的资源Informer,则返回当前Informer,不再继续添加}
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())}

通过调用sharedInform系统/运维ers.Core().V1().Pods()获得podInformer结构体

得到具体Pod资源的informer对象:

informer := sharedInformers.Core().V1().Pods().Informer()

最终获得的,是clientgo/其他垃圾tool/cache/s其他垃圾hared_informer.go中的sharedIndexInformer结构体,它实现的接口为:

type SharedIndexInformer interface {
SharedInformer
AddIndexers(indexers Indexers) error //启动informer前为其添加indexers
GetIndexer() Indexer}

它的定义为:

type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex}

通过informer.AddEventHan其他垃圾dler函数可以为资源对象添加资源事件回调方法,支持3种资源事件回调方法:AddFunc、UpdateFunc、DeleteFunc

sharedIndexInformer结构体定义了HandleDeltas函数,作为process回调函数(通过Confi版本号g结构体传给controller)

sharedIndexInformer结构体定义了HandleDeltas函数,作为process回调函数(通过Config结构体传给controller)

数据资源对象的操作类型为Added、Updated、Deleted时,会将系统/运维该资源对象色多多app下载安装华为市场存储至Indexer,并通过distribu版本号20h2te函数将资源对象分发至用户自定义的事件处理函数(通过informer.AddEventHandler添加)中

通过informer.Run数据漫游(stopCH)运行该informer,它是一个持久化的goroutine,通过clientset对象与apiserver交互。

它会启动controller,启动时传入的Config结构体包含了其他应付款

sto系统运维工程师pCH用于在程序进程退出前通系统运维工资一般多少知Informer退出

调用Pod的Informer的示例:

stopCH := make(chan struct{})
defer close(stopCH)
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
informer := sharedInformers.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ //为Pod资源添加资源事件回调方法
AddFunc: func(obj interface{}){
mObj := obj.(v1.Object)
log.Print("创建新Pod:",mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}){
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Print(oObj.GetName(),",",nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj :=obj.(v1.Object)
log.Print("删除旧Pod:",mObj.GetName())
},})
informer.Run(stopCH)

Work Queue(选用)

在开发并行程序时,需要频繁的进行数据同步,本身golang拥有channel 机制,但不能满足一些复杂场色的笔顺景的需求。例如:延时队列、限速队列。

client-其他应收款是什么科目go中提供了多种队列以供选择,可以胜任更多的场景。工作队列会对存储的对象进行去重,从而避免多个woker 处理同一个资源的情况。

用户可以在回调函数里,将资源对象推送到WorkQueue(或其他队列)中,也可以直接处理。

参考资料:

[1]​​https://kubernetes.io/d其他应收款ocs/home/​​

[2]​​https://edu.aliyun.com/roadmap/cloudnative​​

[3] 郑东旭《Kubernetes版本号规则源码剖析》

代码示例:通过informer采集event并存入ES

package main

import (
"bytes"
"context"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"math/rand"
"strconv"
"time")

func mustSuccess(err error) {
if err != nil {
panic(err)
}}

func main() {
rand.Seed(time.Now().UnixNano())
config, err := clientcmd.BuildConfigFromFlags("", "/Users/qiulingwei/Projects/kube-goclient/kubeconfig")
mustSuccess(err)

clientset, err := kubernetes.NewForConfig(config)
mustSuccess(err)
sharedInformers := informers.NewSharedInformerFactory(clientset, 0)
stopChan := make(chan struct{})
defer close(stopChan)

eventInformer := sharedInformers.Events().V1beta1().Events().Informer()
addChan := make(chan v1beta1.Event)
deleteChan := make(chan v1beta1.Event)
eventInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
unstructObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
mustSuccess(err)
event := &v1beta1.Event{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj, event)
mustSuccess(err)
addChan <- *event
},
UpdateFunc: func(oldObj, newObj interface{}) {
},
DeleteFunc: func(obj interface{}) {
},
}, 0)

go func() {
for {
select {
case event := <- addChan:
str, err := json.Marshal(&event)
mustSuccess(err)
esinsert(str)
break
case <-deleteChan:
break
}
}
}()
eventInformer.Run(stopChan)}

func esinsert(str []byte){
cfg := elasticsearch.Config{
Addresses: []string{
"xxxxxx",
"xxxxx",
"xxxxx",
},
Username: "xxx",
Password: "xxxxxx",
}
es, _ := elasticsearch.NewClient(cfg)
req := esapi.CreateRequest{
Index: "qlw-index",
DocumentType: "_doc",
DocumentID: strconv.FormatInt(time.Now().Unix(),10) + strconv.Itoa(rand.Int()),
Body: bytes.NewReader(str),
}
res, err := req.Do(context.Background(), es)
defer res.Body.Close()
if err!=nil {
fmt.Println(res.String())
}}

更多内容请扫码关注


                                            K8s Informer原理解析,快速了解informer机制