Kubernetes使用etcd作为后端存储,etcd是一个分布式的,高可靠性的键值存储系统,跟传统的平台系统不同,Kubernetes把所有的数据都存储到了kv数据库中,而没有像OpenStack一样使用像MySQL这种关系型数据库,做这种选型的原因,我想可能一方面是由于Kubernetes中存储的数据,关系性不是很强,更多的是类似配置管理这类数据,一方面是由于etcd的特性,像效率比较高的gRPC接口、支持事务以及Kubernetes严重依赖的Watch机制等,能够通过单一数据库就满足它的需求,不用再引入其他组件实现类似功能,简化了架构的复杂性。
本篇文章主要来介绍下Kubernetes APIServer是如何跟存储打交道的,不涉及存储底层的细节,只到存储接口层,即主要介绍Kubernetes的存储框架是怎么样的,如何做的抽象,它里面的资源是如何存到etcd里面去的。在介绍具体的流程机制之前,我们先来介绍下Kubernetes里面几个相关的抽象,从顶层看下是如何做的设计。
顶层抽象
资源、类别以及对象
在API中抽象出来资源(Resource)、类别(Kind)以及对象(Object)这几个概念,其相关的结构如下:
type GroupVersionResource struct {
Group string
Version string
Resource string
}
type GroupVersionKind struct {
Group string
Version string
Kind string
}
type GroupResource struct {
Group string
Resource string
}
type GroupKind struct {
Group string
Kind string
}
type Object interface {
GetObjectKind() schema.ObjectKind
DeepCopyObject() Object
}
type ObjectKind interface {
SetGroupVersionKind(kind GroupVersionKind)
GroupVersionKind() GroupVersionKind
}
我们知道Kubernetes中的API对象都是带版本以及分组的,比如/apis/networking.k8s.io/v1beta1/ingresses
,/apis
是前缀,networking.k8s.io
就是组(Group),v1beta1
就是版本(Version),ingresses
就是上面提到的资源(Resource)或者类别(Kind),至于Object
则是对API对象的抽象接口,具体的API对象则都实现了这些接口,在golang里,实现了这些接口的结构体,都可以用这个type xxx interface
来统一表示,类似于父类的概念,所以Object
可以代表所有实现了它的接口的对象,通常作为方法的参数或者返回值。可见,这三个概念其实都代表的是同一个意思,都是对像pod
, service
, ingress
等这些API对象的抽象,但是表现形式不同,用途也不一样……:-(所以之前说的Kubernetes代码复杂就复杂在这些地方,抽象的云里雾里的:-)
这些结构体和接口定义在apimachinery这个库中,这个库可以说是Kubernetes中最高层的抽象,除了上面说的Resource, Kind, Object,还有各种类型定义、序列化、类型转换之类的抽象,都是会被其他的库引用到的一些结构体或者方法。
etcd存储接口
所谓etcd存储接口是对etcd数据库的增删查改的抽象,其定义在apiserver的apiserver/pkg/storage/interfaces.go
文件中:
type Interface interface {
Versioner() Versioner
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
Count(key string) (int64, error)
}
可以看到,跟我们传统的数据库应用不同的地方在于,它的接口比较少,只有这么几个,比如Create()
方法,一般我们写应用程序,要存一个东西,都会有类似CreateXXX()
这样的方法,比如CreatePerson(person Person)
,就是保存一个Person
对象到数据库中,然后会有一堆这样的CreateXXX()
方法来对不同的对象进行存储,但是这里Create()
方法定义的,则是一个高度抽象的方法,obj
是要存进去的对象,即上面说到的Object
,至于这个对象具体是什么,其实是看方法调用者构造了一个什么对象传给它,key
则是其键值,实际上etcd3 store
在实现这些接口时,会将obj
进行编码,即序列化,然后再存到数据库中,这种方法大大减少了数据库层的代码量,也充分利用了kv数据库的特性。
此外,apiserver 这个库是将构建APIServer的一些通用代码抽出来,独立构成了一个库,以便代码复用,可以给第三方应用构建扩展使用,Kubernetes APIServer的实现大量依赖了该库。
REST存储接口
Kubernetes API是RESTful API,它的每一种REST资源,比如pod
, service
, ingress
,在APIServer中,都有一个Store与之对应,通过实现统一的接口,来实现对REST资源的增删改查等操作,这些统一的接口定义在 apiserver/pkg/registry/rest/rest.go
文件中,列举几个:
// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
// Creater is an object that can create an instance of a RESTful object.
type Creater interface {
New() runtime.Object
Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}
// GracefulDeleter knows how to pass deletion options to allow delayed deletion of a
// RESTful object.
type GracefulDeleter interface {
Delete(ctx context.Context, name string, deleteValidation ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error)
}
// Watcher should be implemented by all Storage objects that
// want to offer the ability to watch for changes through the watch api.
type Watcher interface {
Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
}
即针对某一个API对象的REST操作,会由对应的Store中的方法进行处理,这个Store又引用了实现了etcd存储接口的Store,从而可以对数据库进行操作。
底层实现
上一小节,主要介绍了两类存储接口,一类是针对etcd的存储接口,一类是针对REST的存储接口,下面我们来分别说一下实现这两类接口的方法和结构体。
etcd存储接口实现
针对etcd存储接口的实现,在apiserver/pkg/storage/etcd3/store.go
这个文件中,最主要的结构体为:
type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
watcher *watcher
pagingEnabled bool
leaseManager *leaseManager
}
可见该结构体最重要的属性为client
,即直接调用到了etcd的client库,通过该client
可以对etcd进行操作,在该结构体上,还实现了apiserver/pkg/storage/interfaces.go
中定义的接口,比如:
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool){
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64){
data, err := runtime.Encode(s.codec, obj)
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
以上代码为简化代码,忽略了一些不重要的逻辑,且只列出GET
和Create
两个方法,其他未列出。可以看到GET
方法,通过一个string
类型的key
,从etcd中取出了对应的value
,然后通过decode进行解码,将数据解码到out
这个Object
中,然后将其返回。而CREATE
方法,则反过来,先将obj
进行编码,然后通过etcd client
将数据通过事务的方式保存到etcd中。这个编码解码的过程,就是常说的序列化的过程。
REST存储接口实现
REST存储接口的实现在 apiserver/pkg/registry/generic/retistry/store.go
这个文件中,定义了如下的结构体:
type Store struct {
NewFunc func() runtime.Object
NewListFunc func() runtime.Object
KeyFunc func(ctx context.Context, name string) (string, error)
ObjectNameFunc func(obj runtime.Object) (string, error)
......
Storage DryRunnableStorage // etcd3.store, implement the storage.Interface
DestroyFunc func()
StorageVersioner runtime.GroupVersioner
}
该结构体实现了上面小节中REST存储定义的各种接口:
func (e *Store) New() runtime.Object {
return e.NewFunc()
}
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
obj := e.NewFunc()
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return obj, nil
}
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, err
}
}
name, err := e.ObjectNameFunc(obj)
key, err := e.KeyFunc(ctx, name)
qualifiedResource := e.qualifiedResourceFromContext(ctx)
ttl, err := e.calculateTTL(obj, 0, false)
out := e.NewFunc()
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
......
}
......
return out, nil
}
在Store
结构体中,有一个非常重要的成员 Storage DryRunnableStorage
,它即是对etcd store的引用,其定义如下:
type DryRunnableStorage struct {
Storage storage.Interface
Codec runtime.Codec
}
func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
}
它的成员Storage storage.Interface
即是上面小节中提到的etcd存储接口
,之所以中间又套了一层DryRunnableStorage
主要是为了编写单元测试方便,针对真实写数据库的操作,可以让它DryRun,而不实际写数据库。
至此,我们知道Kubernetes中定义了两种Store,分别是针对REST资源的Store,以及针对etcd数据库的Store,包括他们各自实现的接口方法,后文我们将他们称为REST store
以及etcd store
,这两个store之间是引用的关系是REST store
引用了etcd store
。上层实例化一个REST store
,则会同时实例化一个etcd store
,用于对数据库的增删查改操作,即上面代码中的e.Storage.Create()
,e.Storage.Get()
等,即是调用etcd store
去读写数据库。
上层应用
这里所说的上层应用,指的是Kubernetes中是如何使用上面说到的REST store
和etcd store
这两个Store的。
etcd存储
本小节主要是介绍下如何构建出etcd store
实体的,首先来看下最上面的代码逻辑:
# cmd/kube-apiserver/app/server.go
buildGenericConfig() {
genericConfig = genenricapiserver.NewConfig()
// 下面三行代码,构造了一个StorageFactoryConfig,给它的各个属性赋值
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
// 从StorageFactoryConfig New了一个DefaultStorageFactory
storageFactory, lastErr = completedStorageFactoryConfig.New()
// 使用storageFactory构造了一个StorageFactoryRestOptionsFactory,赋值给genericConfig的RESTOptionsGetter属性
s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
}
buildGenericConfig()
就是在Kubernetes APIServer 机制概述中介绍过的CreateServerChain
阶段,构建的配置项,将和APIServer相关的很多通用的配置项都集合在这个里面,这里我们只关注和存储相关的配置项,即上面列出的那几行代码,其实这几行代码,体现了两个在Kubernetes中非常常见的设计模式,一个是Config->Complete->New
模式,一个是Factory
工厂模式。
所谓Config->Complete->New
模式,即首先构建一个Config,即配置项,然后通过Complete()方法进一步补充完善该Config,然后从该Config创建出真正的实体,创建该实体相关的信息,都在该Config中。在上面的例子中,StorageFactoryConfig,就是Config,然后通过Complete()方法,从s.Etcd中再进一步获取相关信息,补充完善该Config,得到completedStorageFactoryConfig,然后再调用New()方法,就可以得到真正的实体,这里就是storageFactory。
# kubernetes/cmd/kube-apiserver/app/options/options.go
s.Etcd = genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil))
# k8s.io/apiserver/pkg/server/options/etcd.go
type EtcdOptions struct {
StorageConfig storagebackend.Config // 在newEtcdOptions时就赋值进去的
EncryptionProviderConfigFilepath string
EtcdServersOverrides []string
DefaultStorageMediaType string
DeleteCollectionWorkers int
EnableGarbageCollection bool
EnableWatchCache bool
DefaultWatchCacheSize int
WatchCacheSizes []string
}
# kubernetes/pkg/kubeapiserver/default_storage_factory_builder.go
// NewStorageFactoryConfig returns a new StorageFactoryConfig set up with necessary resource overrides.
func NewStorageFactoryConfig() *StorageFactoryConfig {
resources := []schema.GroupVersionResource{
batch.Resource("cronjobs").WithVersion("v1beta1"),
networking.Resource("ingresses").WithVersion("v1beta1"),
networking.Resource("ingressclasses").WithVersion("v1beta1"),
apisstorage.Resource("csidrivers").WithVersion("v1beta1"),
}
return &StorageFactoryConfig{
Serializer: legacyscheme.Codecs,
DefaultResourceEncoding: serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme),
ResourceEncodingOverrides: resources,
}
}
// StorageFactoryConfig is a configuration for creating storage factory.
type StorageFactoryConfig struct {
StorageConfig storagebackend.Config
APIResourceConfig *serverstorage.ResourceConfig
DefaultResourceEncoding *serverstorage.DefaultResourceEncodingConfig
DefaultStorageMediaType string
Serializer runtime.StorageSerializer
ResourceEncodingOverrides []schema.GroupVersionResource
EtcdServersOverrides []string
EncryptionProviderConfigFilepath string
}
// Complete completes the StorageFactoryConfig with provided etcdOptions returning completedStorageFactoryConfig.
func (c *StorageFactoryConfig) Complete(etcdOptions *serveroptions.EtcdOptions) (*completedStorageFactoryConfig, error) {
c.StorageConfig = etcdOptions.StorageConfig // 从etcdOptions获取初始的StorageConfig
c.DefaultStorageMediaType = etcdOptions.DefaultStorageMediaType
c.EtcdServersOverrides = etcdOptions.EtcdServersOverrides
c.EncryptionProviderConfigFilepath = etcdOptions.EncryptionProviderConfigFilepath
return &completedStorageFactoryConfig{c}, nil
}
type completedStorageFactoryConfig struct {
*StorageFactoryConfig
}
// New returns a new storage factory created from the completed storage factory configuration.
func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) {
resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides)
storageFactory := serverstorage.NewDefaultStorageFactory(
c.StorageConfig,
c.DefaultStorageMediaType,
c.Serializer,
resourceEncodingConfig,
c.APIResourceConfig,
SpecialDefaultResourcePrefixes)
......
return storageFactory, nil
}
通过上面的方式,创建出来DefaultStorageFactory
实例,然后就到了上面说到的Factory
工厂模式,顾名思义,就是从工厂中生产出类似的实体,放到上面的例子中,DefaultStorageFactory
的作用,就是根据resource
创建出StorageConfig
,即下面的NewConfig()
方法:
# k8s.io/apiserver/pkg/server/storage/storage_factory.go
type StorageFactory interface {
NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error)
ResourcePrefix(groupResource schema.GroupResource) string
Backends() []Backend
}
## DefaultStorageFactory实现了StorageFactory Interface
type DefaultStorageFactory struct {
StorageConfig storagebackend.Config
Overrides map[schema.GroupResource]groupResourceOverrides
DefaultResourcePrefixes map[schema.GroupResource]string
DefaultMediaType string
DefaultSerializer runtime.StorageSerializer
ResourceEncodingConfig ResourceEncodingConfig
APIResourceConfigSource APIResourceConfigSource
newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
}
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)
// operate on copy
storageConfig := s.StorageConfig
codecConfig := StorageCodecConfig{
StorageMediaType: s.DefaultMediaType,
StorageSerializer: s.DefaultSerializer,
}
if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
override.Apply(&storageConfig, &codecConfig)
}
if override, ok := s.Overrides[chosenStorageResource]; ok {
override.Apply(&storageConfig, &codecConfig)
}
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
codecConfig.Config = storageConfig
storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
return &storageConfig, nil
}
因为Kubernetes的核心API对象,它的存储位置是可以放在不同的etcd中的,即默认有一个etcd,也可以通过--etcd-servers-overrides
配置项,来给某个单独的resource
指定不同的后端存储。上面的NewConfig()
就是做这个事情的,获取到了某个resource
对应的存储配置StorageConfig
,即etcd相关的连接等信息,再加上一些序列化相关的配置,这些构成了一个API对象要保存到数据库中,所需要的所有的配置信息,其对应的结构体为:
# k8s.io/apiserver/pkg/storage/storagebackend/config.go
type Config struct {
Type string
Prefix string
Transport TransportConfig
Paging bool
Codec runtime.Codec
EncodeVersioner runtime.GroupVersioner
CompactionInterval time.Duration
CountMetricPollPeriod time.Duration
}
最终,s.Etcd.ApplyWithStorageFactoryTo()
则是将上面构建出来的storageFactory
构建出另外一个结构体StorageFactoryRestOptionsFactory
,然后将其赋值给genericConfig
的RESTOptionsGetter
属性:
# apiserver/pkg/server/options/etcd.go
(s *EtcdOptions)ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config){
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
}
type StorageFactoryRestOptionsFactory struct {
Options EtcdOptions
StorageFactory serverstorage.StorageFactory
}
(f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error){
// 拿到该resource对应的storageconfig,每个resource可以有不同的storage配置,主要设置上storageConfig的codec和encodeversioner
storageConfig, err := f.StorageFactory.NewConfig(resource)
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage, // decorator->factory->store 最终拿到了Storage.Interface
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
}
return ret
}
StorageFactoryRestOptionsFactory
实现了一个非常重要的方法:GetRESTOptions()
,在这个里面,首先通过上面介绍到的StorageFactory
的NewConfig()
方法来创建出针对某一个特定resource的存储配置项,然后构建了一个RESTOptions
的结构体,里面包含了另外一个重要的成员:Decorator
,其定义如下:
# apiserver/pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
config *storagebackend.Config,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config)
}
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config)
}
这里就又用到工厂模式,使用StorageConfig
中的信息,来创建最终的Store
,其流程如下:
# apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
# apiserver/pkg/storage/storagebackend/factory/etcd3.go
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := newETCD3Client(c.Transport)
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) {
cfg := clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
DialOptions: dialOptions,
Endpoints: c.ServerList,
TLS: tlsConfig,
}
return clientv3.New(cfg)
}
# apiserver/pkg/storage/etcd3/store.go
// Implement storage.Interface in apiserver/pkg/storage/interfaces.go
type store struct {
client *clientv3.Client
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
watcher *watcher
pagingEnabled bool
leaseManager *leaseManager
}
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
}
return result
}
经过了山路十八弯,终于看到创建出来了etcd store
,即上面newETCD3Storage()
方法内的逻辑,先创建了一个etcd client
,然后将该client传给newStore()
,构建出etcd store
结构体,可以通过client
跟etcd
打交道,同时序列化等信息存在codec
等变量里。
综上,可以看到,虽然实现非常复杂,但是使用起来还是很简单的,因为实际上构建出来的StorageFactory
是放到了GenericConfig
中的RESTOptionsGetter
,而GenericConfig
是在CreateServerChain
阶段就提前构建好的,因此,只需要向下面这样使用,就可以得到一个etcd store
:
opts := genericConfig.RESTOptionsGetter.GetRESTOptions(resource)
store := opts.Decorator(
opts.StorageConfig,
...
)
REST存储
在APIServer中,每一个API对象,都有一个REST Store
与之对应,Kubernetes内置的API对象的REST store
的相关逻辑,都位于kubernetes/pkg/registry/
目录下,我们以pod
为例,来说明下REST Store
是如何构建出来的:
# pkg/registry/core/pod/storage/storage.go
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter...) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
DefaultQualifiedResource: api.Resource("pods"),
}
options := &generic.StoreOptions{
RESTOptions: optsGetter
}
store.CompleteWithOptions(options)
return PodStorage{
Pod: &REST{store, proxyTransport},
}
}
上面的 genericregistry.Store
就是REST store
,通过NewStorage()
方法创建出该结构体,注意该方法接受了一个参数optsGetter generic.RESTOptionsGetter
,这个就是在上面etcd 存储
应用中,介绍到的存储到genericConfig
的RESTOptionsGetter
,我们来看看这里是怎么使用这个genericConfig.RESTOptionsGetter
的:
// 每一种resource都是由这个Store组成的,它实现了rest的各种接口,而它里面又包含了一个Storage属性,是对etcd的一个封装,实现了在数据库层面的各种增删查改的接口,即实现了storage.Interface
// Implement rest Interfaces in apiserver/pkg/registry/rest/rest.go
// like Getter, Lister, Creater, Updater, Patcher, Watcher etc.
type Store struct {
NewFunc func() runtime.Object
NewListFunc func() runtime.Object
KeyFunc func(ctx context.Context, name string) (string, error)
ObjectNameFunc func(obj runtime.Object) (string, error)
Storage DryRunnableStorage // etcd3.store, implement the storage.Interface
DestroyFunc func()
StorageVersioner runtime.GroupVersioner
}
(e *Store) CompleteWithOptions(options *generic.StoreOptions){
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
e.Storage.Codec = opts.StorageConfig.Codec
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
}
即在store.CompleteWithOptions(options)
方法中,调用了GetRESTOptions()
方法获取到存储配置信息,然后再调用Decorator()
方法创建出etcd store
实体,存储到DryRunnableStorage
中。
总结
本文主要梳理了API对象存储相关的两个层面的存储接口以及其实现和应用,即REST store
和etcd store
,在脑海中建立起来的整体画像应该是,每一个API对象,都有对应的REST store
和etcd store
,这两者之间是引用的关系,REST store
引用etcd store
来操作数据库etcd,REST store
是面向RESTful api
侧, etcd store
则面向数据库侧。