Kubernetes使用etcd作为后端存储,etcd是一个分布式的,高可靠性的键值存储系统,跟传统的平台系统不同,Kubernetes把所有的数据都存储到了kv数据库中,而没有像OpenStack一样使用像MySQL这种关系型数据库,做这种选型的原因,我想可能一方面是由于Kubernetes中存储的数据,关系性不是很强,更多的是类似配置管理这类数据,一方面是由于etcd的特性,像效率比较高的gRPC接口、支持事务以及Kubernetes严重依赖的Watch机制等,能够通过单一数据库就满足它的需求,不用再引入其他组件实现类似功能,简化了架构的复杂性。

本篇文章主要来介绍下Kubernetes APIServer是如何跟存储打交道的,不涉及存储底层的细节,只到存储接口层,即主要介绍Kubernetes的存储框架是怎么样的,如何做的抽象,它里面的资源是如何存到etcd里面去的。在介绍具体的流程机制之前,我们先来介绍下Kubernetes里面几个相关的抽象,从顶层看下是如何做的设计。

顶层抽象

资源、类别以及对象

在API中抽象出来资源(Resource)、类别(Kind)以及对象(Object)这几个概念,其相关的结构如下:

type GroupVersionResource struct {
	Group    string
	Version  string
	Resource string
}

type GroupVersionKind struct {
	Group   string
	Version string
	Kind    string
}

type GroupResource struct {
	Group    string
	Resource string
}

type GroupKind struct {
	Group string
	Kind  string
}

type Object interface {
	GetObjectKind() schema.ObjectKind
	DeepCopyObject() Object
}

type ObjectKind interface {
	SetGroupVersionKind(kind GroupVersionKind)
	GroupVersionKind() GroupVersionKind
}

我们知道Kubernetes中的API对象都是带版本以及分组的,比如/apis/networking.k8s.io/v1beta1/ingresses/apis是前缀,networking.k8s.io就是组(Group),v1beta1就是版本(Version),ingresses就是上面提到的资源(Resource)或者类别(Kind),至于Object则是对API对象的抽象接口,具体的API对象则都实现了这些接口,在golang里,实现了这些接口的结构体,都可以用这个type xxx interface来统一表示,类似于父类的概念,所以Object可以代表所有实现了它的接口的对象,通常作为方法的参数或者返回值。可见,这三个概念其实都代表的是同一个意思,都是对像pod, service, ingress等这些API对象的抽象,但是表现形式不同,用途也不一样……:-(所以之前说的Kubernetes代码复杂就复杂在这些地方,抽象的云里雾里的:-)

这些结构体和接口定义在apimachinery这个库中,这个库可以说是Kubernetes中最高层的抽象,除了上面说的Resource, Kind, Object,还有各种类型定义、序列化、类型转换之类的抽象,都是会被其他的库引用到的一些结构体或者方法。

etcd存储接口

所谓etcd存储接口是对etcd数据库的增删查改的抽象,其定义在apiserverapiserver/pkg/storage/interfaces.go文件中:

type Interface interface {
	Versioner() Versioner
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
	Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
	GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
	Count(key string) (int64, error)
}

可以看到,跟我们传统的数据库应用不同的地方在于,它的接口比较少,只有这么几个,比如Create()方法,一般我们写应用程序,要存一个东西,都会有类似CreateXXX()这样的方法,比如CreatePerson(person Person),就是保存一个Person对象到数据库中,然后会有一堆这样的CreateXXX()方法来对不同的对象进行存储,但是这里Create()方法定义的,则是一个高度抽象的方法,obj是要存进去的对象,即上面说到的Object,至于这个对象具体是什么,其实是看方法调用者构造了一个什么对象传给它,key则是其键值,实际上etcd3 store在实现这些接口时,会将obj进行编码,即序列化,然后再存到数据库中,这种方法大大减少了数据库层的代码量,也充分利用了kv数据库的特性。

此外,apiserver 这个库是将构建APIServer的一些通用代码抽出来,独立构成了一个库,以便代码复用,可以给第三方应用构建扩展使用,Kubernetes APIServer的实现大量依赖了该库。

REST存储接口

Kubernetes API是RESTful API,它的每一种REST资源,比如pod, service, ingress,在APIServer中,都有一个Store与之对应,通过实现统一的接口,来实现对REST资源的增删改查等操作,这些统一的接口定义在 apiserver/pkg/registry/rest/rest.go 文件中,列举几个:

// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}

// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
	New() runtime.Object
	Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}

// GracefulDeleter knows how to pass deletion options to allow delayed deletion of a
// RESTful object.
type GracefulDeleter interface {
	Delete(ctx context.Context, name string, deleteValidation ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error)
}

// Watcher should be implemented by all Storage objects that
// want to offer the ability to watch for changes through the watch api.
type Watcher interface {
	Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
}

即针对某一个API对象的REST操作,会由对应的Store中的方法进行处理,这个Store又引用了实现了etcd存储接口的Store,从而可以对数据库进行操作。

底层实现

上一小节,主要介绍了两类存储接口,一类是针对etcd的存储接口,一类是针对REST的存储接口,下面我们来分别说一下实现这两类接口的方法和结构体。

etcd存储接口实现

针对etcd存储接口的实现,在apiserver/pkg/storage/etcd3/store.go 这个文件中,最主要的结构体为:

type store struct {
	client *clientv3.Client
	// getOpts contains additional options that should be passed
	// to all Get() calls.
	getOps        []clientv3.OpOption
	codec         runtime.Codec
	versioner     storage.Versioner
	transformer   value.Transformer
	pathPrefix    string
	watcher       *watcher
	pagingEnabled bool
	leaseManager  *leaseManager
}

可见该结构体最重要的属性为client,即直接调用到了etcd的client库,通过该client可以对etcd进行操作,在该结构体上,还实现了apiserver/pkg/storage/interfaces.go中定义的接口,比如:


func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool){
    getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
    kv := getResp.Kvs[0]
    data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
    return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64){
    data, err := runtime.Encode(s.codec, obj)
    newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))

    txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		clientv3.OpPut(key, string(newData), opts...),
    ).Commit()

    if out != nil {
		putResp := txnResp.Responses[0].GetResponsePut()
		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
	}
}

以上代码为简化代码,忽略了一些不重要的逻辑,且只列出GETCreate两个方法,其他未列出。可以看到GET方法,通过一个string类型的key,从etcd中取出了对应的value,然后通过decode进行解码,将数据解码到out这个Object中,然后将其返回。而CREATE方法,则反过来,先将obj进行编码,然后通过etcd client将数据通过事务的方式保存到etcd中。这个编码解码的过程,就是常说的序列化的过程。

REST存储接口实现

REST存储接口的实现在 apiserver/pkg/registry/generic/retistry/store.go 这个文件中,定义了如下的结构体:

type Store struct {
    NewFunc func() runtime.Object
    NewListFunc func() runtime.Object
    KeyFunc func(ctx context.Context, name string) (string, error)
    ObjectNameFunc func(obj runtime.Object) (string, error)

    ......

    Storage DryRunnableStorage  // etcd3.store, implement the storage.Interface
    DestroyFunc func()
    StorageVersioner runtime.GroupVersioner
}

该结构体实现了上面小节中REST存储定义的各种接口:

func (e *Store) New() runtime.Object {
    return e.NewFunc()
}

func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}

func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}

	if createValidation != nil {
		if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
			return nil, err
		}
	}

	name, err := e.ObjectNameFunc(obj)

	key, err := e.KeyFunc(ctx, name)

	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	ttl, err := e.calculateTTL(obj, 0, false)

	out := e.NewFunc()

	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
	......
	}
	......
	return out, nil
}

Store结构体中,有一个非常重要的成员 Storage DryRunnableStorage,它即是对etcd store的引用,其定义如下:

type DryRunnableStorage struct {
	Storage storage.Interface
	Codec   runtime.Codec
}

func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
}

它的成员Storage storage.Interface即是上面小节中提到的etcd存储接口,之所以中间又套了一层DryRunnableStorage主要是为了编写单元测试方便,针对真实写数据库的操作,可以让它DryRun,而不实际写数据库。

至此,我们知道Kubernetes中定义了两种Store,分别是针对REST资源的Store,以及针对etcd数据库的Store,包括他们各自实现的接口方法,后文我们将他们称为REST store以及etcd store,这两个store之间是引用的关系是REST store引用了etcd store。上层实例化一个REST store,则会同时实例化一个etcd store,用于对数据库的增删查改操作,即上面代码中的e.Storage.Create()e.Storage.Get()等,即是调用etcd store去读写数据库。

上层应用

这里所说的上层应用,指的是Kubernetes中是如何使用上面说到的REST storeetcd store这两个Store的。

etcd存储

本小节主要是介绍下如何构建出etcd store实体的,首先来看下最上面的代码逻辑:

# cmd/kube-apiserver/app/server.go

buildGenericConfig() {
    genericConfig = genenricapiserver.NewConfig()

    // 下面三行代码,构造了一个StorageFactoryConfig,给它的各个属性赋值
    storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
    storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
    completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)

    // 从StorageFactoryConfig New了一个DefaultStorageFactory
    storageFactory, lastErr = completedStorageFactoryConfig.New()

    // 使用storageFactory构造了一个StorageFactoryRestOptionsFactory,赋值给genericConfig的RESTOptionsGetter属性
    s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
}

buildGenericConfig()就是在Kubernetes APIServer 机制概述中介绍过的CreateServerChain阶段,构建的配置项,将和APIServer相关的很多通用的配置项都集合在这个里面,这里我们只关注和存储相关的配置项,即上面列出的那几行代码,其实这几行代码,体现了两个在Kubernetes中非常常见的设计模式,一个是Config->Complete->New模式,一个是Factory工厂模式。

所谓Config->Complete->New模式,即首先构建一个Config,即配置项,然后通过Complete()方法进一步补充完善该Config,然后从该Config创建出真正的实体,创建该实体相关的信息,都在该Config中。在上面的例子中,StorageFactoryConfig,就是Config,然后通过Complete()方法,从s.Etcd中再进一步获取相关信息,补充完善该Config,得到completedStorageFactoryConfig,然后再调用New()方法,就可以得到真正的实体,这里就是storageFactory。


# kubernetes/cmd/kube-apiserver/app/options/options.go

s.Etcd = genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil))

# k8s.io/apiserver/pkg/server/options/etcd.go

type EtcdOptions struct {
	StorageConfig  storagebackend.Config  // 在newEtcdOptions时就赋值进去的
	EncryptionProviderConfigFilepath string
	EtcdServersOverrides []string
	DefaultStorageMediaType string
	DeleteCollectionWorkers int
	EnableGarbageCollection bool
	EnableWatchCache bool
	DefaultWatchCacheSize int
	WatchCacheSizes []string
}

# kubernetes/pkg/kubeapiserver/default_storage_factory_builder.go

// NewStorageFactoryConfig returns a new StorageFactoryConfig set up with necessary resource overrides.
func NewStorageFactoryConfig() *StorageFactoryConfig {

	resources := []schema.GroupVersionResource{
		batch.Resource("cronjobs").WithVersion("v1beta1"),
		networking.Resource("ingresses").WithVersion("v1beta1"),
		networking.Resource("ingressclasses").WithVersion("v1beta1"),
		apisstorage.Resource("csidrivers").WithVersion("v1beta1"),
	}

	return &StorageFactoryConfig{
		Serializer:                legacyscheme.Codecs,
		DefaultResourceEncoding:   serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
		ResourceEncodingOverrides: resources,
	}
}

// StorageFactoryConfig is a configuration for creating storage factory.
type StorageFactoryConfig struct {
	StorageConfig                    storagebackend.Config
	APIResourceConfig                *serverstorage.ResourceConfig
	DefaultResourceEncoding          *serverstorage.DefaultResourceEncodingConfig
	DefaultStorageMediaType          string
	Serializer                       runtime.StorageSerializer
	ResourceEncodingOverrides        []schema.GroupVersionResource
	EtcdServersOverrides             []string
	EncryptionProviderConfigFilepath string
}

// Complete completes the StorageFactoryConfig with provided etcdOptions returning completedStorageFactoryConfig.
func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error) {
	c.StorageConfig = etcdOptions.StorageConfig // 从etcdOptions获取初始的StorageConfig
	c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType
	c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides
	c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath
	return &completedStorageFactoryConfig{c}, nil
}

type completedStorageFactoryConfig struct {
	*StorageFactoryConfig
}

// New returns a new storage factory created from the completed storage factory configuration.
func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) {
    resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides)
    storageFactory := serverstorage.NewDefaultStorageFactory(
    	c.StorageConfig,
    	c.DefaultStorageMediaType,
    	c.Serializer,
    	resourceEncodingConfig,
    	c.APIResourceConfig,
    	SpecialDefaultResourcePrefixes)

    ......

    return storageFactory, nil
}

通过上面的方式,创建出来DefaultStorageFactory实例,然后就到了上面说到的Factory工厂模式,顾名思义,就是从工厂中生产出类似的实体,放到上面的例子中,DefaultStorageFactory的作用,就是根据resource创建出StorageConfig,即下面的NewConfig()方法:

# k8s.io/apiserver/pkg/server/storage/storage_factory.go

type StorageFactory interface {
	NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error)
	ResourcePrefix(groupResource schema.GroupResource) string
	Backends() []Backend
}

##  DefaultStorageFactory实现了StorageFactory Interface

type DefaultStorageFactory struct {
	StorageConfig   storagebackend.Config
	Overrides map[schema.GroupResource]groupResourceOverrides
	DefaultResourcePrefixes map[schema.GroupResource]string
	DefaultMediaType string
	DefaultSerializer runtime.StorageSerializer
	ResourceEncodingConfig ResourceEncodingConfig
	APIResourceConfigSource APIResourceConfigSource
	newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
}

func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
	chosenStorageResource := s.getStorageGroupResource(groupResource)

	// operate on copy
	storageConfig := s.StorageConfig
	codecConfig := StorageCodecConfig{
		StorageMediaType:  s.DefaultMediaType,
		StorageSerializer: s.DefaultSerializer,
	}

	if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
		override.Apply(&storageConfig, &codecConfig)
	}
	if override, ok := s.Overrides[chosenStorageResource]; ok {
		override.Apply(&storageConfig, &codecConfig)
	}

	codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
	codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
	codecConfig.Config = storageConfig

	storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)

	return &storageConfig, nil
}

因为Kubernetes的核心API对象,它的存储位置是可以放在不同的etcd中的,即默认有一个etcd,也可以通过--etcd-servers-overrides配置项,来给某个单独的resource指定不同的后端存储。上面的NewConfig()就是做这个事情的,获取到了某个resource对应的存储配置StorageConfig,即etcd相关的连接等信息,再加上一些序列化相关的配置,这些构成了一个API对象要保存到数据库中,所需要的所有的配置信息,其对应的结构体为:

# k8s.io/apiserver/pkg/storage/storagebackend/config.go

type Config struct {
	Type string
	Prefix string
	Transport TransportConfig
	Paging bool
	Codec runtime.Codec
	EncodeVersioner runtime.GroupVersioner
	CompactionInterval time.Duration
	CountMetricPollPeriod time.Duration
}

最终,s.Etcd.ApplyWithStorageFactoryTo()则是将上面构建出来的storageFactory构建出另外一个结构体StorageFactoryRestOptionsFactory,然后将其赋值给genericConfigRESTOptionsGetter属性:

# apiserver/pkg/server/options/etcd.go

(s *EtcdOptions)ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config){
    c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
}

type StorageFactoryRestOptionsFactory struct {
    Options        EtcdOptions
    StorageFactory serverstorage.StorageFactory
}

(f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error){
    // 拿到该resource对应的storageconfig,每个resource可以有不同的storage配置,主要设置上storageConfig的codec和encodeversioner
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    ret := generic.RESTOptions{
    	StorageConfig: storageConfig,
    	Decorator:     generic.UndecoratedStorage, // decorator->factory->store 最终拿到了Storage.Interface
    	ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
    }
    return ret
}

StorageFactoryRestOptionsFactory实现了一个非常重要的方法:GetRESTOptions(),在这个里面,首先通过上面介绍到的StorageFactoryNewConfig()方法来创建出针对某一个特定resource的存储配置项,然后构建了一个RESTOptions的结构体,里面包含了另外一个重要的成员:Decorator,其定义如下:

# apiserver/pkg/registry/generic/storage_decorator.go

func UndecoratedStorage(
	config *storagebackend.Config,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newFunc func() runtime.Object,
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	trigger storage.IndexerFuncs,
	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
	return NewRawStorage(config)
}

func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
	return factory.Create(*config)
}

这里就又用到工厂模式,使用StorageConfig中的信息,来创建最终的Store,其流程如下:

# apiserver/pkg/storage/storagebackend/factory/factory.go

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	switch c.Type {
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}

# apiserver/pkg/storage/storagebackend/factory/etcd3.go

func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
        client, err := newETCD3Client(c.Transport)
	return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) {
    cfg := clientv3.Config{
		DialTimeout:          dialTimeout,
		DialKeepAliveTime:    keepaliveTime,
		DialKeepAliveTimeout: keepaliveTimeout,
		DialOptions:          dialOptions,
		Endpoints:            c.ServerList,
		TLS:                  tlsConfig,
	}

	return clientv3.New(cfg)
}

# apiserver/pkg/storage/etcd3/store.go

// Implement storage.Interface in apiserver/pkg/storage/interfaces.go
type store struct {
	client *clientv3.Client
	getOps        []clientv3.OpOption
	codec         runtime.Codec
	versioner     storage.Versioner
	transformer   value.Transformer
	pathPrefix    string
	watcher       *watcher
	pagingEnabled bool
	leaseManager  *leaseManager
}

func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
	return newStore(c, pagingEnabled, codec, prefix, transformer)
}

func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
	versioner := APIObjectVersioner{}
	result := &store{
		client:        c,
		codec:         codec,
		versioner:     versioner,
		transformer:   transformer,
		pagingEnabled: pagingEnabled,
		pathPrefix:   path.Join("/", prefix),
		watcher:      newWatcher(c, codec, versioner, transformer),
		leaseManager: newDefaultLeaseManager(c),
	}
	return result
}

经过了山路十八弯,终于看到创建出来了etcd store,即上面newETCD3Storage()方法内的逻辑,先创建了一个etcd client,然后将该client传给newStore(),构建出etcd store结构体,可以通过clientetcd打交道,同时序列化等信息存在codec等变量里。

综上,可以看到,虽然实现非常复杂,但是使用起来还是很简单的,因为实际上构建出来的StorageFactory是放到了GenericConfig中的RESTOptionsGetter,而GenericConfig是在CreateServerChain阶段就提前构建好的,因此,只需要向下面这样使用,就可以得到一个etcd store

opts := genericConfig.RESTOptionsGetter.GetRESTOptions(resource)
store := opts.Decorator(
	opts.StorageConfig,
	...
)

REST存储

在APIServer中,每一个API对象,都有一个REST Store与之对应,Kubernetes内置的API对象的REST store的相关逻辑,都位于kubernetes/pkg/registry/目录下,我们以pod为例,来说明下REST Store是如何构建出来的:

# pkg/registry/core/pod/storage/storage.go

type REST struct {
	*genericregistry.Store
	proxyTransport http.RoundTripper
}

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter...) {
    store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &api.Pod{} },
		NewListFunc:            func() runtime.Object { return &api.PodList{} },
		DefaultQualifiedResource: api.Resource("pods"),
     }

    options := &generic.StoreOptions{
		RESTOptions: optsGetter
    }

    store.CompleteWithOptions(options)

    return PodStorage{
		Pod:                 &REST{store, proxyTransport},
    }
}

上面的 genericregistry.Store 就是REST store,通过NewStorage()方法创建出该结构体,注意该方法接受了一个参数optsGetter generic.RESTOptionsGetter,这个就是在上面etcd 存储应用中,介绍到的存储到genericConfigRESTOptionsGetter,我们来看看这里是怎么使用这个genericConfig.RESTOptionsGetter的:

// 每一种resource都是由这个Store组成的,它实现了rest的各种接口,而它里面又包含了一个Storage属性,是对etcd的一个封装,实现了在数据库层面的各种增删查改的接口,即实现了storage.Interface
// Implement rest Interfaces in apiserver/pkg/registry/rest/rest.go
// like Getter, Lister, Creater, Updater, Patcher, Watcher etc.
type Store struct {
	NewFunc func() runtime.Object
	NewListFunc func() runtime.Object
        KeyFunc func(ctx context.Context, name string) (string, error)
        ObjectNameFunc func(obj runtime.Object) (string, error)

        Storage DryRunnableStorage  // etcd3.store, implement the storage.Interface
        DestroyFunc func()
        StorageVersioner runtime.GroupVersioner
}

(e *Store) CompleteWithOptions(options *generic.StoreOptions){
    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    e.Storage.Codec = opts.StorageConfig.Codec
    e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
    e.StorageVersioner = opts.StorageConfig.EncodeVersioner
}

即在store.CompleteWithOptions(options)方法中,调用了GetRESTOptions()方法获取到存储配置信息,然后再调用Decorator()方法创建出etcd store实体,存储到DryRunnableStorage中。

总结

本文主要梳理了API对象存储相关的两个层面的存储接口以及其实现和应用,即REST storeetcd store,在脑海中建立起来的整体画像应该是,每一个API对象,都有对应的REST storeetcd store,这两者之间是引用的关系,REST store引用etcd store来操作数据库etcd,REST store是面向RESTful api侧, etcd store则面向数据库侧。