​ 基于云原生的周边工具

# Kubectl

kubectl 是 Kubernetes 的命令行工具 (opens new window)(CLI),是 Kubernetes 用户和管理员必备的管理

工具。该kubectl工具控制Kubernetes集群管理器。它可以让您检查集群资源,创建、删除和更新组

件以及更多功能。kubectl 提供了大量的子命令,方便管理 Kubernetes 集群中的各种功能。

  • kubectl -h 查看子命令列表
  • kubectl options 查看全局选项
  • kubectl --help 查看子命令的帮助
  • kubectl command -o= 设置输出格式(如 json、yaml、jsonpath 等)
  • kubectl explain RESOURCE 查看资源的定义

# krew

krew 是一个用来管理 kubectl 插件的工具,类似于 apt 或 yum,支持搜索、安装和管理kubectl 插件。

安装

  set -x; cd "$(mktemp -d)" &&
  curl -fsSLO "https://github.com/kubernetes-sigs/krew/releases/download/v0.3.2/krew.{tar.gz,yaml}" &&
  tar zxvf krew.tar.gz &&
  ./krew-"$(uname | tr '[:upper:]' '[:lower:]')_amd64" install \
    --manifest=krew.yaml --archive=krew.tar.gz
1
2
3
4
5

添加环境变量

export PATH="${KREW_ROOT:-$HOME/.krew}/bin:$PATH"
source ~/.bashrc
1
2

使用

kubectl krew update: 插件索引更新

kubectl krew search: 插件搜索

kubectl krew install get-all: 安装插件

kubectl krew list: 插件已装插件

kubectl krew info ns: 查看插件详情

kubectl krew upgrade : 升级安装的插件

kubectl krew remove view-secret : 卸载插件

krew自身也作为一个“kubectl 插件”,因此,可以使用命令kubectl krew upgrade命令来升级krew

https://cloud.tencent.com/developer/article/1543178

# Kubeadm

Kubeadm 是一个提供了 kubeadm initkubeadm join 的工具, 作为创建 Kubernetes 集群的 “快捷途径” 的最佳实践。

kubeadm 通过执行必要的操作来启动和运行最小可用集群。 按照设计,它只关注启动引导,而非配置机器。同样的, 安装各种 “锦上添花” 的扩展,例如 Kubernetes Dashboard、 监控方案、以及特定云平台的扩展,都不在讨论范围内。

相反, kubeadm 之上构建更高级别以及更加合规的工具, 理想情况下,使用 kubeadm 作为所有部署工作的基准将会更加易于创建一致性集群。

常用命令

kubeadm init: 此命令初始化一个 Kubernetes 控制平面节点。

kubeadm join: 此命令用来初始化 Kubernetes 工作节点并将其加入集群。

当节点加入 kubeadm 初始化的集群时,我们需要建立双向信任。 这个过程可以分解为发现(让待加入节点信任 Kubernetes 控制平面节点)和 TLS 引导(让 Kubernetes 控制平面节点信任待加入节点)两个部分。

有两种主要的发现方案。 第一种方法是使用共享令牌和 API 服务器的 IP 地址。 第二种是以文件形式提供标准 kubeconfig 文件的一个子集。 发现/kubeconfig 文件支持令牌、client-go 鉴权插件(“exec”)、“tokenFile" 和 "authProvider"。该文件可以是本地文件,也可以通过 HTTPS URL 下载。 格式是 kubeadm join --discovery-token abcdef.1234567890abcdef 1.2.3.4:6443kubeadm join --discovery-file path/to/file.conf 或者 kubeadm join --discovery-file https://url/file.conf。 只能使用其中一种。 如果发现信息是从 URL 加载的,必须使用 HTTPS。 此外,在这种情况下,主机安装的 CA 包用于验证连接。

如果使用共享令牌进行发现,还应该传递 --discovery-token-ca-cert-hash 参数来验证 Kubernetes 控制平面节点提供的根证书颁发机构(CA)的公钥。 此参数的值指定为 ":", 其中支持的哈希类型为 "sha256"。哈希是通过 Subject Public Key Info(SPKI)对象的字节计算的(如 RFC7469)。 这个值可以从 "kubeadm init" 的输出中获得,或者可以使用标准工具进行计算。 可以多次重复 --discovery-token-ca-cert-hash 参数以允许多个公钥。

如果无法提前知道 CA 公钥哈希,则可以通过 --discovery-token-unsafe-skip-ca-verification 参数禁用此验证。 这削弱了 kubeadm 安全模型,因为其他节点可能会模仿 Kubernetes 控制平面节点。

kubeadm config:在 kubeadm init 执行期间,kubeadm 将 ClusterConfiguration 对象上传 到你的集群的 kube-system 名字空间下名为 kubeadm-config 的 ConfigMap 对象中。 然后在 kubeadm joinkubeadm resetkubeadm upgrade 执行期间读取此配置。

kubeadm config print 命令打印默认静态配置, kubeadm 运行 kubeadm init and kubeadm join 时将使用此配置

kubeadm config migrate 来转换旧配置文件

kubeadm config images list : 列出 kubeadm 所需的镜像。

kubeadm config images pull : 拉取 kubeadm 所需的镜像。

kubeadm token: kubeadm init创建了一个有效期为24小时的令牌,token命令用于令牌相关的操作

kubeadm token create: 创建一个引导令牌。 你可以设置此令牌的用途,"有效时间" 和可选的人性化的描述。

kubeadm token delete: 删除指定的引导令牌列表

kubeadm token generate: 此命令将打印一个随机生成的可以被 "init" 和 "join" 命令使用的引导令牌。

kubeadm token list: 列出服务器上的引导令牌

kubeadm certs: 处理Kubernetes证书的相关命令

kubeadm certs renew: 可以使用 all 子命令来续订所有 Kubernetes 证书,也可以选择性地续订部分证书。

kubeadm certs certificate-key: 此命令可用来生成一个新的控制面证书密钥。密钥可以作为 --certificate-key 标志的取值传递给 kubeadm init (opens new window)kubeadm join (opens new window) 命令,从而在添加新的控制面节点时能够自动完成证书复制

kubeadm certs check-expiration: 此命令检查 kubeadm 所管理的本地 PKI 中的证书是否以及何时过期。

kubeadm certs generate-csr: 此命令可用来为所有控制面证书和 kubeconfig 文件生成密钥和 CSR(签名请求)。 用户可以根据自身需要选择 CA 为 CSR 签名。

kubeadm upgrade: 一个对用户友好的命令,它将复杂的升级逻辑包装在一个命令后面,支持升级的规划和实际执行。

kubeadm upgrade plan: 检查可升级到哪些版本,并验证你当前的集群是否可升级。

kubeadm upgrade apply: 将Kubernetes集群升级到指定版本

kubeadm upgrade diff: 显示哪些差异将被应用于现有的静态 pod 资源清单

kubeadm upgrade node: 升级集群中某个节点的命令

kubeadm reset: 还原kubeadm init或者kubeadm join命令对主机所做的任何更改

kubeadm init phase: kubeadm init phase 能确保调用引导过程的原子步骤。 因此,如果希望自定义应用,则可以让 kubeadm 做一些工作,然后填补空白。

kubeadm join phase: kubeadm join phase 使你能够调用 join 过程的基本原子步骤。 因此,如果希望执行自定义操作,可以让 kubeadm 做一些工作,然后由用户来补足剩余操作。

kubeadm upgrade phase: 使用此阶段,可以选择执行辅助控制平面或工作节点升级的单独步骤。请注意,kubeadm upgrade apply 命令仍然必须在主控制平面节点上调用。

kubeadm reset phase: kubeadm reset phase 使你能够调用 reset 过程的基本原子步骤。 因此,如果希望执行自定义操作,可以让 kubeadm 做一些工作,然后由用户来补足剩余操作。

kubeadm alpha:尝试试验性功能

# Kubebuilder

Kubebuilder (opens new window) 是一个使用 CRDs 构建 K8s API 的 SDK,主要是:

  • 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置;
  • 提供代码库封装底层的 K8s go-client;

方便用户从零开始开发 CRDs,Controllers 和 Admission Webhooks 来扩展 K8s。

GVK = GroupVersionKind,GVR = GroupVersionResource。

API Group 是相关 API 功能的集合,每个 Group 拥有一或多个 Versions,用于接口的演进。

每个 GV 都包含多个 API 类型,称为 Kinds,在不同的 Versions 之间同一个 Kind 定义可能不同, Resource 是 Kind 的对象标识(resource type (opens new window)),一般来说 Kinds 和 Resources 是 1:1 的,比如 pods Resource 对应 Pod Kind,但是有时候相同的 Kind 可能对应多个 Resources,比如 Scale Kind 可能对应很多 Resources:deployments/scale,replicasets/scale,对于 CRD 来说,只会是 1:1 的关系。

每一个 GVK 都关联着一个 package 中给定的 root Go type,比如 apps/v1/Deployment 就关联着 K8s 源码里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我们提交的各类资源定义 YAML 文件都需要写:

  • apiVersion:这个就是 GV 。
  • kind:这个就是 K。

根据 GVK K8s 就能找到你到底要创建什么类型的资源,根据你定义的 Spec 创建好资源之后就成为了 Resource,也就是 GVR。GVK/GVR 就是 K8s 资源的坐标,是我们创建/删除/修改/读取资源的基础。

每一组 Controllers 都需要一个 Scheme,提供了 Kinds 与对应 Go types 的映射,也就是说给定 Go type 就知道他的 GVK,给定 GVK 就知道他的 Go type,比如说我们给定一个 Scheme: "tutotial.kubebuilder.io/api/v1".CronJob{} 这个 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么从 Api Server 获取到下面的 JSON:

{

"kind": "CronJob",

"apiVersion": "batch.tutorial.kubebuilder.io/v1",

...}
1
2
3
4
5
6
7

就能构造出对应的 Go type了,通过这个 Go type 也能正确地获取 GVR 的一些信息,控制器可以通过该 Go type 获取到期望状态以及其他辅助信息进行调谐逻辑。

Kubebuilder 的核心组件,具有 3 个职责:

  • 负责运行所有的 Controllers;
  • 初始化共享 caches,包含 listAndWatch 功能;
  • 初始化 clients 用于与 Api Server 通信。

Cache

Kubebuilder 的核心组件,负责在 Controller 进程里面根据 Scheme 同步 Api Server 中所有该 Controller 关心 GVKs 的 GVRs,其核心是 GVK -> Informer 的映射,Informer 会负责监听对应 GVK 的 GVRs 的创建/删除/更新操作,以触发 Controller 的 Reconcile 逻辑。

Controller

Kubebuidler 为我们生成的脚手架文件,我们只需要实现 Reconcile 方法即可。

Clients

在实现 Controller 的时候不可避免地需要对某些资源类型进行创建/删除/更新,就是通过该 Clients 实现的,其中查询功能实际查询是本地的 Cache,写操作直接访问 Api Server。

Index

由于 Controller 经常要对 Cache 进行查询,Kubebuilder 提供 Index utility 给 Cache 加索引提升查询效率。

Finalizer

在一般情况下,如果资源被删除之后,我们虽然能够被触发删除事件,但是这个时候从 Cache 里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行,K8s 的 Finalizer 字段用于处理这种情况。在 K8s 中,只要对象 ObjectMeta 里面的 Finalizers 不为空,对该对象的 delete 操作就会转变为 update 操作,具体说就是 update deletionTimestamp 字段,其意义就是告诉 K8s 的 GC“在deletionTimestamp 这个时刻之后,只要 Finalizers 为空,就立马删除掉该对象”。

所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。

OwerReference

K8s GC 在删除一个对象时,任何 ownerReference 是该对象的对象都会被清除,与此同时,Kubebuidler 支持所有对象的变更都会触发 Owner 对象 controller 的 Reconcile 方法。

# 命令

kubebuilder init --domain edas.io
1

这一步创建了一个 Go module 工程,引入了必要的依赖,创建了一些模板文件。

kubebuilder create api --group apps --version v1alpha1 --kind Application
1

这一步创建了对应的 CRD 和 Controller 模板文件

# 源码

Kubebuilder 创建的 main.go 是整个项目的入口,逻辑十分简单

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)
func init() {
	appsv1alpha1.AddToScheme(scheme)
	// +kubebuilder:scaffold:scheme
}
func main() {
	...
        // 1、init Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme, MetricsBindAddress: metricsAddr})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}
        // 2、init Reconciler(Controller)
	err = (&controllers.ApplicationReconciler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("Application"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr)
	if err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "EDASApplication")
		os.Exit(1)
	}
	// +kubebuilder:scaffold:builder
	setupLog.Info("starting manager")
        // 3、start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

可以看到在 init 方法里面我们将 appsv1alpha1 注册到 Scheme 里面去了,这样一来 Cache 就知道 watch 谁了,main 方法里面的逻辑基本都是 Manager 的:

  1. 初始化了一个 Manager;
  2. 将 Manager 的 Client 传给 Controller,并且调用 SetupWithManager 方法传入 Manager 进行 Controller 的初始化;
  3. 启动 Manager。

Manager初始化

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
	...
	// Create the cache for the cached read client and registering informers
	cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
	if err != nil {
		return nil, err
	}
	apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}
	writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper})
	if err != nil {
		return nil, err
	}
	...
	return &controllerManager{
		config:           config,
		scheme:           options.Scheme,
		errChan:          make(chan error),
		cache:            cache,
		fieldIndexes:     cache,
		client:           writeObj,
		apiReader:        apiReader,
		recorderProvider: recorderProvider,
		resourceLock:     resourceLock,
		mapper:           mapper,
		metricsListener:  metricsListener,
		internalStop:     stop,
		internalStopper:  stop,
		port:             options.Port,
		host:             options.Host,
		leaseDuration:    *options.LeaseDuration,
		renewDeadline:    *options.RenewDeadline,
		retryPeriod:      *options.RetryPeriod,
	}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

创建Cache

Cache初始化代码

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}
// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(...) *specificInformersMap {
	ip := &specificInformersMap{
		Scheme:            scheme,
		mapper:            mapper,
		informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
		codecs:            serializer.NewCodecFactory(scheme),
		resync:            resync,
		createListWatcher: createListWatcher,
		namespace:         namespace,
	}
	return ip
}
// MapEntry contains the cached data for an Informer
type MapEntry struct {
	// Informer is the cached informer
	Informer cache.SharedIndexInformer
	// CacheReader wraps Informer and implements the CacheReader interface for a single type
	Reader CacheReader
}
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
        ...
	// Create a new ListWatch for the obj
	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
				return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
			}
			return dynamicClient.Resource(mapping.Resource).List(opts)
		},
		// Setup the watch function
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			// Watch needs to be set to true separately
			opts.Watch = true
			if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
				return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
			}
			return dynamicClient.Resource(mapping.Resource).Watch(opts)
		},
	}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

Cache 主要就是创建了 InformersMap,Scheme 里面的每个 GVK 都创建了对应的 Informer,通过 informersByGVK 这个 map 做 GVK 到 Informer 的映射,每个 Informer 会根据 ListWatch 函数对对应的 GVK 进行 List 和 Watch

创建Clients

创建Clients 很简单

// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
	// Create the Client for Write operations.
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}
	return &client.DelegatingClient{
		Reader: &client.DelegatingReader{
			CacheReader:  cache,
			ClientReader: c,
		},
		Writer:       c,
		StatusClient: c,
	}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

读操作使用上面创建的 Cache,写操作使用 K8s go-client 直连。

Controller初始化

func (r *EDASApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
		err := ctrl.NewControllerManagedBy(mgr).
		For(&appsv1alpha1.EDASApplication{}).
		Complete(r)
		return err
}
1
2
3
4
5
6

使用的是 Builder 模式,NewControllerManagerBy 和 For 方法都是给 Builder 传参,最重要的是最后一个方法 Complete,其逻辑是

func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
	// Set the Manager
	if err := blder.doManager(); err != nil {
		return nil, err
	}
	// Set the ControllerManagedBy
	if err := blder.doController(r); err != nil {
		return nil, err
	}
	// Set the Watch
	if err := blder.doWatch(); err != nil {
		return nil, err
	}
...
	return blder.mgr, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

主要是看看 doController 和 doWatch 方法

doController方法

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}
	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}
	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}
	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}
	// Create controller with dependencies set
	c := &controller.Controller{
		Do:                      options.Reconciler,
		Cache:                   mgr.GetCache(),
		Config:                  mgr.GetConfig(),
		Scheme:                  mgr.GetScheme(),
		Client:                  mgr.GetClient(),
		Recorder:                mgr.GetEventRecorderFor(name),
		Queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		Name:                    name,
	}
	// Add the controller as a Manager components
	return c, mgr.Add(c)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

该方法初始化了一个 Controller,传入了一些很重要的参数:

  • Do:Reconcile 逻辑;
  • Cache:找 Informer 注册 Watch;
  • Client:对 K8s 资源进行 CRUD;
  • Queue:Watch 资源的 CUD 事件缓存;
  • Recorder:事件收集。

doWatch方法

func (blder *Builder) doWatch() error {
	// Reconcile type
	src := &source.Kind{Type: blder.apiType}
	hdler := &handler.EnqueueRequestForObject{}
	err := blder.ctrl.Watch(src, hdler, blder.predicates...)
	if err != nil {
		return err
	}
	// Watches the managed types
	for _, obj := range blder.managedObjects {
		src := &source.Kind{Type: obj}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.apiType,
			IsController: true,
		}
		if err := blder.ctrl.Watch(src, hdler, blder.predicates...); err != nil {
			return err
		}
	}
	// Do the watch requests
	for _, w := range blder.watchRequest {
		if err := blder.ctrl.Watch(w.src, w.eventhandler, blder.predicates...); err != nil {
			return err
		}
	}
	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

可以看到该方法对本 Controller 负责的 CRD 进行了 watch,同时底下还会 watch 本 CRD 管理的其他资源,这个 managedObjects 可以通过 Controller 初始化 Buidler 的 Owns 方法传入,说到 Watch 我们关心两个逻辑:

  1. 注册的 handler

可以看到 Kubebuidler 为我们注册的 Handler 就是将发生变更的对象的 NamespacedName 入队列,如果在 Reconcile 逻辑中需要判断创建/更新/删除,需要有自己的判断逻辑。

  1. 注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	...
	log.Info("Starting EventSource", "controller", c.Name, "source", src)
	return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	...
	is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
	return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13

我们的 Handler 实际注册到 Informer 上面,这样整个逻辑就串起来了,通过 Cache 我们创建了所有 Scheme 里面 GVKs 的 Informers,然后对应 GVK 的 Controller 注册了 Watch Handler 到对应的 Informer,这样一来对应的 GVK 里面的资源有变更都会触发 Handler,将变更事件写到 Controller 的事件队列中,之后触发我们的 Reconcile 方法。

Manager启动

func (cm *controllerManager) Start(stop <-chan struct{}) error {
	...
	go cm.startNonLeaderElectionRunnables()
	...
}
func (cm *controllerManager) startNonLeaderElectionRunnables() {
	...
	// Start the Cache. Allow the function to start the cache to be mocked out for testing
	if cm.startCache == nil {
		cm.startCache = cm.cache.Start
	}
	go func() {
		if err := cm.startCache(cm.internalStop); err != nil {
			cm.errChan <- err
		}
	}()
        ...
        // Start Controllers
	for _, c := range cm.nonLeaderElectionRunnables {
		ctrl := c
		go func() {
			cm.errChan <- ctrl.Start(cm.internalStop)
		}()
	}
	cm.started = true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

Cache启动

func (ip *specificInformersMap) Start(stop <-chan struct{}) {
	func() {
		...
		// Start each informer
		for _, informer := range ip.informersByGVK {
			go informer.Informer.Run(stop)
		}
	}()
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        ...
        // informer push resource obj CUD delta to this fifo queue
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
    // handler to process delta
		Process: s.HandleDeltas,
	}
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
    // this is internal controller process delta generate by reflector
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
        ...
	wg.StartWithChannel(processorStopCh, s.processor.run)
	s.controller.Run(stopCh)
}
func (c *controller) Run(stopCh <-chan struct{}) {
	...
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	...
        // reflector is delta producer
	wg.StartWithChannel(stopCh, r.Run)
        // internal controller's processLoop is comsume logic
	wait.Until(c.processLoop, time.Second, stopCh)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

Cache 的初始化核心是初始化所有的 Informer,Informer 的初始化核心是创建了 reflector 和内部 controller,reflector 负责监听 Api Server 上指定的 GVK,将变更写入 delta 队列中,可以理解为变更事件的生产者,内部 controller 是变更事件的消费者,他会负责更新本地 indexer,以及计算出 CUD 事件推给我们之前注册的 Watch Handler。

Controller启动

// Start implements controller.Controller
func (c *Controller) Start(stop <-chan struct{}) error {
	...
	for i := 0; i < c.MaxConcurrentReconciles; i++ {
		// Process work items
		go wait.Until(func() {
			for c.processNextWorkItem() {
			}
		}, c.JitterPeriod, stop)
	}
	...
}
func (c *Controller) processNextWorkItem() bool {
	...
	obj, shutdown := c.Queue.Get()
	...
	var req reconcile.Request
	var ok bool
	if req, ok = obj.(reconcile.Request); 
        ...
	// RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
	// resource to be synced.
	if result, err := c.Do.Reconcile(req); err != nil {
		c.Queue.AddRateLimited(req)
		...
	} 
        ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

Controller 的初始化是启动 goroutine 不断地查询队列,如果有变更消息则触发到我们自定义的 Reconcile 逻辑

# Operator Framework

https://github.com/operator-framework/operator-sdk

# 集群联邦

集群联邦 Federation 的目的是实现单一集群统一管理多个kubernetes集群的机制。这些集群可以是跨地域的,跨云厂商的或者是用户内部自建集群。一旦集群建立联邦后,就可以使用集群 Federation API 来管理多个集群的 kubernetes API 资源。

kubernetes集群联邦主要是实现以下目标

1)简化管理多个联邦集群的Kubernetes API 资源

2)在多个集群之间分散工作负载(容器),以提升应用(服务)的可靠性

3)在不同集群中,能更快速更容易地迁移应用(服务)

4)跨集群的服务发现,服务可以就近访问,以降低延迟

5)实践多云(Multi-cloud)或混合云(Hybird Cloud)的部署

集群联邦最初是 v1 版本,因为方案设计问题,导致可扩展性比较差,已经被废弃,目前社区提出了新的方案 Federation V2

从上图架构中得知Federation v1 的设计沿用类似Kubernetes 模型,其主要组件有以下:

federation-apiserver :提供Federation API资源,只支持部分Kubernetes API resources。
federation-controller-manager :协调不同集群之间的状态,如同步Federated资源与策略,并建立Kubernetes组件至对应集群上。
etcd :储存Federation的状态。
1
2
3

该方案设计之初没有考虑到crd的特性,支持的 Federation API 资源类型都是写死的,无法有效的扩展,不能兼容新的api资源,不支持跨集群权限管理,如不支持RBAC。联邦层级的设定与策略依赖API 资源的Annotations 内容,这使得弹性不佳。

Federation V2

Federation V2 是Kubernetes SIG Multi-Cluster团队新提出的集群联邦架构( Architecture Doc与Brainstorming Doc ),新架构在Federation v1基础之上,简化扩展Federated API过程,并加强跨集群服务发现与编排的功能。

相较于V1,Federation V2移除了 federation-apiserver 组件,通过 crd 机制来完成 federated resource的扩充,kubefed-controller 组件通过监听crd的变化来完成联邦资源的同步和调度等功能。

集群联邦安装文档 DOC,集群成员注册后,会在管理集群创建一个 KubeFedCluster 资源来存储集群的基本信息,如API Endpoint、CA Bundle等,kubefed controller 通过这些信息来访问和管理联邦集群成员。

联邦化资源

Federation V2 可以联邦化任意资源,包括自定义的crd资源。对集群资源联邦化的实现主要是通过两种CRD来完成,分别是 FederatedTypeConfig 和 Federated ,FederatedTypeConfig定义了 Federated 和kubernetes api资源的关联关系。而 Federated 用来定义怎么去联邦化对应的kubernetes api资源。

KubeFed提供了一种自动化机制来将工作负载实例分散到不同的集群中,且能够基于总副本数与集群的定义策略来将Deployment或ReplicaSet资源进行编排。编排策略是通过创建ReplicaSchedulingPreference(RSP),再由KubeFed RSP Controller监听与获取RSP内容来将工作负载实例建立到指定的集群上。

# 管理K8s集群的工具

# miniKube

# Kind

# Cloud Providers

# Kube-DNS

Last Updated: 7/11/2023, 2:13:22 AM