Understanding How the Kubernetes APIServer Works in One Article

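Preface

The entire Kubernetes technology stack is built from declarative APIs and controllers. kube-apiserver is the server for Kubernetes' declarative API and acts as the bridge through which the other components interact. A solid understanding of kube-apiserver is therefore essential.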

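Overall Component Functions

kube-apiserver is the only entry point through which the Kubernetes cluster operates on etcd. It handles authentication and authorization, validation, and CRUD operations for all Kubernetes resources, and exposes RESTful APIs for the other components to call.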

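kube-apiserver contains three API servers:

  • aggregatorServer: handles requests for APIService resources under the apiregistration.k8s.io group, and intercepts user requests and forwards them to aggregated servers (AA).
  • kubeAPIServer: performs the generic request handling, including authentication, authorization, and the REST services for the built-in resources (pod, deployment, service, etc.).
  • apiExtensionsServer: registers the apiResources and apiVersions of CustomResourceDefinitions (CRDs) and handles REST requests for CRDs and their CustomResources (CRs), returning 404 when the corresponding CR cannot be handled. It is also the last link in the apiserver delegation chain.

In addition there is the bootstrap-controller, which is mainly responsible for creating and managing the Kubernetes default apiserver service.

The following sections give an overview of each of these components.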
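
The delegation order described above (aggregatorServer, then kubeAPIServer, then apiExtensionsServer, with a 404 at the end) can be pictured as a chain of HTTP handlers. The following is a minimal sketch using only the Go standard library. The type delegatingServer, the helpers newChain and serve, and the path prefixes are hypothetical names chosen for illustration; they are not the actual kube-apiserver types.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// delegatingServer is a hypothetical stand-in for one link of the chain:
// it serves the path prefixes it owns and hands everything else to next.
type delegatingServer struct {
	name     string
	prefixes []string
	next     http.Handler
}

func (s *delegatingServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			fmt.Fprintf(w, "handled by %s", s.name)
			return
		}
	}
	// Not ours: delegate to the next server in the chain.
	s.next.ServeHTTP(w, r)
}

// newChain wires aggregatorServer -> kubeAPIServer -> apiExtensionsServer -> 404.
// The prefixes are illustrative assumptions, not the real routing tables.
func newChain() http.Handler {
	ext := &delegatingServer{
		name:     "apiExtensionsServer",
		prefixes: []string{"/apis/apiextensions.k8s.io/", "/apis/example.com/"},
		next:     http.NotFoundHandler(),
	}
	kube := &delegatingServer{
		name:     "kubeAPIServer",
		prefixes: []string{"/api/", "/apis/apps/"},
		next:     ext,
	}
	return &delegatingServer{
		name:     "aggregatorServer",
		prefixes: []string{"/apis/apiregistration.k8s.io/"},
		next:     kube,
	}
}

// serve sends a GET for path through h and returns the status code and body.
func serve(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := newChain()
	paths := []string{
		"/apis/apiregistration.k8s.io/v1/apiservices",
		"/api/v1/pods",
		"/apis/example.com/v1/students",
		"/apis/unknown.io/v1/things",
	}
	for _, p := range paths {
		code, body := serve(chain, p)
		fmt.Printf("%s -> %d %s\n", p, code, strings.TrimSpace(body))
	}
}
```

A request that no server claims falls through every link and ends at the NotFound handler, which mirrors the 404 behaviour described for apiExtensionsServer.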

bootstrap-controller

  • The creation and run logic of the apiserver bootstrap-controller lives in the k8s.io/kubernetes/pkg/master directory.
  • The bootstrap-controller is mainly used to create and maintain the internal kubernetes default apiserver service.
  • The spec.selector of the kubernetes default apiserver service is empty. This is the biggest difference between it and ordinary services: the endpoints of this special service are not controlled by the endpoints controller but are managed directly by the kube-apiserver bootstrap-controller (maintained by this code, not by the pod selector).
  • The main functions of the bootstrap-controller are:

    • Create the default, kube-system, kube-public, and kube-node-lease namespaces.
    • Create and maintain the kubernetes default apiserver service and its endpoints.
    • Check and repair Service ClusterIPs (within the range given by --service-cluster-ip-range).
    • Check and repair Service NodePorts (within the range given by --service-node-port-range).

// k8s.io/kubernetes/pkg/master/controller.go:142
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
	if c.runner != nil {
		return
	}
	// Reconcile during first run removing itself until server is ready.
	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
	if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
		klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
	}
	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
	// run all of the controllers once prior to returning from Start.
	if err := repairClusterIPs.RunOnce(); err != nil {
		// If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
		klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
	}
	if err := repairNodePorts.RunOnce(); err != nil {
		// If we fail to repair node ports apiserver is useless. We should restart and retry.
		klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
	}
	// Periodically run the four main functions of the bootstrap controller (reconciliation)
	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
	c.runner.Start()
}
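
The last two statements of Start hand four reconciliation loops to a runner that keeps them going in the background. The sketch below shows one way such a runner could be built from goroutines and a shared stop channel. It is an illustrative assumption, not the Kubernetes async package: the names runner, newRunner, every, and demo are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runner is a hypothetical, simplified loop runner: it starts every loop
// function in its own goroutine and stops them all by closing one channel.
type runner struct {
	loops []func(stop <-chan struct{})
	stop  chan struct{}
	wg    sync.WaitGroup
}

func newRunner(loops ...func(stop <-chan struct{})) *runner {
	return &runner{loops: loops}
}

// Start launches all loops; calling it again while running is a no-op,
// similar in spirit to the c.runner != nil guard in Controller.Start.
func (r *runner) Start() {
	if r.stop != nil {
		return
	}
	stop := make(chan struct{})
	r.stop = stop
	for _, loop := range r.loops {
		r.wg.Add(1)
		go func(f func(stop <-chan struct{})) {
			defer r.wg.Done()
			f(stop)
		}(loop)
	}
}

// Stop signals all loops to exit and waits until they have returned.
func (r *runner) Stop() {
	if r.stop == nil {
		return
	}
	close(r.stop)
	r.wg.Wait()
	r.stop = nil
}

// every builds a loop that calls reconcile immediately and then once per
// period until the stop channel is closed.
func every(period time.Duration, reconcile func()) func(stop <-chan struct{}) {
	return func(stop <-chan struct{}) {
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			reconcile()
			select {
			case <-stop:
				return
			case <-ticker.C:
			}
		}
	}
}

// demo runs two counting loops for a short time and reports how often
// each one executed.
func demo() (namespaces, services int64) {
	r := newRunner(
		every(10*time.Millisecond, func() { atomic.AddInt64(&namespaces, 1) }),
		every(10*time.Millisecond, func() { atomic.AddInt64(&services, 1) }),
	)
	r.Start()
	time.Sleep(35 * time.Millisecond)
	r.Stop()
	return namespaces, services
}

func main() {
	ns, svc := demo()
	fmt.Printf("namespace reconciles: %d, service reconciles: %d\n", ns, svc)
}
```

Because each loop reconciles once before it first checks the stop channel, every loop is guaranteed to run at least once, which matches the intent of running the repair steps early.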

For more details on the code, see kubernetes-reading-notes.

kubeAPIServer

kubeAPIServer mainly serves operation requests on the built-in API resources. It registers routes for every API resource in Kubernetes and exposes a RESTful API, so that services both inside and outside the cluster can operate on Kubernetes resources through that API.

kubeAPIServer is also the core of the whole Kubernetes apiserver. The aggregatorServer and apiExtensionsServer described below are both extensions built on top of kubeAPIServer (they add Kubernetes support for user-defined resources).

The most important job of kubeAPIServer is to add routes for the built-in Kubernetes resources:

  • m.InstallLegacyAPI adds the core API resources to the routes; in the apiserver these are the resources whose path starts with /api.
  • m.InstallAPIs adds the extended API resources to the routes; in the apiserver these are the resources whose path starts with /apis.

// k8s.io/kubernetes/pkg/master/master.go:332
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
//   KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
	...
	// Install the LegacyAPI (core API)
	// install legacy rest storage
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}
	...
	// Install the APIs (named groups APIs)
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
	...
	return m, nil
}

kubeAPIServer as a whole exposes three kinds of API resource endpoints:

  • core group: mainly under /api/v1;
  • named groups: the path is /apis/$GROUP/$VERSION;
  • system status APIs: such as /metrics and /version.

An API URL has roughly the form /apis/{group}/{version}/namespaces/{namespace}/resource/{name}, structured as shown in the following figure:

[Figure: structure of the API URL]
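
To make the URL layout concrete, here is a small sketch that assembles a resource path from its parts, covering both the core group under /api and named groups under /apis. The function resourcePath and its conventions (an empty group meaning the core group, an empty namespace meaning a cluster-scoped resource) are assumptions made for this example and are not part of the Kubernetes code base.

```go
package main

import (
	"fmt"
	"path"
)

// resourcePath builds the REST path of a resource.
//   - group == ""     selects the legacy core group (/api/{version})
//   - namespace == "" means the resource is cluster scoped
//   - name == ""      addresses the whole collection
func resourcePath(group, version, namespace, resource, name string) string {
	parts := []string{"/apis", group, version}
	if group == "" {
		parts = []string{"/api", version}
	}
	if namespace != "" {
		parts = append(parts, "namespaces", namespace)
	}
	parts = append(parts, resource)
	if name != "" {
		parts = append(parts, name)
	}
	return path.Join(parts...)
}

func main() {
	// Core group, namespaced item.
	fmt.Println(resourcePath("", "v1", "default", "pods", "nginx"))
	// Named group, namespaced collection.
	fmt.Println(resourcePath("apps", "v1", "default", "deployments", ""))
	// Core group, cluster-scoped item.
	fmt.Println(resourcePath("", "v1", "", "nodes", "node-1"))
}
```

The three calls print /api/v1/namespaces/default/pods/nginx, /apis/apps/v1/namespaces/default/deployments, and /api/v1/nodes/node-1 respectively.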

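kubeAPIServer creates a RESTStorage for every API resource. The purpose of a RESTStorage is to tie the access path of a resource to the operations on its backend storage. The interfaces implemented by the constructed REST storage determine which operations the resource supports (for example create and update), and each supported operation is recorded in actions. Every operation corresponds to a standard REST method: create maps to POST and update maps to PUT. Finally the actions slice is traversed in order, a handler is added for each operation (the handler corresponds to the interface implemented by the REST storage) and registered on a route, which exposes the RESTful API: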

// m.GenericAPIServer.InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers
// k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	...
	// 1. Determine which REST operation interfaces the resource implements, i.e. which verbs it supports, so that routes can be added for them
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	...
	// 2. Add the actions for the resource (depending on whether it is namespace scoped)
	// Get the list of actions for the given scope.
	switch {
	case !namespaceScoped:
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		...
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		// Add actions at the resource path: /api/apiVersion/resource
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		...
	}
	...
	// 3. Map rest.Storage to restful.Route
	// Add a handler for each operation
	for _, action := range actions {
		...
		switch action.Verb {
		...
		case "POST": // Create a resource.
			var handler restful.RouteFunction
			// 4. Initialize the handler
			if isNamedCreater {
				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
			} else {
				handler = restfulCreateResource(creater, reqScope, admit)
			}
			handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
			...
			// 5. Bind the route to the handler
			route := ws.POST(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
				// but currently go-restful can't handle multiple different objects being returned.
				Returns(http.StatusCreated, "Created", producedObject).
				Returns(http.StatusAccepted, "Accepted", producedObject).
				Reads(defaultVersionedObject).
				Writes(producedObject)
			if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
				return nil, err
			}
			addParams(route, action.Params)
			// 6. Add it to the routes
			routes = append(routes, route)
		case "DELETE": // Delete a resource.
			...
		default:
			return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
		}
		for _, route := range routes {
			route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
				Group:   reqScope.Kind.Group,
				Version: reqScope.Kind.Version,
				Kind:    reqScope.Kind.Kind,
			})
			route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
			ws.Route(route)
		}
		// Note: update GetAuthorizerAttributes() when adding a custom handler.
	}
	...
}
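
Step 1 of the listing above relies on Go type assertions: the set of interfaces a storage object implements decides which verbs get a route. The sketch below isolates that idea. The interfaces Storage, Lister, Getter, Creater, Updater, and Deleter here are hypothetical, trimmed-down stand-ins for the rest.* interfaces, and the action struct is much simpler than the real one.

```go
package main

import "fmt"

// Hypothetical, minimal counterparts of the storage interfaces.
type Storage interface{ New() interface{} }
type Lister interface{ List() []interface{} }
type Getter interface {
	Get(name string) (interface{}, bool)
}
type Creater interface{ Create(obj interface{}) error }
type Updater interface {
	Update(name string, obj interface{}) error
}
type Deleter interface{ Delete(name string) error }

// action pairs a verb with the HTTP method and path it is served on.
type action struct {
	Verb   string
	Method string
	Path   string
}

// appendIf appends a only when the storage supports the operation.
func appendIf(actions []action, a action, ok bool) []action {
	if ok {
		return append(actions, a)
	}
	return actions
}

// actionsFor inspects which interfaces storage implements and returns the
// actions that should be routed for the given resource.
func actionsFor(resource string, storage Storage) []action {
	_, isLister := storage.(Lister)
	_, isGetter := storage.(Getter)
	_, isCreater := storage.(Creater)
	_, isUpdater := storage.(Updater)
	_, isDeleter := storage.(Deleter)

	itemPath := resource + "/{name}"
	var actions []action
	actions = appendIf(actions, action{"LIST", "GET", resource}, isLister)
	actions = appendIf(actions, action{"POST", "POST", resource}, isCreater)
	actions = appendIf(actions, action{"GET", "GET", itemPath}, isGetter)
	actions = appendIf(actions, action{"PUT", "PUT", itemPath}, isUpdater)
	actions = appendIf(actions, action{"DELETE", "DELETE", itemPath}, isDeleter)
	return actions
}

// readOnlyStore supports only list and get.
type readOnlyStore struct{}

func (readOnlyStore) New() interface{}                    { return map[string]string{} }
func (readOnlyStore) List() []interface{}                 { return nil }
func (readOnlyStore) Get(name string) (interface{}, bool) { return nil, false }

// crudStore adds create, update, and delete on top of readOnlyStore.
type crudStore struct{ readOnlyStore }

func (crudStore) Create(obj interface{}) error              { return nil }
func (crudStore) Update(name string, obj interface{}) error { return nil }
func (crudStore) Delete(name string) error                  { return nil }

func main() {
	for _, a := range actionsFor("nodes", readOnlyStore{}) {
		fmt.Printf("read-only: %-6s %-6s %s\n", a.Verb, a.Method, a.Path)
	}
	for _, a := range actionsFor("pods", crudStore{}) {
		fmt.Printf("crud:      %-6s %-6s %s\n", a.Verb, a.Method, a.Path)
	}
}
```

A storage type opts into a verb simply by implementing the matching interface, so no separate registration table has to be kept in sync with the implementation.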

The kubeAPIServer code is organized as follows:

1. Overall apiserver startup logic: k8s.io/kubernetes/cmd/kube-apiserver
2. Creation and run logic of the apiserver bootstrap-controller: k8s.io/kubernetes/pkg/master
3. Creation of the backend RESTStorage (based on genericregistry.Store) for each API resource: k8s.io/kubernetes/pkg/registry
4. Creation and handling logic of the aggregated-apiserver: k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator
5. Creation and handling logic of the extensions-apiserver: k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver
6. Creation and running of the apiserver: k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server
7. Registration of the API resource handlers (InstallREST, Install, and registerResourceHandlers): k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints
8. Creation of the storage backend (etcdv3): k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage
9. Initialization through genericregistry.Store.CompleteWithOptions: k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry

The call chain is summarized in the following figure:

[Figure: kubeAPIServer call chain]

For more details on the code, see kubernetes-reading-notes.

aggregatorServer

aggregatorServer mainly implements the second way of extending the Kubernetes API resources, the Aggregated APIServer (AA), by proxying CR requests to the AA.

Using sample-apiserver, the official Kubernetes aggregated apiserver example, the principles can be summarized as follows:

  • aggregatorServer associates an APIService object with a Service in order to forward requests, and the type of the associated Service further determines how requests are forwarded. aggregatorServer consists of a GenericAPIServer and the controllers that maintain its own state. The GenericAPIServer mainly handles requests for APIService resources under the apiregistration.k8s.io group, and the controllers are:

    • apiserviceRegistrationController: builds a proxy from the aggregated server service defined in the APIService and forwards CR requests to the backend aggregated server;
    • availableConditionController: maintains the availability status of APIServices, including whether the referenced Service is available;
    • autoRegistrationController: keeps a specific set of APIServices present in the API;
    • crdRegistrationController: automatically registers CRD GroupVersions as APIServices;
    • openAPIAggregationController: syncs changes of APIService resources to the served OpenAPI document;
  • apiserviceRegistrationController builds a proxy from the aggregated server service defined in the APIService and forwards CR requests to the backend aggregated server. An apiService is one of two types: Local (Service is empty) or Service (Service is non-empty). apiserviceRegistrationController sets up the proxy for both types. Local requests are routed directly to kube-apiserver, while Service requests go through a proxy that turns them into requests to the aggregated Service (proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version). The load-balancing strategy for these requests is: prefer local access to kube-apiserver (if the service is the kubernetes default apiserver service on port 443) => access through the service ClusterIP:Port (the default), or access through a randomly chosen service endpoint backend:

    func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
        ...
        proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
        // v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
        if apiService.Name == legacyAPIServiceName {
            proxyPath = "/api"
        }
        // register the proxy handler
        proxyHandler := &proxyHandler{
            localDelegate:   s.delegateHandler,
            proxyClientCert: s.proxyClientCert,
            proxyClientKey:  s.proxyClientKey,
            proxyTransport:  s.proxyTransport,
            serviceResolver: s.serviceResolver,
            egressSelector:  s.egressSelector,
        }
        ...
        s.proxyHandlers[apiService.Name] = proxyHandler
        s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
        s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
        ...
        // it's time to register the group aggregation endpoint
        groupPath := "/apis/" + apiService.Spec.Group
        groupDiscoveryHandler := &apiGroupHandler{
            codecs:    aggregatorscheme.Codecs,
            groupName: apiService.Spec.Group,
            lister:    s.lister,
            delegate:  s.delegateHandler,
        }
        // aggregation is protected
        s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
        s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
        s.handledGroups.Insert(apiService.Spec.Group)
        return nil
    }

    // k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go:109
    func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        // Load proxyHandlingInfo to handle the request
        value := r.handlingInfo.Load()
        if value == nil {
            r.localDelegate.ServeHTTP(w, req)
            return
        }
        handlingInfo := value.(proxyHandlingInfo)
        ...
        // Check whether the APIService's service is healthy
        if !handlingInfo.serviceAvailable {
            proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
            return
        }
        // Turn the original request into a request to the APIService
        // write a new location based on the existing request pointed at the target service
        location := &url.URL{}
        location.Scheme = "https"
        rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
        if err != nil {
            klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
            proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
            return
        }
        location.Host = rloc.Host
        location.Path = req.URL.Path
        location.RawQuery = req.URL.Query().Encode()
        newReq, cancelFn := newRequestForProxy(location, req)
        defer cancelFn()
        ...
        proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
        handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
        handler.ServeHTTP(w, newReq)
    }

    $ kubectl get APIService
    NAME                                   SERVICE                      AVAILABLE   AGE
    ...
    v1.apps                                Local                        True        50d
    ...
    v1beta1.metrics.k8s.io                 kube-system/metrics-server   True        50d
    ...

    # default APIServices
    $ kubectl get -o yaml APIService/v1.apps
    apiVersion: apiregistration.k8s.io/v1
    kind: APIService
    metadata:
      labels:
        kube-aggregator.kubernetes.io/automanaged: onstart
      name: v1.apps
      selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.apps
    spec:
      group: apps
      groupPriorityMinimum: 17800
      version: v1
      versionPriority: 15
    status:
      conditions:
      - lastTransitionTime: "2020-10-20T10:39:48Z"
        message: Local APIServices are always available
        reason: Local
        status: "True"
        type: Available

    # aggregated server
    $ kubectl get -o yaml APIService/v1beta1.metrics.k8s.io
    apiVersion: apiregistration.k8s.io/v1
    kind: APIService
    metadata:
      labels:
        addonmanager.kubernetes.io/mode: Reconcile
        kubernetes.io/cluster-service: "true"
      name: v1beta1.metrics.k8s.io
      selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1beta1.metrics.k8s.io
    spec:
      group: metrics.k8s.io
      groupPriorityMinimum: 100
      insecureSkipTLSVerify: true
      service:
        name: metrics-server
        namespace: kube-system
        port: 443
      version: v1beta1
      versionPriority: 100
    status:
      conditions:
      - lastTransitionTime: "2020-12-05T00:50:48Z"
        message: all checks passed
        reason: Passed
        status: "True"
        type: Available

    # CRD
    $ kubectl get -o yaml APIService/v1.duyanghao.example.com
    apiVersion: apiregistration.k8s.io/v1
    kind: APIService
    metadata:
      labels:
        kube-aggregator.kubernetes.io/automanaged: "true"
      name: v1.duyanghao.example.com
      selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.duyanghao.example.com
    spec:
      group: duyanghao.example.com
      groupPriorityMinimum: 1000
      version: v1
      versionPriority: 100
    status:
      conditions:
      - lastTransitionTime: "2020-12-11T08:45:37Z"
        message: Local APIServices are always available
        reason: Local
        status: "True"
        type: Available

  • During creation, aggregatorServer builds a default list of APIServices from all API resources defined by kube-apiserver. Each is named $VERSION.$GROUP and carries the label kube-aggregator.kubernetes.io/automanaged: onstart, for example the v1.apps apiService. autoRegistrationController creates and maintains the APIServices in this list, which are the Local apiServices seen above; custom APIServices (aggregated servers) are not touched by it.

  • The aggregated server implements the CRUD API for CRs (custom API resources) and is free to choose its backend storage: it can share etcd with the core kube-apiserver, or deploy its own etcd or another database. The CR API path served by an aggregated server is /apis/$GROUP/$VERSION; for sample-apiserver this is /apis/wardle.example.com/v1alpha1, with the resource types flunders and fischers.

  • The aggregated server integrates and interacts with the core kube-apiserver by deploying an APIService resource whose service fields point to the corresponding aggregated server service.

  • The directory structure of sample-apiserver is shown below and can serve as a reference for writing your own aggregated server:

    staging/src/k8s.io/sample-apiserver
    ├── artifacts
    │   ├── example
    │   │   ├── apiservice.yaml
    ...
    ├── hack
    ├── main.go
    └── pkg
        ├── admission
        ├── apis
        ├── apiserver
        ├── cmd
        ├── generated
        │   ├── clientset
        │   │   └── versioned
        ...
        │   │       └── typed
        │   │           └── wardle
        │   │               ├── v1alpha1
        │   │               └── v1beta1
        │   ├── informers
        │   │   └── externalversions
        │   │       └── wardle
        │   │           ├── v1alpha1
        │   │           └── v1beta1
        │   ├── listers
        │   │   └── wardle
        │   │       ├── v1alpha1
        │   │       └── v1beta1
        └── registry

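    • artifacts holds the example deployment YAML.
    • hack holds the automation scripts (e.g. update-codegen).
    • main.go is the entry point of the aggregated server; pkg/cmd contains the actual startup logic; pkg/apiserver handles initialization and route registration of the aggregated server.
    • pkg/apis holds the struct definitions of the CRs; the accompanying code is generated automatically (update-codegen).
    • pkg/admission holds the admission-related code.
    • pkg/generated holds the generated clientset, informers, and listers for accessing the CRs.
    • pkg/registry holds the RESTStorage implementation for the CRs.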
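
The naming and routing rules from this section ($VERSION.$GROUP names, the proxy path, and the Local versus Service distinction) can be condensed into a few lines. The following sketch uses hypothetical types apiService and serviceRef rather than the real apiregistration structs, and the backend URL it prints is only an illustrative assumption about how a Service could be addressed; the real resolver picks a ClusterIP or an endpoint as described above.

```go
package main

import "fmt"

// legacyAPIServiceName is the name of the APIService for the core group,
// whose group is empty.
const legacyAPIServiceName = "v1."

// serviceRef is a hypothetical reference to the Service behind an
// aggregated server.
type serviceRef struct {
	Namespace string
	Name      string
	Port      int
}

// apiService is a hypothetical, minimal view of an APIService object.
// A nil Service marks a Local APIService served by kube-apiserver itself.
type apiService struct {
	Group   string
	Version string
	Service *serviceRef
}

// name follows the $VERSION.$GROUP convention.
func (a apiService) name() string { return a.Version + "." + a.Group }

// proxyPath is the path prefix under which requests for this APIService
// arrive; the legacy core group is served under /api.
func (a apiService) proxyPath() string {
	if a.name() == legacyAPIServiceName {
		return "/api"
	}
	return "/apis/" + a.Group + "/" + a.Version
}

// backend says where a request is sent: to the local delegate for Local
// APIServices, otherwise to the referenced Service (URL form assumed).
func (a apiService) backend() string {
	if a.Service == nil {
		return "local delegate"
	}
	return fmt.Sprintf("https://%s.%s.svc:%d", a.Service.Name, a.Service.Namespace, a.Service.Port)
}

func main() {
	services := []apiService{
		{Group: "", Version: "v1"},
		{Group: "apps", Version: "v1"},
		{Group: "metrics.k8s.io", Version: "v1beta1",
			Service: &serviceRef{Namespace: "kube-system", Name: "metrics-server", Port: 443}},
	}
	for _, s := range services {
		fmt.Printf("%-24s %-30s -> %s\n", s.name(), s.proxyPath(), s.backend())
	}
}
```

With the metrics-server example from the kubectl output, the name is v1beta1.metrics.k8s.io and requests under /apis/metrics.k8s.io/v1beta1 are proxied to the Service, while v1.apps stays with the local delegate.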

For more details on the code, see kubernetes-reading-notes.

apiExtensionsServer

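apiExtensionsServer registers the apiResources and apiVersions of CustomResourceDefinitions (CRDs) and handles REST requests for CRDs and their CustomResources (CRs), returning 404 when the corresponding CR cannot be handled. It is also the last link in the apiserver delegation chain.

The principles are summarized as follows:

  • A Custom Resource (CR) is a user-defined Kubernetes resource type, as opposed to the built-in resource types such as Pod and Service. With CRs we can define any resource type we want.

  • A CRD registers a CR with Kubernetes through a YAML file to implement custom api-resources. It is likewise a way of extending the Kubernetes API resources, and the most commonly used one.

  • APIExtensionServer registers the apiResources and apiVersions of CustomResourceDefinitions (CRDs) and handles REST requests for CRDs and their CustomResources (CRs), returning 404 when the corresponding CR cannot be handled. It is also the last link in the apiserver delegation chain.

  • crdRegistrationController automatically registers CRD GroupVersions as APIServices. It enumerates all CRDs, builds an APIService from the crd.Spec.Group and crd.Spec.Versions fields of each CRD, and adds it to autoRegisterController.apiServicesToSync, after which autoRegisterController creates and maintains it. This is why a corresponding APIService object appears once a CRD has been created.

  • The controllers contained in APIExtensionServer and their functions are:

    • openapiController: syncs changes of CRD resources to the served OpenAPI document, which can be viewed at /openapi/v2;

    • crdController: registers CRD information into apiVersions and apiResources; both can be inspected with kubectl api-versions and kubectl api-resources;

    • kubectl api-versions returns the version information of all resources in the Kubernetes cluster (it actually issues two requests, to https://127.0.0.1:6443/api and https://127.0.0.1:6443/apis, and merges the two responses at the end):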
    $ kubectl -v=8 api-versions
    I1211 11:44:50.276446   22493 loader.go:375] Config loaded from file:  /root/.kube/config
    I1211 11:44:50.277005   22493 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32s
    ...
    I1211 11:44:50.290265   22493 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}
    I1211 11:44:50.293673   22493 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32s
    ...
    I1211 11:44:50.298360   22493 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]
    apiextensions.k8s.io/v1
    apiextensions.k8s.io/v1beta1
    apiregistration.k8s.io/v1
    apiregistration.k8s.io/v1beta1
    apps/v1
    authentication.k8s.io/v1beta1
    ...
    storage.k8s.io/v1
    storage.k8s.io/v1beta1
    v1
    
    • kubectl api-resources first fetches all API version information and then, for each API version, calls the API to obtain all API resource types under that version:

      $ kubectl -v=8 api-resources
      5077 loader.go:375] Config loaded from file:  /root/.kube/config
      I1211 15:19:47.593450   15077 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32s
      I1211 15:19:47.602273   15077 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}
      I1211 15:19:47.606279   15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32s
      I1211 15:19:47.610333   15077 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]
      I1211 15:19:47.614700   15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/batch/v1?timeout=32s
      I1211 15:19:47.614804   15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/authentication.k8s.io/v1?timeout=32s
      I1211 15:19:47.615687   15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/auth.tkestack.io/v1?timeout=32s
      https://127.0.0.1:6443/apis/authentication.k8s.io/v1beta1?timeout=32s
      I1211 15:19:47.616794   15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/coordination.k8s.io/v1?timeout=32s
      I1211 15:19:47.616863   15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/apps/v1?timeout=32s
      ...
      NAME                              SHORTNAMES   APIGROUP                       NAMESPACED   KIND
      bindings                                                                      true         Binding
      endpoints                         ep                                          true         Endpoints
      events                            ev                                          true         Event
      limitranges                       limits                                      true         LimitRange
      namespaces                        ns                                          false        Namespace
      nodes                             no                                          false        Node
      ...

    • namingController: checks whether a crd object has naming conflicts; the result is visible in the crd's .status.conditions;

    • establishingController: checks whether a crd is in a normal state; the result is visible in the crd's .status.conditions;

    • nonStructuralSchemaController: checks whether the structure of a crd object is valid; the result is visible in the crd's .status.conditions;

    • apiApprovalController: checks whether a crd follows the Kubernetes API declaration policy; the result is visible in the crd's .status.conditions;

    • finalizingController: similar in function to finalizers; related to the deletion of CRs;

  • The way the APIServer handles CR CRUD requests is summarized as follows:

    • createAPIExtensionsServer => NewCustomResourceDefinitionHandler => crdHandler => register the CR CRUD API endpoints:

    // New returns a new instance of CustomResourceDefinitions from the given config.
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
        ...
        crdHandler, err := NewCustomResourceDefinitionHandler(
            versionDiscoveryHandler,
            groupDiscoveryHandler,
            s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
            delegateHandler,
            c.ExtraConfig.CRDRESTOptionsGetter,
            c.GenericConfig.AdmissionControl,
            establishingController,
            c.ExtraConfig.ServiceResolver,
            c.ExtraConfig.AuthResolverWrapper,
            c.ExtraConfig.MasterCount,
            s.GenericAPIServer.Authorizer,
            c.GenericConfig.RequestTimeout,
            time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
            apiGroupInfo.StaticOpenAPISpec,
            c.GenericConfig.MaxRequestBodyBytes,
        )
        if err != nil {
            return nil, err
        }
        s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
        s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
        ...
        return s, nil
    }
    
    • crdHandler's handling logic is as follows:

    • Parse the request (GET /apis/duyanghao.example.com/v1/namespaces/default/students) and use the group (duyanghao.example.com), version (v1), and resource (students) from the request path to fetch the corresponding CRD (crd, err := r.crdLister.Get(crdName)).

    • Look up crdInfo by crd.UID and crd.Name, creating it if it does not exist (crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)). crdInfo contains the CRD definition and the customresource.REST storage of the Custom Resource that belongs to this CRD.

    • The customresource.REST storage is created from the CR's Group (duyanghao.example.com), Version (v1), Kind (Student), Resource (students), and so on. Because a CR has no concrete struct definition in the Kubernetes code, a generic Unstructured struct (which can hold any type of Custom Resource) is initialized first, and SetGroupVersionKind is then called on it to set the concrete Custom Resource type.

    • After the Unstructured struct has been fetched from the customresource.REST storage, it is converted as needed and returned.

      // k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go:223
      func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
          ctx := req.Context()
          requestInfo, ok := apirequest.RequestInfoFrom(ctx)
          ...
          crdName := requestInfo.Resource + "." + requestInfo.APIGroup
          crd, err := r.crdLister.Get(crdName)
          ...
          crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
          verb := strings.ToUpper(requestInfo.Verb)
          resource := requestInfo.Resource
          subresource := requestInfo.Subresource
          scope := metrics.CleanScope(requestInfo)
          ...
          switch {
          case subresource == "status" && subresources != nil && subresources.Status != nil:
              handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
          case subresource == "scale" && subresources != nil && subresources.Scale != nil:
              handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
          case len(subresource) == 0:
              handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
          default:
              responsewriters.ErrorNegotiated(
                  apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
                  Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
              )
          }
          if handlerFunc != nil {
              handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
              handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
              handler.ServeHTTP(w, req)
              return
          }
      }
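
The dispatch performed by crdHandler.ServeHTTP boils down to two decisions: derive the CRD name from the request, then pick a handler based on the subresource and on what the CRD enables. The sketch below reproduces only that decision logic. The types requestInfo and crdSubresources and the function pickHandler are hypothetical simplifications, and the returned strings merely name the handler that the real code would invoke.

```go
package main

import "fmt"

// requestInfo is a hypothetical subset of the parsed request attributes.
type requestInfo struct {
	APIGroup    string
	APIVersion  string
	Resource    string
	Subresource string
}

// crdSubresources records which subresources a CRD has enabled.
type crdSubresources struct {
	Status bool
	Scale  bool
}

// crdName mirrors the lookup key used in the listing: <resource>.<group>.
func crdName(ri requestInfo) string {
	return ri.Resource + "." + ri.APIGroup
}

// pickHandler returns the name of the handler that would serve the request,
// or false when nothing matches and a 404 should be returned.
func pickHandler(ri requestInfo, sub crdSubresources) (string, bool) {
	switch {
	case ri.Subresource == "status" && sub.Status:
		return "serveStatus", true
	case ri.Subresource == "scale" && sub.Scale:
		return "serveScale", true
	case ri.Subresource == "":
		return "serveResource", true
	default:
		return "", false
	}
}

func main() {
	sub := crdSubresources{Status: true}
	requests := []requestInfo{
		{APIGroup: "duyanghao.example.com", APIVersion: "v1", Resource: "students"},
		{APIGroup: "duyanghao.example.com", APIVersion: "v1", Resource: "students", Subresource: "status"},
		{APIGroup: "duyanghao.example.com", APIVersion: "v1", Resource: "students", Subresource: "scale"},
	}
	for _, ri := range requests {
		if h, ok := pickHandler(ri, sub); ok {
			fmt.Printf("%s subresource=%q -> %s\n", crdName(ri), ri.Subresource, h)
		} else {
			fmt.Printf("%s subresource=%q -> 404 Not Found\n", crdName(ri), ri.Subresource)
		}
	}
}
```

In this example the CRD enables only the status subresource, so a request for the scale subresource falls into the default branch and is answered with a 404.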
      

For more details on the code, see kubernetes-reading-notes.

Conclusion

This article gave a source-level overview of the Kubernetes apiserver, covering aggregatorServer, kubeAPIServer, apiExtensionsServer, and the bootstrap-controller. It should give the reader a rough understanding of how the apiserver works internally and a starting point for deeper study.

Refs

  • kubernetes-reading-notes