Kubernetes APIServer 机制概述中我们介绍到了APIServer的本质其实是一个实现了RESTful API的WebServer,它使用golang的net/http的Server构建,并且Handler是其中非常重要的概念,此外,又简单介绍了APIServer的扩展机制,即Aggregator, APIExtensions以及KubeAPIServer这三者之间通过Delegation的方式实现了扩展。

Kubernetes APIServer Storage 框架解析中,我们介绍了APIServer相关的存储框架,每个API对象,都有对应的REST store以及etcd store

而本篇文章介绍到的GenericAPIServer跟上面两个内容紧密相关,它是APIServer的基础,Aggregator, APIExtensions以及KubeAPIServer每个都包含一个GenericAPIServer,他们各自的API对象都以Group的形式,注册进GenericAPIServer中,并且组织成最终的Handler,然后结合net/http Server,将APIServer运行起来,因此,掌握GenericAPIServer有助于理解APIServer的扩展机制以及运行原理,本篇文章重点介绍GenericAPIServer的以下四方面内容:

  • Handler的构建
  • API对象的注册
  • Handler的处理
  • PostStartHook

其相关的代码在apiserver库中的apiserver/pkg/server/目录下。

基础知识

APIGroupInfo

在API对象进行注册时,都被组织成APIGroupInfo的结构体形式,这里面包含了所有注册需要的信息,其定义如下:


# apiserver/pkg/server/genericapiserver.go

type APIGroupInfo struct {
    PrioritizedVersions []schema.GroupVersion

    // Info about the resources in this group. It's a map from version to resource to the storage.
    VersionedResourcesStorageMap map[string]map[string]rest.Storage

    OptionsExternalVersion *schema.GroupVersion
    MetaGroupVersion *schema.GroupVersion

    Scheme *runtime.Scheme
    NegotiatedSerializer runtime.NegotiatedSerializer
    ParameterCodec runtime.ParameterCodec

    StaticOpenAPISpec *spec.Swagger
}

这里面最关键的信息就是VersionedResourcesStorageMap这个属性,如注释所说,它是一个从version映射到resource,然后再从resource映射到rest storage的两层映射,比如batch组中的cronjobs资源,有v1beta1v2alpha1两个版本,则在该map中,则分别有两个映射,v1beta1 -> cronjobs -> cronjobs rest storage,以及v2alpha1 -> cronjobs -> cronjobs rest storage,这里说的rest storage就是之前介绍到的rest store,每一个版本的API对象,都会有自己的一个rest store,从这里就可以看到,Kubernetes对多版本是如何管理的,本质上,它把不同版本的API对象,其实当成不同的对象,有自己独立的存储。

go-restful

Kubernetes使用go-restful这个第三方库对核心的RESTful API进行了实现,其基础知识以及在Kubernetes中的用法,可阅读这篇文章:go-restful简析,这里不再介绍。

NonGoRestfulMux

NonGoRestfulMux是Kubernetes APIServer中非核心API使用的RESTful框架,因为没有使用go-restful实现,因此称之为NonGoRestfulMux,其详细介绍,见Kubernetes APIServer NonGoRestfulMux

Handler的构建

这里说的Handler指的是最终net/http Server要运行的Handler,它在GenericAPIServer中被构建出来,首先我们来看下GenericAPIServer的结构体:

# apiserver/pkg/server/genericapiserver.go

type GenericAPIServer struct {
    // SecureServingInfo holds configuration of the TLS server.
    SecureServingInfo *SecureServingInfo

    // "Outputs"
    // Handler holds the handlers being used by this API server
    Handler *APIServerHandler

    // delegationTarget is the next delegate in the chain. This is never nil.
    delegationTarget DelegationTarget

    ......
}

一个GenericAPIServer包含的信息非常的多,上面结构体并没有列出全部属性,在这里我们只关注几个重点信息就行:

  • SecureServingInfo *SecureServingInfo: 这里面包含的是运行APIServer需要的TLS相关的信息
  • Handler *APIServerHandler: 这个就是要运行APIServer需要使用到的Handler,各个API对象向APIServer中注册,说的就是向Handler注册,它是最重要的信息
  • delegationTarget DelegationTarget: 这个是扩展机制中用到的,指定该GenericAPIServer的delegation是谁

再来看下Handler *APIServerHandler的结构体信息:

# apiserver/pkg/server/handler.go

type APIServerHandler struct {
	// FullHandlerChain is the one that is eventually served with.  It should include the full filter
	// chain and then call the Director.
	FullHandlerChain http.Handler
	// The registered APIs.  InstallAPIs uses this.  Other servers probably shouldn't access this directly.
	GoRestfulContainer *restful.Container
	// NonGoRestfulMux is the final HTTP handler in the chain.
	// It comes after all filters and the API handling
	// This is where other servers can attach handler to various parts of the chain.
	NonGoRestfulMux *mux.PathRecorderMux

	// Director is here so that we can properly handle fall through and proxy cases.
	// This looks a bit bonkers, but here's what's happening.  We need to have /apis handling registered in gorestful in order to have
	// swagger generated for compatibility.  Doing that with `/apis` as a webservice, means that it forcibly 404s (no defaulting allowed)
	// all requests which are not /apis or /apis/.  We need those calls to fall through behind goresful for proper delegation.  Trying to
	// register for a pattern which includes everything behind it doesn't work because gorestful negotiates for verbs and content encoding
	// and all those things go crazy when gorestful really just needs to pass through.  In addition, openapi enforces unique verb constraints
	// which we don't fit into and it still muddies up swagger.  Trying to switch the webservices into a route doesn't work because the
	//  containing webservice faces all the same problems listed above.
	// This leads to the crazy thing done here.  Our mux does what we need, so we'll place it in front of gorestful.  It will introspect to
	// decide if the route is likely to be handled by goresful and route there if needed.  Otherwise, it goes to PostGoRestful mux in
	// order to handle "normal" paths and delegation. Hopefully no API consumers will ever have to deal with this level of detail.  I think
	// we should consider completely removing gorestful.
	// Other servers should only use this opaquely to delegate to an API server.
	Director http.Handler
}

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

可以看到,APIServerHandler中包含一个go-restful构建出来的Container,GoRestfulContainer,以及一个PathRecorderMux构建出来的NonGoRestfulMux,注意,他们都是指针类型的,此外还有一个FullHandlerChain以及Director,都是对一个director结构体的引用,来看看这个结构体:

# apiserver/pkg/server/handler.go

type director struct {
	name               string
	goRestfulContainer *restful.Container
	nonGoRestfulMux    *mux.PathRecorderMux
}

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    ......
}

它里面又包含了goRestfulContainernonGoRestfulMux,但是注意他们也是以指针的形式作为成员变量的,并且该director还实现了ServeHTTP()方法,即director还是一个Handler。上面的APIServerHandler中也包含了GoRestfulContainer, NonGoRestfulMux的指针类型的成员变量,他们指针指向的其实是同一个实体,即在NewAPIServerHandler()方法中New出来的实体。为什么在APIServerHandler中已经有这两个变量了,还要再单独生成一个director结构体来引用这两个变量,其实这跟他们的用法有关,下面会讲到。

现在先来说下FullHandlerChainDirector的区别,他们两个都是对director的引用,区别是FullHandlerChain在director外面还包围了一层Chain,我们来看看这个Chain是什么:

# apiserver/pkg/server/config.go

handlerChainBuilder := func(handler http.Handler) http.Handler {
    return c.BuildHandlerChainFunc(handler, c.Config)
}

apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler()

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
    if c.FlowControl != nil {
    	handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
    } else {
    	handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
    }
    handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
    handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
    failedHandler := genericapifilters.Unauthorized(c.Serializer)
    failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
    handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
    handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
    handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
    handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
    handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
    if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
    	handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
    }
    handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
    handler = genericapifilters.WithCacheControl(handler)
    handler = genericfilters.WithPanicRecovery(handler)
    return handler
}

上面的BuildHandlerChainFunc默认为DefaultBuildHandlerChain(),看到该方法中传入一个Handler,然后再该Handler外面,像包洋葱一样,包了一层又一层的filter,这些filter的作用其实就是在请求到来时,在Handler真正处理之前,先要经过的一系列认证,授权,审计等等检查,如果通过了,才会由最终的Handler来处理该请求,没通过,则会报相应的错误,可见,认证授权等操作,就是在这个阶段生效的,经过一系列filter的包装,最终构建出来的Handler,就是FullHandlerChain,而director就是这个被层层包装的Handler。而Director这个成员变量,没有被filter包装,这样通过Director就可以绕过认证授权这些filter,直接由Handler进行处理。那么问题来了,难道还有请求不需要认证授权的?这个Director存在的意义是什么?的确是有请求不需要认证授权,这就涉及到APIServer的扩展机制了,后面会介绍到。

小结一下,APIServerHandler中包含4个成员变量,FullHandlerChain和Director其实是两个Handler,一个是带认证授权这些filter的,一个是不带的,都是对director的引用,而GoRestfulContainer和NonGoRestfulMux则分别是指针类型的引用,指向真正的goRestfulContainer和nonGoRestfulMux实体,同时这两个实体,又被director所引用。

从这里就大概可以看出GoRestfulContainer和NonGoRestfulMux这两个变量在这里的作用了,在上层向goRestfulContainer和nonGoRestfulMux实体中注册API对象时,就是通过调用这两个变量来对真正的实体进行操作的,如下面的示例:

apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)

因为是指针,都指向同一个实体,这样director作为Handler也就能用到注册进来的API对象了。

API对象的注册

所谓API对象的注册,其实就是向GenericAPIServer的Handler中添加各个API对象的WebService和Route,GenericAPIServer提供了InstallLegacyAPIGroup(), InstallAPIGroups(), InstallAPIGroup()这三个方法,供外部调用,向其中注册APIGroupInfo,APIGroupInfo在上面基础知识中介绍过,里面存储了这个APIGroup的version, resource以及对应的REST storage实体,上面的三个方法,最后都会调用到同一个内部函数:

# apiserver/pkg/server/genericapiserver.go

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
        if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
            klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
            continue
        }

        apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        if apiGroupInfo.OptionsExternalVersion != nil {
            apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
        }
        apiGroupVersion.OpenAPIModels = openAPIModels
        apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes

        if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
            return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
        }
    }

    return nil
}

这个函数的逻辑,就是遍历APIGroupInfo中的version,按照version的维度来进行安装API对象,即构建出来一个APIGroupVersion,将该版本的resource和REST storage存储到该结构体中,然后执行安装操作: apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer);,可以看到这里传的就是上面说到的APIServerHandler中的GoRestfulContainer

# apiserver/pkg/endpoints/groupversion.go

func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}

	apiResources, ws, registrationErrors := installer.Install()
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws)
	container.Add(ws)
	return utilerrors.NewAggregate(registrationErrors)
}

在该方法中,又构造了一个APIInstaller结构体,将APIGroupVersion的指针传给它,由它去执行安装操作:

# apiserver/pkg/endpoints/installer.go

func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errors
}

在这里面New了一个WebService,然后遍历group中的REST storage,将storage中的path取出来,进行排序,然后再遍历这个path数组,针对每一个path: storage向WebService中执行注册操作,即registerResourceHandlers(),这就来到了最关键的地方,这是一个非常长的函数,这里面,对Storage进行类型转换,因为Storage实现了rest store的各种接口,所以首先将其转换成getter, creater, lister, updater等类型,分别对应该API对象在数据库层面的增删查改等操作,然后构造对应的Handler,然后再创建WebService的Route,最后将其添加到WebService中,我们以getter为例,简单看下这个过程:

# apiserver/pkg/endpoints/installer.go

(a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) {
    ......
    getter, isGetter := storage.(rest.Getter)
    handler = restfulGetResource(getter, exporter, reqScope)
    route := ws.GET(action.Path).To(handler)
    for _, route := range routes {
        ws.Route(route)
    }
    ......
}

func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.GetResource(r, e, &scope)(res.ResponseWriter, req.Request)
	}
}

# apiserver/pkg/endpoints/handlers/get.go

func GetResource(r rest.Getter, e rest.Exporter, scope *RequestScope) http.HandlerFunc {
    return getResourceHandler(scope,
        func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
            return r.Get(ctx, name, &options)
        }
}

func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        namespace, name, err := scope.Namer.Name(req)
        result, err := getter(ctx, name, req, trace)
    }
}

可以看到,在Handler方法中,去调用rest.Getter的Get方法,调用rest storage去数据库中获取对应的name的资源进行返回。在这里,我们就看到了在Kubernetes APIServer Storage 框架解析介绍的REST store是如何应用的。

Handler的处理

Handler构建出来,并且向其中注册了API对象,最后我们来看下,Handler是如何处理请求的,核心的逻辑,其实在上面已经介绍过,即在director的ServeHTTP()方法中:

# apiserver/pkg/server/handler.go

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

	// check to see if our webservices want to claim this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			// if we are exactly /apis or /apis/, then we need special handling in loop.
			// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
			// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
			if path == "/apis" || path == "/apis/" {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}

	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}

func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.FullHandlerChain.ServeHTTP(w, r)
}

首先是APIServerHandler的ServeHTTP()方法,调用了FullHandlerChain的ServeHTTP()方法,经过了层层的filter,最终到了director的ServeHTTP()方法,在该方法中,首先遍历goRestfulContainer中注册的WebService,看path跟哪个WebService中的路径匹配,如果匹配,则调用goRestfulContainer.Dispatch()处理该请求,如果都没有匹配上,则最终调用nonGoRestfulMux来处理该请求。

PostStartHook

在GenericAPIServer中还有一个重要的机制,就是这个PostStartHook,它是在APIServer启动之后,执行的一些Hook函数,这些Hook函数是在APIServer创建的过程中,注册进去的,在APIServer启动之后,做一些初始化或者周期性循环的任务,来看下相关的代码:

# apiserver/pkg/server/hooks.go

func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc) error {
	if len(name) == 0 {
		return fmt.Errorf("missing name")
	}
	if hook == nil {
		return fmt.Errorf("hook func may not be nil: %q", name)
	}
	if s.disabledPostStartHooks.Has(name) {
		klog.V(1).Infof("skipping %q because it was explicitly disabled", name)
		return nil
	}

	s.postStartHookLock.Lock()
	defer s.postStartHookLock.Unlock()

	if s.postStartHooksCalled {
		return fmt.Errorf("unable to add %q because PostStartHooks have already been called", name)
	}
	if postStartHook, exists := s.postStartHooks[name]; exists {
		// this is programmer error, but it can be hard to debug
		return fmt.Errorf("unable to add %q because it was already registered by: %s", name, postStartHook.originatingStack)
	}

	// done is closed when the poststarthook is finished.  This is used by the health check to be able to indicate
	// that the poststarthook is finished
	done := make(chan struct{})
	if err := s.AddBootSequenceHealthChecks(postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil {
		return err
	}
	s.postStartHooks[name] = postStartHookEntry{hook: hook, originatingStack: string(debug.Stack()), done: done}

	return nil
}

func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
	s.postStartHookLock.Lock()
	defer s.postStartHookLock.Unlock()
	s.postStartHooksCalled = true

	context := PostStartHookContext{
		LoopbackClientConfig: s.LoopbackClientConfig,
		StopCh:               stopCh,
	}

	for hookName, hookEntry := range s.postStartHooks {
		go runPostStartHook(hookName, hookEntry, context)
	}
}

通过AddPostStartHook()方法向GenericAPIServer中添加Hook,然后在APIServer启动时,调用RunPostStartHooks(),遍历postStartHooks列表,使用goroutine运行每一个hook,来看一个添加PostStartHook的例子:

# kubernetes/cmd/kube-apiserver/app/aggregator.go

err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
		go crdRegistrationController.Run(5, context.StopCh)
		go func() {
			// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
			// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
			// we only need to do this if CRDs are enabled on this server.  We can't use discovery because we are the source for discovery.
			if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
				crdRegistrationController.WaitForInitialSync()
			}
			autoRegistrationController.Run(5, context.StopCh)
		}()
		return nil
	})

上面就是一个周期循环的Hook,用来将crd对象,不断轮询,转换成aggregator中的apiservices对象。

总结

通过上面的分析,可以看到,本质上,GenericAPIServer最核心的功能,就是对net/http Handler的构造,为了理解其过程,介绍了go-restful, NonGoRestfulMux, APIGroupInfo等基础知识,Handler构建,API对象注册的过程,以及PostStartHook,还涉及到一些以前介绍过的知识,比如REST store,在这里我们看到了其是如何应用的,除了这些核心内容,还有一些将net/http Server如何Run起来的一些内容,逻辑比较简单,前面也介绍过,这里就不再介绍了。