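Preface
The entire Kubernetes technology stack is built from declarative APIs and controllers. kube-apiserver is the declarative API server of Kubernetes and the bridge through which the other components interact, so a solid understanding of kube-apiserver is essential.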
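Overall component functions
kube-apiserver is the only entry point to etcd for the whole Kubernetes cluster. It handles authentication and authorization, validation, and CRUD operations for every Kubernetes resource, and exposes RESTful APIs for other components to call:
- aggregatorServer: handles requests for APIService resources in the apiregistration.k8s.io group, and intercepts user requests to forward them to an aggregated server (AA)
- kubeAPIServer: performs the generic request handling, including authentication, authorization, and the REST services for the built-in resources (pod, deployment, service, etc.)
- apiExtensionsServer: registers CustomResourceDefinition (CRD) apiResources and apiVersions, and handles REST requests for CRDs and the corresponding CustomResources (CR), returning 404 if the CR cannot be handled. It is also the last link in the apiserver delegation chain
In addition there is the bootstrap-controller, which creates and manages the Kubernetes default apiserver service.
The following sections give an overview of each of these components.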
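The delegation idea mentioned above can be illustrated with a minimal sketch. It assumes the chain order aggregatorServer, kubeAPIServer, apiExtensionsServer; the `server` type, its fields and the path prefixes are hypothetical and only model the "handle it or pass it on, 404 at the end" behaviour, not the real handler code.

```go
package main

import (
	"fmt"
	"strings"
)

// server is a hypothetical stand-in for one link in the delegation chain.
type server struct {
	name     string
	prefixes []string // request path prefixes this server claims (illustrative)
	next     *server  // delegate, or nil for the last link
}

// handle returns the name of the server that serves path, or "404"
// when no server in the chain claims it.
func (s *server) handle(path string) string {
	for _, p := range s.prefixes {
		if strings.HasPrefix(path, p) {
			return s.name
		}
	}
	if s.next != nil {
		return s.next.handle(path)
	}
	return "404"
}

// newChain wires aggregatorServer -> kubeAPIServer -> apiExtensionsServer.
func newChain() *server {
	ext := &server{name: "apiExtensionsServer", prefixes: []string{"/apis/example.com/"}}
	kube := &server{name: "kubeAPIServer", prefixes: []string{"/api/", "/apis/apps/"}, next: ext}
	return &server{name: "aggregatorServer", prefixes: []string{"/apis/apiregistration.k8s.io/"}, next: kube}
}

func main() {
	chain := newChain()
	paths := []string{
		"/apis/apiregistration.k8s.io/v1/apiservices",
		"/api/v1/pods",
		"/apis/example.com/v1/widgets",
		"/apis/unknown.io/v1/things",
	}
	for _, p := range paths {
		fmt.Printf("%s -> %s\n", p, chain.handle(p))
	}
}
```

A request that no link claims falls through to the end of the chain and is answered with 404, which matches the behaviour described for apiExtensionsServer.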
bootstrap-controller
- The creation and run logic of the apiserver bootstrap-controller lives in the k8s.io/kubernetes/pkg/master directory
- The bootstrap-controller mainly creates and maintains the internal kubernetes default apiserver service
- The spec.selector of the kubernetes default apiserver service is empty. This is the biggest difference from an ordinary service: the endpoints of this special service are not controlled by the endpoints controller but are managed directly by the kube-apiserver bootstrap-controller (maintained by this code, not by the pod selector)
- The main functions of the bootstrap-controller are:
  - Create the default, kube-system, kube-public and kube-node-lease namespaces
  - Create and maintain the kubernetes default apiserver service and its endpoints
  - Check and repair Service ClusterIPs (within the range given by --service-cluster-ip-range)
  - Check and repair Service NodePorts (within the range given by --service-node-port-range)
// k8s.io/kubernetes/pkg/master/controller.go:142
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
	if c.runner != nil {
		return
	}

	// Reconcile during first run removing itself until server is ready.
	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
	if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
		klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
	}

	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)

	// run all of the controllers once prior to returning from Start.
	if err := repairClusterIPs.RunOnce(); err != nil {
		// If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
		klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
	}
	if err := repairNodePorts.RunOnce(); err != nil {
		// If we fail to repair node ports apiserver is useless. We should restart and retry.
		klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
	}

	// Periodically run the four main bootstrap-controller functions (reconciliation)
	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
	c.runner.Start()
}
For more details on the code, see kubernetes-reading-notes.
kubeAPIServer
kubeAPIServer mainly serves requests for the built-in API resources. It registers routes for each API resource in Kubernetes and exposes a RESTful API, so that services inside and outside the cluster can operate on Kubernetes resources through it.
kubeAPIServer is also the core of the whole Kubernetes apiserver: aggregatorServer and apiExtensionsServer, described below, are both extensions built on top of kubeAPIServer (they add support for user-defined resources).
The core job of kubeAPIServer is to add routes for the built-in Kubernetes resources:
- Call m.InstallLegacyAPI to add the core API resources to the routes; in the apiserver these are the resources whose path starts with /api;
- Call m.InstallAPIs to add the extended API resources to the routes; in the apiserver these are the resources whose path starts with /apis;
// k8s.io/kubernetes/pkg/master/master.go:332
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
//   KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
	...
	// Install the LegacyAPI (core API)
	// install legacy rest storage
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}
	...
	// Install the APIs (named groups apis)
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
	...
	return m, nil
}
kubeAPIServer provides three kinds of API resource endpoints:
- core group: mainly under /api/v1;
- named groups: the path is /apis/$GROUP/$VERSION;
- some system status APIs, such as /metrics and /version;
An API URL roughly takes the form /apis/{group}/{version}/namespaces/{namespace}/resource/{name}, with the structure shown in the figure:
[Figure: structure of an API URL]
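As a rough illustration of the URL layout described above, the sketch below assembles a resource path from its parts. The function `resourcePath` and its parameters are hypothetical names introduced for this example; it assumes the core group is represented by an empty group name and lives under /api, while named groups live under /apis.

```go
package main

import "fmt"

// resourcePath builds the REST path for a resource (illustrative helper,
// not part of Kubernetes). An empty group means the core group.
func resourcePath(group, version, namespace, resource, name string) string {
	// Named groups live under /apis/<group>/<version>.
	p := "/apis/" + group + "/" + version
	if group == "" {
		// The core group lives under /api/<version>.
		p = "/api/" + version
	}
	// Namespaced resources get a /namespaces/<namespace> segment.
	if namespace != "" {
		p += "/namespaces/" + namespace
	}
	p += "/" + resource
	// A name addresses a single object instead of the collection.
	if name != "" {
		p += "/" + name
	}
	return p
}

func main() {
	fmt.Println(resourcePath("", "v1", "default", "pods", "nginx"))
	fmt.Println(resourcePath("apps", "v1", "default", "deployments", ""))
	fmt.Println(resourcePath("", "v1", "", "nodes", "node-1"))
}
```

Cluster-scoped resources such as nodes simply omit the namespace segment.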
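kubeAPIServer creates a RESTStorage for each API resource. The purpose of RESTStorage is to map the access path of each resource to its backend storage operations: the interfaces implemented by the constructed REST storage determine which operations the resource supports (such as create and update), and each supported operation is stored in an action. Each operation corresponds to a standard REST method; for example, create maps to POST and update maps to PUT. Finally the actions slice is traversed in order, a handler is added for each operation (the handler wraps the corresponding interface implemented by the REST storage) and registered on a route, which exposes the RESTful API, as follows: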
// m.GenericAPIServer.InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers
// k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	...
	// 1. Determine which REST interfaces the resource implements, and from that which verbs it supports, so routes can be added for them
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	...
	// 2. Add the matching actions for the resource (depending on whether it is namespaced)
	// Get the list of actions for the given scope.
	switch {
	case !namespaceScoped:
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		...
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		// Add actions at the resource path: /api/apiVersion/resource
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		...
	}
	...
	// 3. Map rest.Storage to restful.Route
	// Add a handler for each action
	for _, action := range actions {
		...
		switch action.Verb {
		...
		case "POST": // Create a resource.
			var handler restful.RouteFunction
			// 4. Initialize the handler
			if isNamedCreater {
				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
			} else {
				handler = restfulCreateResource(creater, reqScope, admit)
			}
			handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
			...
			// 5. Bind the route to the handler
			route := ws.POST(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
				// but currently go-restful can't handle multiple different objects being returned.
				Returns(http.StatusCreated, "Created", producedObject).
				Returns(http.StatusAccepted, "Accepted", producedObject).
				Reads(defaultVersionedObject).
				Writes(producedObject)
			if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
				return nil, err
			}
			addParams(route, action.Params)
			// 6. Add it to the routes
			routes = append(routes, route)
		case "DELETE": // Delete a resource.
			...
		default:
			return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
		}
		for _, route := range routes {
			route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
				Group:   reqScope.Kind.Group,
				Version: reqScope.Kind.Version,
				Kind:    reqScope.Kind.Kind,
			})
			route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
			ws.Route(route)
		}
		// Note: update GetAuthorizerAttributes() when adding a custom handler.
	}
	...
}
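The verb-discovery pattern used by registerResourceHandlers can be reduced to a small, self-contained sketch: type assertions on the storage object decide which actions exist. The `Creater`, `Lister` and `Getter` interfaces, the `action` struct and the two storage types below are simplified stand-ins invented for this example; they are not the real k8s.io/apiserver types.

```go
package main

import "fmt"

// Simplified stand-ins for the rest.Creater / rest.Lister / rest.Getter interfaces.
type Creater interface{ Create(obj string) error }
type Lister interface{ List() []string }
type Getter interface{ Get(name string) (string, bool) }

// action pairs an internal verb with the HTTP method and path it is served on.
type action struct {
	Verb   string
	Method string
	Path   string
}

// memoryStorage supports create, list and get.
type memoryStorage struct{ items []string }

func (m *memoryStorage) Create(obj string) error { m.items = append(m.items, obj); return nil }
func (m *memoryStorage) List() []string          { return m.items }
func (m *memoryStorage) Get(name string) (string, bool) {
	for _, it := range m.items {
		if it == name {
			return it, true
		}
	}
	return "", false
}

// readOnlyStorage supports only get.
type readOnlyStorage struct{}

func (readOnlyStorage) Get(name string) (string, bool) { return name, true }

// actionsFor inspects which interfaces storage implements and returns
// one action per supported operation.
func actionsFor(resource string, storage interface{}) []action {
	var actions []action
	if _, ok := storage.(Lister); ok {
		actions = append(actions, action{"LIST", "GET", "/" + resource})
	}
	if _, ok := storage.(Creater); ok {
		actions = append(actions, action{"POST", "POST", "/" + resource})
	}
	if _, ok := storage.(Getter); ok {
		actions = append(actions, action{"GET", "GET", "/" + resource + "/{name}"})
	}
	return actions
}

func main() {
	for _, a := range actionsFor("pods", &memoryStorage{}) {
		fmt.Printf("pods:  %-4s %-4s %s\n", a.Verb, a.Method, a.Path)
	}
	for _, a := range actionsFor("nodes", readOnlyStorage{}) {
		fmt.Printf("nodes: %-4s %-4s %s\n", a.Verb, a.Method, a.Path)
	}
}
```

The design point is that a resource opts into a verb simply by implementing the matching interface; no separate registration table is needed.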
The kubeAPIServer code layout is organized as follows:
1. Overall apiserver startup logic: k8s.io/kubernetes/cmd/kube-apiserver
2. apiserver bootstrap-controller creation and run logic: k8s.io/kubernetes/pkg/master
3. Creation of the backend RESTStorage for each API resource (based on genericregistry.Store): k8s.io/kubernetes/pkg/registry
4. aggregated-apiserver creation and handling logic: k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator
5. extensions-apiserver creation and handling logic: k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver
6. apiserver creation and run: k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server
7. Registration of API resource handlers (InstallREST, Install, registerResourceHandlers): k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints
8. Creation of the storage backend (etcdv3): k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage
9. genericregistry.Store.CompleteWithOptions initialization: k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry
The call chain is summarized as follows:
[Figure: kubeAPIServer call chain]
For more details on the code, see kubernetes-reading-notes.
aggregatorServer
aggregatorServer mainly handles Aggregated APIServer (AA), the second way of extending Kubernetes API resources, by proxying CR requests to the AA.
Using sample-apiserver, the official Kubernetes example of an aggregated apiserver, the principles can be summarized as follows:
- aggregatorServer forwards requests by associating an APIService object with a Service; the type of the associated Service further determines how the request is forwarded. aggregatorServer consists of a GenericAPIServer and a set of controllers that maintain its own state. The GenericAPIServer mainly handles requests for APIService resources in the apiregistration.k8s.io group, and the controllers are:
  - apiserviceRegistrationController: builds a proxy from the aggregated server service defined by the APIService and forwards CR requests to the backend aggregated server;
  - availableConditionController: maintains the availability status of APIServices, including whether the referenced Service is available;
  - autoRegistrationController: keeps a specific set of APIServices present in the API;
  - crdRegistrationController: automatically registers CRD GroupVersions as APIServices;
  - openAPIAggregationController: syncs changes in APIServices resources to the served OpenAPI document;
- apiserviceRegistrationController builds a proxy from the aggregated server service defined by the APIService and forwards CR requests to the backend aggregated server. There are two kinds of apiService: Local (Service is empty) and Service (Service is non-empty). apiserviceRegistrationController sets up the proxy for both kinds: Local ones are routed directly to kube-apiserver, while Service ones get a proxy that converts the request into a request to the aggregated Service (proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version). The load-balancing policy for these requests is: prefer local access to kube-apiserver (if the service is the kubernetes default apiserver service:443) => access through the service ClusterIP:Port (the default), or access through a randomly selected service endpoint backend:
func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
	...
	proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
	// v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
	if apiService.Name == legacyAPIServiceName {
		proxyPath = "/api"
	}

	// register the proxy handler
	proxyHandler := &proxyHandler{
		localDelegate:   s.delegateHandler,
		proxyClientCert: s.proxyClientCert,
		proxyClientKey:  s.proxyClientKey,
		proxyTransport:  s.proxyTransport,
		serviceResolver: s.serviceResolver,
		egressSelector:  s.egressSelector,
	}
	...
	s.proxyHandlers[apiService.Name] = proxyHandler
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
	...
	// it's time to register the group aggregation endpoint
	groupPath := "/apis/" + apiService.Spec.Group
	groupDiscoveryHandler := &apiGroupHandler{
		codecs:    aggregatorscheme.Codecs,
		groupName: apiService.Spec.Group,
		lister:    s.lister,
		delegate:  s.delegateHandler,
	}
	// aggregation is protected
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
	s.handledGroups.Insert(apiService.Spec.Group)
	return nil
}

// k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go:109
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// Load the proxyHandlingInfo used to handle the request
	value := r.handlingInfo.Load()
	if value == nil {
		r.localDelegate.ServeHTTP(w, req)
		return
	}
	handlingInfo := value.(proxyHandlingInfo)
	...
	// Check whether the service behind the APIService is available
	if !handlingInfo.serviceAvailable {
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}

	// Convert the original request into a request to the APIService's service
	// write a new location based on the existing request pointed at the target service
	location := &url.URL{}
	location.Scheme = "https"
	rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName, handlingInfo.servicePort)
	if err != nil {
		klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
		proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
		return
	}
	location.Host = rloc.Host
	location.Path = req.URL.Path
	location.RawQuery = req.URL.Query().Encode()

	newReq, cancelFn := newRequestForProxy(location, req)
	defer cancelFn()
	...
	proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
	handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, upgrade, &responder{w: w})
	handler.ServeHTTP(w, newReq)
}
$ kubectl get APIService
NAME                     SERVICE                      AVAILABLE   AGE
...
v1.apps                  Local                        True        50d
...
v1beta1.metrics.k8s.io   kube-system/metrics-server   True        50d
...

# default APIServices
$ kubectl get -o yaml APIService/v1.apps
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  labels:
    kube-aggregator.kubernetes.io/automanaged: onstart
  name: v1.apps
  selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.apps
spec:
  group: apps
  groupPriorityMinimum: 17800
  version: v1
  versionPriority: 15
status:
  conditions:
  - lastTransitionTime: "2020-10-20T10:39:48Z"
    message: Local APIServices are always available
    reason: Local
    status: "True"
    type: Available

# aggregated server
$ kubectl get -o yaml APIService/v1beta1.metrics.k8s.io
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
    kubernetes.io/cluster-service: "true"
  name: v1beta1.metrics.k8s.io
  selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1beta1.metrics.k8s.io
spec:
  group: metrics.k8s.io
  groupPriorityMinimum: 100
  insecureSkipTLSVerify: true
  service:
    name: metrics-server
    namespace: kube-system
    port: 443
  version: v1beta1
  versionPriority: 100
status:
  conditions:
  - lastTransitionTime: "2020-12-05T00:50:48Z"
    message: all checks passed
    reason: Passed
    status: "True"
    type: Available

# CRD
$ kubectl get -o yaml APIService/v1.duyanghao.example.com
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  labels:
    kube-aggregator.kubernetes.io/automanaged: "true"
  name: v1.duyanghao.example.com
  selfLink: /apis/apiregistration.k8s.io/v1/apiservices/v1.duyanghao.example.com
spec:
  group: duyanghao.example.com
  groupPriorityMinimum: 1000
  version: v1
  versionPriority: 100
status:
  conditions:
  - lastTransitionTime: "2020-12-11T08:45:37Z"
    message: Local APIServices are always available
    reason: Local
    status: "True"
    type: Available
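The routing decision made when an APIService is added can be sketched as below. The `apiService` struct is a trimmed, hypothetical stand-in for v1.APIService (only the fields needed here), and `target` is an illustrative helper that does not exist in kube-aggregator; the `"v1."` value follows the "v1. is a special case for the legacy API" comment in the code above.

```go
package main

import "fmt"

// apiService is a trimmed, hypothetical stand-in for v1.APIService.
type apiService struct {
	Name    string
	Group   string
	Version string
	Service string // "namespace/name", or "" for a Local APIService
}

// legacyAPIServiceName is the name of the core-group APIService ("v1.").
const legacyAPIServiceName = "v1."

// proxyPath mirrors how AddAPIService derives the path a proxy handler is mounted on.
func proxyPath(s apiService) string {
	if s.Name == legacyAPIServiceName {
		return "/api"
	}
	return "/apis/" + s.Group + "/" + s.Version
}

// target describes where requests for the APIService end up (illustrative only).
func target(s apiService) string {
	if s.Service == "" {
		return "local delegate (kube-apiserver)"
	}
	return "proxy to service " + s.Service
}

func main() {
	services := []apiService{
		{Name: "v1.", Group: "", Version: "v1"},
		{Name: "v1.apps", Group: "apps", Version: "v1"},
		{Name: "v1beta1.metrics.k8s.io", Group: "metrics.k8s.io", Version: "v1beta1", Service: "kube-system/metrics-server"},
	}
	for _, s := range services {
		fmt.Printf("%s: %s -> %s\n", s.Name, proxyPath(s), target(s))
	}
}
```

This matches the kubectl output above: v1.apps is Local and stays inside kube-apiserver, while v1beta1.metrics.k8s.io is proxied to kube-system/metrics-server.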
- When aggregatorServer is created, it builds a default list of APIServices from all API resources defined by kube-apiserver. Each is named $VERSION.$GROUP and carries the label kube-aggregator.kubernetes.io/automanaged: onstart, for example the v1.apps apiService. autoRegistrationController creates and maintains the APIServices in this list, which are the Local apiServices we see; custom APIServices (aggregated servers) are not touched by it
- The aggregated server implements the CRUD API for the CR (custom API resource) and is free to choose its backend storage: it can share etcd with the core kube-apiserver, or deploy its own etcd or another database. The CR API path served by the aggregated server is /apis/$GROUP/$VERSION; for sample apiserver it is /apis/wardle.example.com/v1alpha1, with the resource types flunders and fischers
- The aggregated server integrates and interacts with the core kube-apiserver by deploying an APIService resource whose service field points to the aggregated server's service
- The sample-apiserver directory layout is shown below and can serve as a reference when writing your own aggregated server:
staging/src/k8s.io/sample-apiserver
├── artifacts
│   ├── example
│   │   ├── apiservice.yaml
...
├── hack
├── main.go
└── pkg
    ├── admission
    ├── apis
    ├── apiserver
    ├── cmd
    ├── generated
    │   ├── clientset
    │   │   └── versioned
    ...
    │   │       └── typed
    │   │           └── wardle
    │   │               ├── v1alpha1
    │   │               └── v1beta1
    │   ├── informers
    │   │   └── externalversions
    │   │       └── wardle
    │   │           ├── v1alpha1
    │   │           └── v1beta1
    │   ├── listers
    │   │   └── wardle
    │   │       ├── v1alpha1
    │   │       └── v1beta1
    └── registry
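- artifacts holds the example deployment yaml
- the hack directory holds automation scripts (e.g. update-codegen)
- main.go is the entry point of the aggregated server; pkg/cmd holds the concrete startup logic; pkg/apiserver handles aggregated server initialization and route registration
- pkg/apis holds the struct definitions of the CRs, with generated code produced by update-codegen
- pkg/admission holds the admission-related code
- pkg/generated holds the generated clientset, informers and listers for accessing the CR
- pkg/registry holds the RESTStorage implementation for the CR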
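The `$VERSION.$GROUP` naming convention for APIServices described earlier in this section can be shown with a short sketch. The `groupVersions` type and `apiServiceNames` function are hypothetical names for this example; the real controllers build full APIService objects rather than plain strings.

```go
package main

import "fmt"

// groupVersions is a hypothetical, trimmed view of an API group:
// its name and the versions it serves. An empty Group is the core group.
type groupVersions struct {
	Group    string
	Versions []string
}

// apiServiceNames returns one APIService name per version, in the form $VERSION.$GROUP.
func apiServiceNames(gv groupVersions) []string {
	names := make([]string, 0, len(gv.Versions))
	for _, v := range gv.Versions {
		names = append(names, v+"."+gv.Group)
	}
	return names
}

func main() {
	fmt.Println(apiServiceNames(groupVersions{Group: "apps", Versions: []string{"v1"}}))
	fmt.Println(apiServiceNames(groupVersions{Group: "wardle.example.com", Versions: []string{"v1alpha1", "v1beta1"}}))
	// The core group has an empty name, which yields the special name "v1.".
	fmt.Println(apiServiceNames(groupVersions{Group: "", Versions: []string{"v1"}}))
}
```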
For more details on the code, see kubernetes-reading-notes.
apiExtensionsServer
apiExtensionsServer mainly registers CustomResourceDefinition (CRD) apiResources and apiVersions, and handles REST requests for CRDs and the corresponding CustomResources (CR), returning 404 if the CR cannot be handled. It is also the last link in the apiserver delegation chain.
The principles are summarized as follows:
- A Custom Resource (CR) is a user-defined Kubernetes resource type, as opposed to the built-in resource types such as Pod and Service. With CRs we can define any resource type we want
- A CRD registers a CR with Kubernetes through a yaml file to implement custom api-resources. This is the other way of extending Kubernetes API resources, and the more widely used one
- APIExtensionServer registers CustomResourceDefinition (CRD) apiResources and apiVersions, and handles REST requests for CRDs and the corresponding CustomResources (CR), returning 404 if the CR cannot be handled. It is also the last link in the apiserver delegation chain
- crdRegistrationController automatically registers CRD GroupVersions as APIServices. Concretely, it enumerates all CRDs, builds an APIService from each CRD's crd.Spec.Group and crd.Spec.Versions fields, and adds it to autoRegisterController.apiServicesToSync, where autoRegisterController creates and maintains it. This is why an APIService object appears after a CRD is created
- The controllers contained in APIExtensionServer and their functions are:
  - openapiController: syncs changes in crd resources to the served OpenAPI document, which can be viewed at /openapi/v2;
  - crdController: registers crd information into apiVersions and apiResources; both can be inspected with kubectl api-versions and kubectl api-resources;
    - kubectl api-versions returns the version information of all Kubernetes cluster resources (it actually issues two requests, https://127.0.0.1:6443/api and https://127.0.0.1:6443/apis, and merges the two responses at the end)
$ kubectl -v=8 api-versions
I1211 11:44:50.276446 22493 loader.go:375] Config loaded from file: /root/.kube/config
I1211 11:44:50.277005 22493 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32s
...
I1211 11:44:50.290265 22493 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}
I1211 11:44:50.293673 22493 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32s
...
I1211 11:44:50.298360 22493 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]
apiextensions.k8s.io/v1
apiextensions.k8s.io/v1beta1
apiregistration.k8s.io/v1
apiregistration.k8s.io/v1beta1
apps/v1
authentication.k8s.io/v1beta1
...
storage.k8s.io/v1
storage.k8s.io/v1beta1
v1
    - kubectl api-resources first fetches all API versions, and then for each API version calls the API to fetch all API resource types under that version
$ kubectl -v=8 api-resources
5077 loader.go:375] Config loaded from file: /root/.kube/config
I1211 15:19:47.593450 15077 round_trippers.go:420] GET https://127.0.0.1:6443/api?timeout=32s
I1211 15:19:47.602273 15077 request.go:1068] Response Body: {"kind":"APIVersions","versions":["v1"],"serverAddressByClientCIDRs":[{"clientCIDR":"0.0.0.0/0","serverAddress":"x.x.x.x:6443"}]}
I1211 15:19:47.606279 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis?timeout=32s
I1211 15:19:47.610333 15077 request.go:1068] Response Body: {"kind":"APIGroupList","apiVersion":"v1","groups":[{"name":"apiregistration.k8s.io","versions":[{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"}},{"name":"extensions","versions":[{"groupVersion":"extensions/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"extensions/v1beta1","version":"v1beta1"}},{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}],"preferredVersion":{"groupVersion":"apps/v1","version":"v1"}},{"name":"events.k8s.io","versions":[{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"events.k8s.io/v1beta1","version":"v1beta1"}},{"name":"authentication.k8s.io","versions":[{"groupVersion":"authentication.k8s.io/v1","version":"v1"},{"groupVersion":"authentication.k8s.io/v1beta1","version":"v1beta1"}],"preferredVersion":{"groupVersion":"authentication.k8s.io/v1"," [truncated 4985 chars]
I1211 15:19:47.614700 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/batch/v1?timeout=32s
I1211 15:19:47.614804 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/authentication.k8s.io/v1?timeout=32s
I1211 15:19:47.615687 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/auth.tkestack.io/v1?timeout=32s
https://127.0.0.1:6443/apis/authentication.k8s.io/v1beta1?timeout=32s
I1211 15:19:47.616794 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/coordination.k8s.io/v1?timeout=32s
I1211 15:19:47.616863 15077 round_trippers.go:420] GET https://127.0.0.1:6443/apis/apps/v1?timeout=32s
...
NAME          SHORTNAMES   APIGROUP   NAMESPACED   KIND
bindings                              true         Binding
endpoints     ep                      true         Endpoints
events        ev                      true         Event
limitranges   limits                  true         LimitRange
namespaces    ns                      false        Namespace
nodes         no                      false        Node
...
  - namingController: checks crd objects for naming conflicts; the result can be seen in crd.status.conditions;
  - establishingController: checks whether the crd is in a healthy state; the result can be seen in crd.status.conditions;
  - nonStructuralSchemaController: checks whether the structure of the crd object is valid; the result can be seen in crd.status.conditions;
  - apiApprovalController: checks whether the crd follows the Kubernetes API approval policy; the result can be seen in crd.status.conditions;
  - finalizingController: works similarly to finalizers and is involved in the deletion of CRs;
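The merge that kubectl api-versions performs over the /api and /apis responses, as described in the list above, can be approximated with the sketch below. The struct definitions cover only the JSON fields visible in the logged response bodies, `mergeVersions` is a hypothetical helper, and sorting the result is an assumption made to match the ordering of the printed output; this is not kubectl's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// apiVersions models the relevant part of the GET /api response.
type apiVersions struct {
	Versions []string `json:"versions"`
}

// apiGroupList models the relevant part of the GET /apis response.
type apiGroupList struct {
	Groups []struct {
		Versions []struct {
			GroupVersion string `json:"groupVersion"`
		} `json:"versions"`
	} `json:"groups"`
}

// mergeVersions combines the core versions from /api with every
// groupVersion from /apis and returns them sorted.
func mergeVersions(apiBody, apisBody []byte) ([]string, error) {
	var core apiVersions
	if err := json.Unmarshal(apiBody, &core); err != nil {
		return nil, err
	}
	var groups apiGroupList
	if err := json.Unmarshal(apisBody, &groups); err != nil {
		return nil, err
	}
	out := append([]string{}, core.Versions...)
	for _, g := range groups.Groups {
		for _, v := range g.Versions {
			out = append(out, v.GroupVersion)
		}
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	apiBody := []byte(`{"kind":"APIVersions","versions":["v1"]}`)
	apisBody := []byte(`{"kind":"APIGroupList","groups":[
		{"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}]},
		{"name":"apiregistration.k8s.io","versions":[
			{"groupVersion":"apiregistration.k8s.io/v1","version":"v1"},
			{"groupVersion":"apiregistration.k8s.io/v1beta1","version":"v1beta1"}]}]}`)
	versions, err := mergeVersions(apiBody, apisBody)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, v := range versions {
		fmt.Println(v)
	}
}
```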
The CR CRUD handling logic of the APIServer is summarized as follows:
- createAPIExtensionsServer => NewCustomResourceDefinitionHandler => crdHandler => register the CR CRUD API:
// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	...
	crdHandler, err := NewCustomResourceDefinitionHandler(
		versionDiscoveryHandler,
		groupDiscoveryHandler,
		s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
		delegateHandler,
		c.ExtraConfig.CRDRESTOptionsGetter,
		c.GenericConfig.AdmissionControl,
		establishingController,
		c.ExtraConfig.ServiceResolver,
		c.ExtraConfig.AuthResolverWrapper,
		c.ExtraConfig.MasterCount,
		s.GenericAPIServer.Authorizer,
		c.GenericConfig.RequestTimeout,
		time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
		apiGroupInfo.StaticOpenAPISpec,
		c.GenericConfig.MaxRequestBodyBytes,
	)
	if err != nil {
		return nil, err
	}
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
	...
	return s, nil
}
- The crdHandler logic is as follows:
  - Parse the request (GET /apis/duyanghao.example.com/v1/namespaces/default/students) and use the group (duyanghao.example.com), version (v1) and resource (students) from the request path to fetch the corresponding CRD (crd, err := r.crdLister.Get(crdName))
  - Use crd.UID and crd.Name to get the crdInfo, creating it if it does not exist (crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)). crdInfo contains the CRD definition and the customresource.REST storage for that CRD's Custom Resource
  - The customresource.REST storage is created from the CR's Group (duyanghao.example.com), Version (v1), Kind (Student), Resource (students) and so on. Because a CR has no concrete struct definition in the Kubernetes code, a generic Unstructured struct is initialized first (it can hold Custom Resources of any type), and SetGroupVersionKind is called on it to set the concrete Custom Resource type
  - After the Unstructured struct is fetched from the customresource.REST storage, it is converted as needed and then returned
// k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go:223
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	ctx := req.Context()
	requestInfo, ok := apirequest.RequestInfoFrom(ctx)
	...
	crdName := requestInfo.Resource + "." + requestInfo.APIGroup
	crd, err := r.crdLister.Get(crdName)
	...
	crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
	verb := strings.ToUpper(requestInfo.Verb)
	resource := requestInfo.Resource
	subresource := requestInfo.Subresource
	scope := metrics.CleanScope(requestInfo)
	...
	switch {
	case subresource == "status" && subresources != nil && subresources.Status != nil:
		handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
	case subresource == "scale" && subresources != nil && subresources.Scale != nil:
		handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
	case len(subresource) == 0:
		handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
	default:
		responsewriters.ErrorNegotiated(
			apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
		)
	}

	if handlerFunc != nil {
		handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handlerFunc)
		handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
		handler.ServeHTTP(w, req)
		return
	}
}
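The first step of crdHandler, deriving the CRD name from the request path, can be sketched in isolation. The `requestInfo` struct here is a trimmed, hypothetical stand-in for the apiserver's request info, and `parseRequestPath` is an illustrative parser that only understands /apis/{group}/{version}[/namespaces/{namespace}]/{resource}[/{name}]; the real apiserver resolves this through its request-info filter.

```go
package main

import (
	"fmt"
	"strings"
)

// requestInfo is a trimmed, hypothetical stand-in for the apiserver's RequestInfo.
type requestInfo struct {
	APIGroup   string
	APIVersion string
	Namespace  string
	Resource   string
	Name       string
}

// parseRequestPath extracts group, version, namespace, resource and name
// from a named-group API path. It returns false for paths it does not understand.
func parseRequestPath(path string) (requestInfo, bool) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) < 4 || parts[0] != "apis" {
		return requestInfo{}, false
	}
	info := requestInfo{APIGroup: parts[1], APIVersion: parts[2]}
	rest := parts[3:]
	// "/namespaces/<ns>/<resource>..." marks a namespaced request.
	if rest[0] == "namespaces" && len(rest) >= 3 {
		info.Namespace = rest[1]
		rest = rest[2:]
	}
	info.Resource = rest[0]
	if len(rest) > 1 {
		info.Name = rest[1]
	}
	return info, true
}

// crdName mirrors the lookup key used by crdHandler: <resource>.<group>.
func crdName(info requestInfo) string {
	return info.Resource + "." + info.APIGroup
}

func main() {
	info, ok := parseRequestPath("/apis/duyanghao.example.com/v1/namespaces/default/students")
	if !ok {
		fmt.Println("unsupported path")
		return
	}
	fmt.Printf("group=%s version=%s namespace=%s resource=%s\n",
		info.APIGroup, info.APIVersion, info.Namespace, info.Resource)
	fmt.Println("crdName:", crdName(info))
}
```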
For more details on the code, see kubernetes-reading-notes.
Conclusion
This article gave a source-level overview of the Kubernetes apiserver, covering aggregatorServer, kubeAPIServer, apiExtensionsServer and the bootstrap-controller. Reading it should give a general understanding of how the apiserver works internally and help with deeper study later.
Refs
- kubernetes-reading-notes