在开发 operator,每种自定义资源只能包含有两种子资源——Status 和 Scale。这里探究一下Status的使用。

关键对象

Status资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

// ApplicationSpec defines the desired state of Application
type ApplicationSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// Foo is an example field of Application. Edit application_types.go to remove/update
Foo string `json:"foo,omitempty"`
}

// ApplicationStatus defines the observed state of Application
type ApplicationStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// Application is the Schema for the applications API
type Application struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec ApplicationSpec `json:"spec,omitempty"`
Status ApplicationStatus `json:"status,omitempty"`
}

Spec 和 Status 都是 Sample 的成员变量,Status并不像Pod.Status一样,是Pod的subResource.因此,如果我们在controller的代码中调用到Status().Update(), 会触发panic,并报错:the server could not find the requested resource

如果我们想像k8s中的设计那样,那么就要遵循k8s中status subresource的使用规范:

用户只能指定一个CRD实例的spec部分;
CRD实例的status部分由控制器进行变更。
此时我们需要在 Sample 中添加一行注解// +kubebuilder:subresource:status,标明 status 是 Sample 的子资源。

其实 Status 是一个非常重要的字段,在 Controller 中实现业务代码时,需要遵循 K8s 的声明式API的思想,因此需要依据 Status 的字段去进行 CR 处理的逻辑,即判断当前CR所处状态,对比其期望状态,然后做出一定的动作。

eventRecorder

Status 字段用于 CR 的逻辑处理,但在后期维护过程中,则一部分会依赖于 CR 所产生的 Event,这有些类似于在 CR 的业务逻辑中添加部分重要的日志,最终可以帮助我们定位问题。后期运行过程中,可以通过 kubectl describe cr -n xxx 看到这些 Event。

  1. 在controller的处理对象中添加Log和Recorder,并在业务逻辑中记录
1
2
3
4
5
6
7
8
9
10
11
12
13
type SampleReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
// 在业务逻辑中记录
func (r *SampleReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
sample := &samplev1.Sample{}
_ := r.Get(ctx, client.ObjectKey{Name: req.Name, Namespace: req.Namespace}, sample)
// do something
r.Recorder.Eventf(sample, corev1.EventTypeWarning, "Error", "some error")
}

K8s 为我们提供了2种等级的 event,分别是 Normal 和 Warning。

finalizer

finalizer即终结器,存在于每一个k8s内的资源实例中,即**.metadata.finalizers,它是一个字符串数组,每一个成员表示一个finalizer。

控制器在删除某个资源时,K8s 会更新 CR 的 DeletionTimestamp 字段,这样会触发一个 Update 动作,我们的 controller 可以监听这个字段的变更,实现 CR 的预删除处理,比如删除由 CR 创建的某些资源。

当我们需要设计这类finalizer时,就可以自定义一个controller来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (c *SampleReconciler) finalize(ctx context.Context, sample *samplev1.Sample) error {
logs := c.getLog(sample)
// 这里我们自定义一个 finalizer 字段
myFinalizerName := "sample.finalizers.io"

if sample.ObjectMeta.DeletionTimestamp.IsZero() {
//这里由于 DeletionTimestamp 是0,即没有删除操作,则不进行处理,只检查 CR 中是否含有自定义 finalizer 字符,若没有则增加。
if !containsString(sample.ObjectMeta.Finalizers, myFinalizerName) {
sample.ObjectMeta.Finalizers = append(sample.ObjectMeta.Finalizers, myFinalizerName)
}
} else {
//进行预删除操作
if containsString(sample.ObjectMeta.Finalizers, myFinalizerName) {
// do something
// 从 CR 中删除自定义 finalizer 字段。
sample.ObjectMeta.Finalizers = removeString(sample.ObjectMeta.Finalizers, myFinalizerName)
}
return nil
}
return nil
}

Reconsiler的处理

了解 controller-runtime 包的处理逻辑。Informer 机制在包中已经实现,我们就不再过多关注,假设现在已经监听到 CR 的变化事件(包括 创建、更新、删除、扩展),这个事件则会进入 WorkQueue 中。在进入 WorkQueue 之前, controller-runtime 会进行一些过滤处理和业务处理。主要涉及接口是 EventHandler 和 Predicate。

EventHandler 可以在事件入队列之前加入其他逻辑,其定义如下

EventHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//controller-runtime@v0.5.0\pkg\handler\eventhandler.go
//此处定义了针对不同事件的处理接口,我们可以通过实现此接口完成扩展业务逻辑
type EventHandler interface {
// Create is called in response to an create event - e.g. Pod Creation.
Create(event.CreateEvent, workqueue.RateLimitingInterface)

// Update is called in response to an update event - e.g. Pod Updated.
Update(event.UpdateEvent, workqueue.RateLimitingInterface)

// Delete is called in response to a delete event - e.g. Pod Deleted.
Delete(event.DeleteEvent, workqueue.RateLimitingInterface)

// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
Generic(event.GenericEvent, workqueue.RateLimitingInterface)
}

Predicate

Predicate 则是对监听到的事件进行过滤,让我们只关注我们想要的时间,其结构体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
type Predicate interface {
// Create returns true if the Create event should be processed
Create(event.CreateEvent) bool

// Delete returns true if the Delete event should be processed
Delete(event.DeleteEvent) bool

// Update returns true if the Update event should be processed
Update(event.UpdateEvent) bool

// Generic returns true if the Generic event should be processed
Generic(event.GenericEvent) bool
}

controller-runtime的处理逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//controller-runtime@v0.5.0\pkg\source\internal\eventsource.go
type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}

func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
...
// 这里可以自定义 Predicates,将事件进行过滤
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}

// 调用了上面的 EventHandler 对应的逻辑
e.EventHandler.Create(c, e.Queue)
}

// 除了 OnAdd 外,还有 OnUpdate OnDelete

最终入队列的数据结构如下,即只有 namespace 和name,并没有资源的类型。

1
2
3
4
5
type NamespacedName struct {
Namespace string
Name string
}

在 controller-runtime 包中的 controller.Start()方法中,则会循环从队列中拿取一个事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// controller.go
func (c *Controller) Start(stop <-chan struct{}) error {
...
// 启动多个 worker 线程,处理事件
log.Info("Starting workers", "controller", c.Name, "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.Until(c.worker, c.JitterPeriod, stop)
}
...
}
func (c *Controller) worker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
// 拿取一个事件
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
return false
}

// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.Queue.Done(obj)
//处理事件
return c.reconcileHandler(obj)
}

监控多个资源变化(如何监听)

可能会遇到 CR -> Deployment -> Pod 的逻辑,即由CR 创建deployment,最终落入pod。这时,我们不仅需要监听 CR 的变化, deployment 以及 pod 的变化我们也需要关注,这就意味着我们在 reconciler 中也需要根据deployment变化进行相应的处理。我们在SampleReconciler的SetupWithManager方法中,可以看到:

1
2
3
4
5
func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplev1.Sample{}).
Complete(r)
}

这里的 For 则可以指定需要监听的资源。进一步,我们看NewControllerManagedBy()的源码可以发现pkg\builder\controller.go 中实现了 构建者模式,controller-runtime 提供了一个 builder 方便我们进行配置。其中监听资源的有 For() Owns() 和Watch() ,For() Owns()都是基于Watch()实现,只不过是入队列前的 eventHandler 不同。

简单来说,我们可以调用Watch()实现监听deployment

1
2
3
4
5
6
func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplev1.Sample{}).
// 这里可以供我们指定 eventhandler
Watches(&source.Kind{Type: &apps.Deployment{&#125;&#125;, &handler.EnqueueRequestForObject{}).
Complete(r)

但这样做会监听所有 deployment 事件变化,如果我们想只关注由 CR 创建的 Deployment 因此我们可以采用 Owns() 方法。

1
2
3
4
5
6
func (r *SampleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplev1.Sample{}).
// 这里可以供我们指定 eventhandler
Owns(&apps.Deployment{}).
Complete(r)

当我们使用 CR 创建 deployment 时,可以为他塞入一个从属关系,类似于 Pod 资源的Metadata 里会有一个OnwerReference字段.这样在监听时,只会监听到带有 OwnerShip 的 deployment。

1
_ = controllerutil.SetControllerReference(sample, &deployment, r.Scheme)

监听多资源下evnetHandler 和 Reconciler 的逻辑

我们的 controller Reconciler 业务逻辑,实际上只应该处理 CR 的变化,但有时是 CR 所拥有的 Deployment 发生了变化,但对应的 CR 并不会有更新事件,因此我们需要在自定义eventHandler中,对资源进行判断。若是 CR 的变化,则直接向队列写入 namespace 和 name,若是 deployment 的变化,则向队列写入 deployment 对应 CR 的namespace 和name,触发 Reconciler 的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (e *EnqueueRequestForDP) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Meta == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
_, ok := evt.Object.(*samplev1.Sample)
if ok {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.Name,
Namespace: evt.Meta.GetNamespace(),
&#125;&#125;)
return
}
deploy ,_:= evt.Object.(*v1.Deployment)
for _, owner := range deploy.OwnerReferences {
if owner.Kind == "Sample" {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: owner.Name,
Namespace: evt.Meta.GetNamespace(),
&#125;&#125;)
}
}
}

监听指定字段变化

依据上文,自定义 predicate 即可实现。 比如我们只对 CR 改变 label 的事件感兴趣,我们此时可以自定义 Predicate 以及其 OnUpdate() 方法

1
2
3
4
5
6
7
8
9
10
func (rl *ResourceLabelChangedPredicate) Update (e event.UpdateEvent) bool{
_, ok1 = e.ObjectOld.(*samplev1.Sample)
_, ok2 = e.ObjectNew.(*samplev1.Sample)
if ok1 && ok2 {
if !compareMaps(e.MetaOld.GetLabels(), e.MetaNew.GetLabels()) {
return true
}
}
return false
}

当我们监听多个资源后, deployment 的更新事件也会进入到这个方法,所以在方法中,需要通过 _, ok = e.Object(*xxx) 判断资源的类型。

多版本切换

在crd的开发和演进过程中,必然会存在一个crd的不同版本。 kubebuilder支持以一个conversion webhook的方式,支持对一个crd资源以不同版本进行读取。简单地描述就是:

1
kubectl apply -f config/samples/batch_v2_cronjob.yaml

创建一个v2的cronjob后,可以通过v1和v2两种版本进行读取:

1
2
kubectl get cronjobs.v2.batch.tutorial.kubebuilder.io -o yaml
kubectl get cronjobs.v1.batch.tutorial.kubebuilder.io -o yaml

显然,get命令得到的v1和v2版本的cronjob会存在一些字段上的不同,conversion webhook会负责进行不同版本的cronjob之间的数据转换。

原理解读

manager