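In "Kubernetes APIServer Storage Framework Analysis", we covered the storage framework behind the APIServer: every API resource has a corresponding REST store and etcd store. In "Kubernetes APIServer GenericAPIServer", we described how the GenericAPIServer's Handler is built and how API objects are registered into that Handler in the form of APIGroupInfo. In "Kubernetes APIServer Mechanism Overview", we briefly introduced the APIServer's extension mechanism: Aggregator, APIExtensions, and KubeAPIServer are chained together through delegation. This article focuses on how the API object resources in these three "extension objects" are organized into APIGroupInfo, how they are installed through the install methods exposed by GenericAPIServer, and finally which API object resources exist in the current version of Kubernetes.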

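KubeAPIServer is the APIServer that hosts the built-in Kubernetes API objects, while Aggregator and APIExtensions are the two extension mechanisms of the Kubernetes API (see the official documentation for an introduction to both). APIExtensions is the implementation of CRDs, and Aggregator is a more advanced extension that lets the Kubernetes APIServer work together with external APIServers. Each of the three contains a GenericAPIServer. Let's first look at the three corresponding structs: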

# kubernetes/pkg/master/master.go

// KubeAPIServer
type Master struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
}

# kube-aggregator/pkg/apiserver/apiserver.go

// Aggregator
type APIAggregator struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	delegateHandler http.Handler

	// proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
	// this to confirm the proxy's identity
	proxyClientCert []byte
	proxyClientKey  []byte
	proxyTransport  *http.Transport

	// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
	proxyHandlers map[string]*proxyHandler
	// handledGroups are the groups that already have routes
	handledGroups sets.String

	......
}

# apiextensions-apiserver/pkg/apiserver/apiserver.go

// APIExtensions
type CustomResourceDefinitions struct {
	GenericAPIServer *genericapiserver.GenericAPIServer

	// provided for easier embedding
	Informers externalinformers.SharedInformerFactory
}

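Each of them installs and registers its API objects into its own GenericAPIServer. Besides the built-in Kubernetes API objects in Master, such as pods and services, APIAggregator and CustomResourceDefinitions also have built-in API objects of their own, though these exist to support the extension mechanisms themselves. The API object built into APIAggregator is called apiservices and belongs to the group apiregistration.k8s.io; every external APIServer is modeled as an apiservice and registered with the APIAggregator. The API object built into apiextensions is called customresourcedefinitions and belongs to the group apiextensions.k8s.io; this is the familiar CRD, and every custom resource type is modeled as a CRD.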

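A note on naming: KubeAPIServer corresponds to Master, Aggregator corresponds to APIAggregator, and APIExtensions corresponds to CustomResourceDefinitions. The first name in each pair follows the name given to the component's GenericAPIServer in the code, and the second is the name of the corresponding struct.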

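These three structs are instantiated in the CreateServerChain() stage described in "Kubernetes APIServer Mechanism Overview", each one initialized through the Config -> Complete -> New pattern. The core logic lives in the New() method, so we will focus on the part of it that installs API objects, one server at a time.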
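The Config -> Complete -> New pattern mentioned above can be illustrated with a minimal sketch. Every type and field here (Config, CompletedConfig, Server, MaxObjects) is hypothetical and only mirrors the shape of the pattern, not the actual Kubernetes types: Complete() fills in defaults, and New() can only be reached through a completed config.

```go
package main

import (
	"errors"
	"fmt"
)

// Config holds user-supplied settings; fields may be left empty.
// All names in this sketch are hypothetical, not the Kubernetes types.
type Config struct {
	Name       string
	MaxObjects int
}

// completedConfig is unexported, so callers can only obtain one via Complete().
type completedConfig struct {
	*Config
}

// CompletedConfig wraps completedConfig so New() is only reachable after Complete().
type CompletedConfig struct {
	*completedConfig
}

// Complete fills in defaults for any field the caller left unset.
func (c *Config) Complete() CompletedConfig {
	if c.Name == "" {
		c.Name = "demo-apiserver"
	}
	if c.MaxObjects == 0 {
		c.MaxObjects = 100
	}
	return CompletedConfig{&completedConfig{c}}
}

// Server is the object produced by New().
type Server struct {
	Name       string
	MaxObjects int
}

// New validates the completed config and builds the server.
func (c completedConfig) New() (*Server, error) {
	if c.MaxObjects < 0 {
		return nil, errors.New("MaxObjects must not be negative")
	}
	return &Server{Name: c.Name, MaxObjects: c.MaxObjects}, nil
}

func main() {
	s, err := (&Config{MaxObjects: 5}).Complete().New()
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Name, s.MaxObjects)
}
```

Keeping completedConfig unexported is the design point: code outside the package cannot construct one directly, so New() never sees a config whose defaults were not applied.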

KubeAPIServer

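The objects built into KubeAPIServer fall into two categories. The first is the legacy category: these APIs were designed early on, before API groups existed, and they all share the prefix /api/v1. Resources such as pods, services, and nodes belong here; their paths carry no group information, and they are usually referred to as the core/legacy group. The second category uses groups, and its prefixes carry both group and version: /apis/$GROUP_NAME/$VERSION. Resources such as deployments and daemonsets belong here, and these are called named groups. The two kinds of API are organized differently in Master's New() method: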


# kubernetes/pkg/master/master.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {

    s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)

    ......

    m := &Master{
	    GenericAPIServer:          s,
	    ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}
    ......

    // install legacy rest storage
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
    	legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
    		StorageFactory:              c.ExtraConfig.StorageFactory,
    		ProxyTransport:              c.ExtraConfig.ProxyTransport,
    		KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
    		EventTTL:                    c.ExtraConfig.EventTTL,
    		ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
    		SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
    		ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
    		LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
    		ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
    		ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
    		ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
    		APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
    	}
    	if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
    		return nil, err
    	}
    }

    // The order here is preserved in discovery.
    restStorageProviders := []RESTStorageProvider{
    	authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
    	authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
    	autoscalingrest.RESTStorageProvider{},
    	batchrest.RESTStorageProvider{},
    	certificatesrest.RESTStorageProvider{},
    	coordinationrest.RESTStorageProvider{},
    	discoveryrest.StorageProvider{},
    	extensionsrest.RESTStorageProvider{},
    	networkingrest.RESTStorageProvider{},
    	noderest.RESTStorageProvider{},
    	policyrest.RESTStorageProvider{},
    	rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
    	schedulingrest.RESTStorageProvider{},
    	settingsrest.RESTStorageProvider{},
    	storagerest.RESTStorageProvider{},
    	flowcontrolrest.RESTStorageProvider{},
    	// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
    	// See https://github.com/kubernetes/kubernetes/issues/42392
    	appsrest.StorageProvider{},
    	admissionregistrationrest.RESTStorageProvider{},
    	eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
    }
    if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
    	return nil, err
    }

    ......
}

As shown, a RESTStorageProvider struct is constructed for each group, and these providers are passed as arguments to InstallLegacyAPI() and InstallAPIs() for installation. Let's look at these two install methods:

# kubernetes/pkg/master/master.go

func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

    ......

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
    	return fmt.Errorf("error in registering group versions: %v", err)
    }
    return nil
}

func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
    apiGroupsInfo := []*genericapiserver.APIGroupInfo{}

    ......

    for _, restStorageBuilder := range restStorageProviders {
    	groupName := restStorageBuilder.GroupName()
    	apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
    	apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
    }

    ......

    if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
    	return fmt.Errorf("error in registering group versions: %v", err)
    }
    return nil
}

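As shown, the providers' NewLegacyRESTStorage() and NewRESTStorage() methods construct the APIGroupInfo, and then the InstallLegacyAPIGroup() and InstallAPIGroups() methods exposed by GenericAPIServer are called to install and register it. Next, let's see how this APIGroupInfo is built, taking pod as an example: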


# kubernetes/pkg/registry/core/rest/storage_core.go

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    apiGroupInfo := genericapiserver.APIGroupInfo{
    	PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
    	VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
    	Scheme:                       legacyscheme.Scheme,
    	ParameterCodec:               legacyscheme.ParameterCodec,
    	NegotiatedSerializer:         legacyscheme.Codecs,
    }

    ......

    podStorage, err := podstore.NewStorage(
        restOptionsGetter,
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )

    ......

    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
    }

    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
    return restStorage, apiGroupInfo, nil
}

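Here an APIGroupInfo struct is constructed first. NewStorage() then builds the REST store for pods, which is placed into restStorageMap, and that map is in turn stored in VersionedResourcesStorageMap under the version key. restStorageMap holds more than the pod REST store: the REST stores for other legacy API objects such as services and nodes are built by their own NewStorage() methods and placed into the same map.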
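To make the shape of VersionedResourcesStorageMap concrete, here is a minimal sketch of the two-level map (version -> resource key -> storage) together with the URL prefix each group would be served under. GroupInfo, Storage, Prefix, and Resources are hypothetical stand-ins rather than the real genericapiserver types, and the storage values are plain labels.

```go
package main

import (
	"fmt"
	"sort"
)

// Storage stands in for rest.Storage; in this sketch it is only a label.
type Storage string

// GroupInfo is a hypothetical, simplified stand-in for APIGroupInfo.
type GroupInfo struct {
	Group string // "" means the legacy core group
	// version -> resource key -> storage
	VersionedResourcesStorageMap map[string]map[string]Storage
}

// Prefix returns the URL prefix for one version of the group:
// /api/<version> for the legacy group, /apis/<group>/<version> otherwise.
func (g GroupInfo) Prefix(version string) string {
	if g.Group == "" {
		return "/api/" + version
	}
	return "/apis/" + g.Group + "/" + version
}

// Resources returns the sorted resource keys registered for a version.
func (g GroupInfo) Resources(version string) []string {
	keys := make([]string, 0, len(g.VersionedResourcesStorageMap[version]))
	for key := range g.VersionedResourcesStorageMap[version] {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	core := GroupInfo{VersionedResourcesStorageMap: map[string]map[string]Storage{}}
	core.VersionedResourcesStorageMap["v1"] = map[string]Storage{
		"pods":        "podStorage.Pod",
		"pods/attach": "podStorage.Attach",
	}
	apps := GroupInfo{
		Group: "apps",
		VersionedResourcesStorageMap: map[string]map[string]Storage{
			"v1": {"deployments": "deploymentStorage.Deployment"},
		},
	}
	for _, g := range []GroupInfo{core, apps} {
		fmt.Println(g.Prefix("v1"), g.Resources("v1"))
	}
}
```

Subresources such as pods/attach are simply additional keys in the same inner map, which is why a single map per version is enough to describe everything a group serves.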

Now let's look at an API object in a named group, taking the apps group as an example:


# kubernetes/pkg/registry/apps/rest/storage_apps.go

func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
	// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go with specific priorities.
	// TODO refactor the plumbing to provide the information in the APIGroupInfo

	if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
		storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return genericapiserver.APIGroupInfo{}, false, err
		}
		apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
	}

    return apiGroupInfo, true, nil
}

func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
    storage := map[string]rest.Storage{}

    // deployments
    deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
    if err != nil {
    	return storage, err
    }
    storage["deployments"] = deploymentStorage.Deployment
    storage["deployments/status"] = deploymentStorage.Status
    storage["deployments/scale"] = deploymentStorage.Scale

    ......
}

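The details of NewStorage() are not covered here. Its main job is to build the REST store for the corresponding API object resource, which was covered in the discussion of upper-layer uses of the REST store in "Kubernetes APIServer Storage Framework Analysis".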
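The provider pattern used throughout this section can be summarized in a small sketch: each group supplies a provider, and an install step walks the providers in order, collecting one group description per enabled group. The Provider interface, staticProvider, collect, and the "group/version" keys below are all assumptions made for illustration; the elided parts of the real InstallAPIs() are not reproduced.

```go
package main

import "fmt"

// GroupInfo is a hypothetical stand-in for APIGroupInfo.
type GroupInfo struct {
	Group    string
	Versions []string
}

// Provider is a hypothetical, trimmed-down take on the RESTStorageProvider idea:
// each API group knows its own name and how to build its GroupInfo.
type Provider interface {
	GroupName() string
	NewStorage(enabled map[string]bool) (GroupInfo, bool, error)
}

// staticProvider serves a fixed list of versions for one group.
type staticProvider struct {
	group    string
	versions []string
}

func (p staticProvider) GroupName() string { return p.group }

// NewStorage keeps only the versions switched on in enabled ("group/version" keys)
// and reports false when none of them is enabled.
func (p staticProvider) NewStorage(enabled map[string]bool) (GroupInfo, bool, error) {
	info := GroupInfo{Group: p.group}
	for _, v := range p.versions {
		if enabled[p.group+"/"+v] {
			info.Versions = append(info.Versions, v)
		}
	}
	return info, len(info.Versions) > 0, nil
}

// collect builds one GroupInfo per provider, preserving provider order
// and skipping groups that end up with no enabled version.
func collect(enabled map[string]bool, providers ...Provider) ([]GroupInfo, error) {
	var infos []GroupInfo
	for _, p := range providers {
		info, ok, err := p.NewStorage(enabled)
		if err != nil {
			return nil, fmt.Errorf("initializing API group %q: %w", p.GroupName(), err)
		}
		if !ok {
			continue
		}
		infos = append(infos, info)
	}
	return infos, nil
}

func main() {
	enabled := map[string]bool{"batch/v1": true, "apps/v1": true}
	infos, err := collect(enabled,
		staticProvider{"batch", []string{"v1", "v2alpha1"}},
		staticProvider{"settings.k8s.io", []string{"v1alpha1"}},
		staticProvider{"apps", []string{"v1"}},
	)
	if err != nil {
		panic(err)
	}
	for _, info := range infos {
		fmt.Println(info.Group, info.Versions)
	}
}
```

Because the providers are passed as an ordered slice, the resulting group list keeps that order, which matches the comment in the excerpt that the order is preserved in discovery.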

Aggregator

The Aggregator's constructor, NewWithDelegate(), contains similar logic:


# kube-aggregator/pkg/apiserver/apiserver.go

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {

    ......

    genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)

    ......

    s := &APIAggregator{
	    GenericAPIServer:         genericServer,
	    delegateHandler:          delegationTarget.UnprotectedHandler(),
	    proxyClientCert:          c.ExtraConfig.ProxyClientCert,
	    proxyClientKey:           c.ExtraConfig.ProxyClientKey,
	    proxyTransport:           c.ExtraConfig.ProxyTransport,
	    proxyHandlers:            map[string]*proxyHandler{},
	    handledGroups:            sets.String{},
	    lister:                   informerFactory.Apiregistration().V1().APIServices().Lister(),
	    APIRegistrationInformers: informerFactory,
	    serviceResolver:          c.ExtraConfig.ServiceResolver,
	    openAPIConfig:            openAPIConfig,
	    egressSelector:           c.GenericConfig.EgressSelector,
    }

    ......

    apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)

    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }
    ......
}

# kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go

func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) genericapiserver.APIGroupInfo {
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)

	if apiResourceConfigSource.VersionEnabled(v1beta1.SchemeGroupVersion) {
		storage := map[string]rest.Storage{}
		apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
		storage["apiservices"] = apiServiceREST
		storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
		apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
	}

	if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
		storage := map[string]rest.Storage{}
		apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
		storage["apiservices"] = apiServiceREST
		storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
	}

	return apiGroupInfo
}

The Aggregator has only one API object resource, apiservices, in two versions: v1 and v1beta1.

APIExtensions

Now let's look at the New() method of APIExtensions, which is similar:

# apiextensions-apiserver/pkg/apiserver/apiserver.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
    s := &CustomResourceDefinitions{
        GenericAPIServer: genericServer,
    }

    apiResourceConfig := c.GenericConfig.MergedResourceConfig
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
        storage := map[string]rest.Storage{}
        // customresourcedefinitions
        customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
        if err != nil {
            return nil, err
        }
        storage["customresourcedefinitions"] = customResourceDefinitionStorage
        storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

        apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
    }
    if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
        storage := map[string]rest.Storage{}
        // customresourcedefinitions
        customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
        if err != nil {
            return nil, err
        }
        storage["customresourcedefinitions"] = customResourceDefintionStorage
        storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

        apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
    }

    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }

}

APIExtensions likewise defines only one resource, customresourcedefinitions, in two versions: v1 and v1beta1.
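Both the Aggregator and APIExtensions excerpts repeat the same block once per version: check whether the version is enabled, build a fresh store, and register the resource together with its status subresource. That per-version pattern can be sketched as a loop. The names here (Storage, store, buildVersions) are hypothetical and the stores are empty placeholders, so this only illustrates the structure, not the real NewREST() calls.

```go
package main

import "fmt"

// Storage stands in for rest.Storage.
type Storage interface{ Kind() string }

// store is a placeholder implementation that only remembers what it serves.
type store struct{ kind string }

func (s *store) Kind() string { return s.kind }

// buildVersions creates a separate storage map, with its own store instances,
// for every enabled version, mirroring the repeated if-blocks in the excerpts.
func buildVersions(resource string, versions []string, enabled func(string) bool) map[string]map[string]Storage {
	out := map[string]map[string]Storage{}
	for _, v := range versions {
		if !enabled(v) {
			continue
		}
		out[v] = map[string]Storage{
			resource:             &store{kind: resource},
			resource + "/status": &store{kind: resource + "/status"},
		}
	}
	return out
}

func main() {
	all := func(string) bool { return true }
	m := buildVersions("customresourcedefinitions", []string{"v1beta1", "v1"}, all)
	for _, v := range []string{"v1beta1", "v1"} {
		fmt.Println(v, len(m[v]), m[v]["customresourcedefinitions"].Kind())
	}
}
```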

Summary

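The sections above showed how KubeAPIServer, Aggregator, and APIExtensions each build their APIGroupInfo and how they call the install methods of GenericAPIServer. Notice that each version of an API object gets its own REST store instance. This does not necessarily mean a separate copy in etcd, because Kubernetes persists a resource in a single storage version and converts between versions when serving. Following the approach described above, let's take stock of the objects built into the Kubernetes API. The latest Kubernetes version at the time of writing is 1.19.0.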

apiextensions-apiserver
* resources
    * /apis/apiextensions.k8s.io/
        * customresourcedefinitions  -> GoRestfulContainer
        * customresourcedefinitions/status -> GoRestfulContainer
        * /apis  -> NonGoRestfulMux  -> crdHandler  // Handle()
        * /apis/  -> NonGoRestfulMux  -> crdHandler  // HandlePrefix(); CRUD operations on the custom resources defined by CRDs are all served by this handler

apiserver
* resources
    * /api/v1  -> GoRestfulContainer
        * pods
        * pods/attach
        * pods/status
        * pods/log
        * pods/exec
        * pods/portforward
        * pods/proxy
        * pods/binding
        * pods/eviction
        * pods/ephemeralcontainers
        * bindings
        * podTemplates
        * replicationControllers
        * replicationControllers/status
        * replicationControllers/scale
        * services
        * services/proxy
        * services/status
        * endpoints
        * nodes
        * nodes/status
        * nodes/proxy
        * events
        * limitRanges
        * resourceQuotas
        * resourceQuotas/status
        * namespaces
        * namespaces/status
        * namespaces/finalize
        * secrets
        * serviceAccounts
        * serviceAccounts/token
        * persistentVolumes
        * persistentVolumes/status
        * persistentVolumeClaims
        * persistentVolumeClaims/status
        * configMaps
        * componentStatuses
    * /apis  -> GoRestfulContainer
        * authentication.k8s.io
            * tokenreviews
        * authorization.k8s.io
            * subjectaccessreviews
            * selfsubjectaccessreviews
            * localsubjectaccessreviews
            * selfsubjectrulesreviews
        * autoscaling
            * horizontalpodautoscalers
            * horizontalpodautoscalers/status
        * batch
            * v1
                * jobs
                * jobs/status
            * v1beta1
                * cronjobs
                * cronjobs/status
            * v2alpha1
                * cronjobs
                * cronjobs/status
        * certificates.k8s.io
            * certificatesigningrequests
            * certificatesigningrequests/status
            * certificatesigningrequests/approval
        * coordination.k8s.io
            * leases
        * discovery.k8s.io
            * endpointslices
        * extensions
            * v1beta1
                * ingresses
                * ingresses/status
        * networking.k8s.io
            * v1
                * networkpolicies
            * v1beta1
                * ingresses
                * ingresses/status
                * ingressclasses
        * node.k8s.io
            * v1alpha1
                * runtimeclasses
            * v1beta1
                * runtimeclasses
        * policy
            * v1beta1
                * poddisruptionbudgets
                * poddisruptionbudgets/status
                * podsecuritypolicies
        * rbac.authorization.k8s.io
            * roles
            * rolebindings
            * clusterroles
            * clusterrolebindings
        * scheduling.k8s.io
            * priorityclasses
        * settings.k8s.io
            * podpresets
        * storage.k8s.io
            * v1alpha1
                * volumeattachments
            * v1beta1
                * storageclasses
                * volumeattachments
                * csinodes
                * csidrivers
            * v1
                * storageclasses
                * volumeattachments
                * volumeattachments/status
                * csinodes
                * csidrivers
        * flowcontrol.apiserver.k8s.io
            * flowschemas
            * flowschemas/status
            * prioritylevelconfigurations
            * prioritylevelconfigurations/status
        * apps
            * deployments
            * deployments/status
            * deployments/scale
            * statefulsets
            * statefulsets/status
            * statefulsets/scale
            * daemonsets
            * daemonsets/status
            * replicasets
            * replicasets/status
            * replicasets/scale
            * controllerrevisions
        * admissionregistration.k8s.io
            * validatingwebhookconfigurations
            * mutatingwebhookconfigurations
        * events.k8s.io
            * events

aggregator
* resources
    * /apis  -> GoRestfulContainer
        * apiregistration.k8s.io
            * apiservices
            * apiservices/status
    * /apis  -> apisHandler  -> NonGoRestfulMux
    * /apis/  -> apisHandler  -> NonGoRestfulMux
    * "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version -> proxyHandler -> NonGoRestfulMux
        * This proxyHandler ultimately proxies the request to the extension-apiserver
        * The proxyHandler is registered through AddAPIService, which the apiservice-registration-controller poststarthook calls when an APIService is added
    * "/apis/" + apiService.Spec.Group -> groupDiscoveryHandler -> NonGoRestfulMux
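The aggregator routes listed above can be illustrated with a small routing sketch built on the standard library's http.ServeMux. The aggregator type, addAPIService, and the label handlers are hypothetical simplifications, and metrics.k8s.io is used only as an example group name: each registered group/version gets its own proxy handler, and everything else falls through to the delegate.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// label returns a handler that only reports which component served the request.
func label(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// aggregator is a hypothetical, much-simplified stand-in for APIAggregator.
type aggregator struct {
	mux *http.ServeMux
}

// newAggregator sends every request that matches no other route to the delegate.
func newAggregator(delegate http.Handler) *aggregator {
	mux := http.NewServeMux()
	mux.Handle("/", delegate)
	return &aggregator{mux: mux}
}

// addAPIService registers a proxy handler for one group/version,
// covering both the exact path and everything beneath it.
func (a *aggregator) addAPIService(group, version string, proxy http.Handler) {
	path := "/apis/" + group + "/" + version
	a.mux.Handle(path, proxy)
	a.mux.Handle(path+"/", proxy)
}

// serve runs one GET request through the handler and returns the response body.
func serve(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	a := newAggregator(label("delegate"))
	a.addAPIService("metrics.k8s.io", "v1beta1", label("proxy"))

	fmt.Println(serve(a.mux, "/apis/metrics.k8s.io/v1beta1/nodes"))
	fmt.Println(serve(a.mux, "/api/v1/pods"))
}
```

Note that http.ServeMux panics if the same pattern is registered twice, so a fuller version would need to track which group/versions already have routes, which is presumably the role of the proxyHandlers map and handledGroups set in the APIAggregator struct shown earlier.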

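As the list shows, the Kubernetes API in the current version is already very rich, with as many as 19 groups. More built-in API objects will surely be added over time, and combined with extension through CRDs and the Aggregator, it is no surprise that Kubernetes holds the top spot in the cloud-native world.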