k8s operator 与webhook

背景

上一次我们写了一个 xxl-job-agent,旨为在k8s的pod中运行一个sidecar注册执行器在xxl-job-admin,并可以进入业务容器执行命令。

后面发现每一次手动配置deploy,修改clusterroleBinding太麻烦,需要做到简单的自动化,所以这一次尝试使用Operator和Webhook来简化我们的操作。

Operator是什么

引用官方文档的一段话:

operator 旨在记述(正在管理一个或一组服务的)运维人员的关键目标。 这些运维人员负责一些特定的应用和 Service,他们需要清楚地知道系统应该如何运行、如何部署以及出现问题时如何处理。

在 Kubernetes 上运行工作负载的人们都喜欢通过自动化来处理重复的任务。 Operator 模式会封装你编写的(Kubernetes 本身提供功能以外的)任务自动化代码。

可以理解为 Operator 是运维开发人员用来辅助自动化的一个工具,例如:

  • 在一组sass服务中,以命名空间隔离,然后每个命名空间都需要一些特定的服务,可以使用Operator统一创建更新服务。

  • 使用 CRD, 创建集群秘钥,在每一个命名空间中都存有统一的容器镜像秘钥,或其他通用配置。

webhook是什么

引用维基百科的解释:网页开发中的网络钩子(Webhook)是一种通过自定义回调函数来增加或更改网页表现的方法。这些回调可被可能与原始网站或应用相关的第三方用户及开发者保存、修改与管理

k8s中的webhook被官方称为准入 Webhook,引用官方解释:

准入 Webhook 是一种用于接收准入请求并对其进行处理的 HTTP 回调机制。 可以定义两种类型的准入 webhook,即 验证性质的准入 Webhook修改性质的准入 Webhook。 修改性质的准入 Webhook 会先被调用。它们可以更改发送到 API 服务器的对象以执行自定义的设置默认值操作。

在完成了所有对象修改并且 API 服务器也验证了所传入的对象之后, 验证性质的 Webhook 会被调用,并通过拒绝请求的方式来强制实施自定义的策略。

编写Operator

在之前编写的xxl-job-agent 中,如果想要在新的命名空间中使用agent,每次都要给命名空间的default serviceAccount添加对应的clusterroleBinding,这次我们将使用Operator自动添加clusterrole。

快速编写operator可以使用https://book.kubebuilder.io/

1
2
3
4
5
mkdir xxl-agent-operator

cd xxl-agent-operator &&  kubebuilder init --domain xxxx.com --repo gitlab.xxx.cn/kubematrix/operators/xxl-agent-operator

kubebuilder create api --group xxl-agent --version v1 --kind Pod

注意,这里我们不需要创建Resource,只需要创建ControllerResource是CRD, 我们目前不需要CRD。

这里会生成一个controllers/pod_controller.go文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the Pod object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *PodReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
 ... 
}

修改 Reconcile上面的注解,监听pods的变动,一旦pods发生变动,就会通知到这里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
func (r *PodReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	r.log = ctrl.Log.WithName("PodController").WithValues("Pod", req.Name)

	// found the pod
	pod := &v1.Pod{}
	if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil {
		return ctrl.Result{}, nil
	}

	// Ignore event of deleting pod
	if !pod.DeletionTimestamp.IsZero() {
		return ctrl.Result{}, nil
	}

	// get enable  annotation
	enable, ok := pod.Annotations[constant.XXL_AGENT_ANNOTATION_ENABLE]
	if !ok {
		return ctrl.Result{}, nil
	}

	// parse annotation enable
	if enable, err := strconv.ParseBool(enable); !enable || err != nil {
		return ctrl.Result{}, nil
	}

	if err := r.syncClusterRole(ctx, pod); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

func (r *PodReconciler) syncClusterRole(ctx context.Context, pod *v1.Pod) error {
	role := &rbacV1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE,
		},
		Rules: []rbacV1.PolicyRule{
			{
				APIGroups: []string{""},
				Resources: []string{"pods/exec"},
				Verbs:     []string{"create"},
			},
		},
	}
	// create role
	if err := r.Client.Get(ctx, client.ObjectKey{Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE}, role); err != nil {
		if errors.IsNotFound(err) {
			if err := r.Create(ctx, role); err != nil {
				return err
			}
		}
	}

	// async roleBinding
	roleBinding := &rbacV1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE_BINDING},
		RoleRef: rbacV1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     constant.XXL_AGENT_EXEC_CLUSTER_ROLE,
		},
		Subjects: []rbacV1.Subject{},
	}

	// get or create roleBinding
	if err := r.Client.Get(ctx, client.ObjectKey{Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE_BINDING}, roleBinding); err != nil {
		if errors.IsNotFound(err) {
			if err := r.Create(ctx, roleBinding); err != nil {
				return err
			}
		}
	}

	// check the roleBinding already has the pod namespace
	if res := utils.ArrFirst(roleBinding.Subjects, func(i rbacV1.Subject) bool {
		return i.Namespace == pod.Namespace && i.Name == constant.XXL_AGENT_EXEC_SERVICE_ACCOUNT
	}); res == nil {

		origin := roleBinding.DeepCopy()

		roleBinding.Subjects = append(roleBinding.Subjects, rbacV1.Subject{
			Kind:      rbacV1.ServiceAccountKind,
			Name:      constant.XXL_AGENT_EXEC_SERVICE_ACCOUNT,
			Namespace: pod.Namespace,
		})
		desired, err := json.Marshal(roleBinding)
		if err != nil {
			return err
		}

		return r.Client.Patch(ctx, origin, client.RawPatch(types.MergePatchType, desired))
	}

	return nil
}

上面的代码也很简单,监听pods的变动,从pods的annotations中解析xxl-agent/enable注解,并且parse为布尔值,如果注解值是true,自动同步clusterrole。

笔者的理解是operator有点类似一个监听器,监听某些操作和变动,然后做出相应的调整,来保证服务或者工作负载能完好的运转。有点类似半自动化的k8s运维。

注意,这里我们不可以在operator中进行边车的注入,因为这个时候的pods已经准备好了,再进行边车注入会提示报错,需要在webhook中进行边车注入。

编写webhook

参考: https://github.com/kubernetes/kubernetes/blob/release-1.21/test/images/agnhost/webhook/main.go

webhook 动态准入,有点类似过滤器 / 中间件,在完成某个资源的请求之前通过webhook过滤一遍,可以实现动态调整配置,拒绝某些请求。

编写一个webhook在pods创建的时候拦截过滤,这个时候pods还没有创建好,所以我们可以注入边车。

可以创建一个 webhook.yaml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  labels:
    app: xxl-agent-injector
  name: xxl-agent-injector
webhooks:
  - admissionReviewVersions:
      - v1beta1
    clientConfig:
      caBundle: ${CA_BUNDLE}
      service:
        name: xxl-agent-injector-webhook
        namespace: matrixio
        path: /inject
        port: 443
    failurePolicy: Ignore
    matchPolicy: Exact
    name: xxl-agent-injector.duomai.com
    reinvocationPolicy: Never
    rules:
      - apiGroups:
          - ""
        apiVersions:
          - v1
        operations:
          - CREATE
        resources:
          - pods
        scope: '*'
    sideEffects: None
    timeoutSeconds: 30

上面就是一个webhook的配置文件,解释一下一些比较关键的配置:

  1. rules 匹配请求-规则

    rules可以是多个,每个rule都必须有一个或多个 operations、apiGroups、apiVersions 和 resources 以及资源的 scope

  • operations 匹配的操作,CREATEUPDATEDELETECONNECT*

  • apiGroups 匹配的API组,“” 是核心(core)组 , "*" 是匹配所有

  • resources 是匹配的资源

    • “*”匹配所有资源,但不包括子资源
    • "*/*" 匹配所有资源,包括子资源。
    • "pods/*" 匹配 pod 的所有子资源。
    • "*/status" 匹配所有 status 子资源。
  • scope指定要匹配的范围。有效值为"Cluster""Namespaced""*"。 子资源匹配其父资源的范围。默认值为"*"

  • "Cluster" 表示只有集群作用域的资源才能匹配此规则(API 对象 Namespace 是集群作用域的)。

    • "Namespaced" 意味着仅具有名字空间的资源才符合此规则。
  • "*" 表示没有作用域限制。

  1. clientConfig 服务引用

当前匹配请求之后,请求将转发至某个服务,这个服务可以是内部,也可以是外部。内部服务使用service,外部服务使用url

  • caBundle 根证书,可以使用export caBundle=$(kubectl get configmap -n kube-system extension-apiserver-authentication -o=jsonpath='{.data.client-ca-file}' | base64 | tr -d '\n') 命令获取k8s集群的根证书

  • service 集群内部服务, 当选择了内部服务时,namespacename是必填的, port默认是443,path默认是/。 注意其中name是serviceName

  • url 类似这种 https://xxx.xxx:443/inject url

所以上面那个配置文件中的rule意思就是,匹配所有pods的创建请求,在pods创建之前,请求matrixio命名空间下的xxl-agent-injector-webhook 服务, 请求的端口是443,路径 是/inject

注意:webhook服务必须是tls加密过的,这里的服务端证书可以使用脚本生成 ssl.sh 在使用脚本的时候注意修改自己的APPNAMESPACE, 如果使用本地url调试,可以手动修改 IP 至本机 ip

当一切都准备好之后,开始编写服务端的代码

main.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
	// 创建路由
	router := mux.NewRouter()

	// 定义 Webhook 处理函数
	router.HandleFunc("/inject", func(writer http.ResponseWriter, request *http.Request) {
		f := handler.Inject

		server(writer, request, admitHandler{
			v1beta1: delegateV1beta1AdmitToV1(f),
			v1:      f,
		})
	})

	// 启动 Webhook 服务
	srv := &http.Server{
		Addr:    ":8443",
		Handler: router,
	}

	klog.Info("Webhook server is running...")
	if err := srv.ListenAndServeTLS(config.Options.Ssl.CertFile, config.Options.Ssl.KeyFile); err != nil {
		panic(err)
	}

config.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package config

import (
   "github.com/spf13/viper"
)

var Options OptionConfig

func init() {
   if err := InitConfig(); err != nil {
      panic(err)
   }
}
func InitConfig() error {
   v := viper.New()
   v.SetConfigFile("config/application.yaml")
   if err := v.ReadInConfig(); err != nil {
      return err
   }
   return v.Unmarshal(&Options)
}

type InjectConfig struct {
   SideCarImage string `json:"sidecar-image" mapstructure:"sidecar-image" `
}

type SslConfig struct {
   CertFile string `json:"cert" mapstructure:"cert"`
   KeyFile  string `json:"key" mapstructure:"key"`
}

type OptionConfig struct {
   Ssl    SslConfig    `json:"ssl" mapstructure:"ssl"`
   Inject InjectConfig `json:"inject" mapstructure:"inject"`
}

上面的代码就是,从config/application.yaml中读取边车镜像,证书文件路径,然后启动一个http服务,绑定inject路由。

然后就是自己的逻辑代码,inject-handler.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

func Inject(ar v1.AdmissionReview) *v1.AdmissionResponse {
	raw := ar.Request.Object.Raw
	pod := corev1.Pod{}
	deserializer := Codecs.UniversalDeserializer()
  // parse pods
	if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
		klog.Error(err)
		return utils.ToV1AdmissionResponse(err)
	}

	klog.Infof("admitting pod %s", pod.GenerateName)

	// get enable  annotation
	enable, ok := pod.Annotations[constant.XXL_AGENT_ANNOTATION_ENABLE]
	if !ok {
		return nil
	}

	// parse annotation enable
	if enable, err := strconv.ParseBool(enable); !enable || err != nil {
		return nil
	}


	// 解析注解
	agentOpt, err := parseAnnotation(&pod)
	if err != nil {
		return nil
	}

	patch, err := createPatch(agentOpt)
	if err != nil {
		return utils.ToV1AdmissionResponse(err)
	}

	klog.Infof("success  patch pod %s,  %s", pod.GenerateName, patch)

	// 构造 AdmissionResponse 对象
	uid := string(uuid.NewUUID())
	pt := v1.PatchTypeJSONPatch

	return &v1.AdmissionResponse{
		UID:       types.UID(uid),
		Allowed:   true,
		Patch:     patch,
		PatchType: &pt,
	}
}

// parseAnnotation parse pod annotation
func parseAnnotation(pod *corev1.Pod) (agentOpt *options.AgentOptions, err error) {
	annotations := []string{
		constant.XXL_AGENT_JOB_ADDR,
		constant.XXL_AGENT_TARGET_CONTAINER,
		constant.XXL_AGENT_JOB_NAME,
	}
	agentOpt = &options.AgentOptions{}
	for _, annotation := range annotations {
		res, ok := pod.GetAnnotations()[annotation]
		if !ok {
			return nil, errors.PodAnnotationNotFoundError{Annotation: annotation}
		}
		if annotation == constant.XXL_AGENT_JOB_ADDR {
			agentOpt.XxlJobAdminAddr = res
		}
		if annotation == constant.XXL_AGENT_TARGET_CONTAINER {
			agentOpt.TargetContainer = res
		}
		if annotation == constant.XXL_AGENT_JOB_NAME {
			agentOpt.JobName = res
		}
	}
	return
}

// createPatch 
func createPatch(agentOpt *options.AgentOptions) ([]byte, error) {

	value := corev1.Container{
		Name:  constant.SIDE_CAR_CONTAINER_NAME,
		Image: config.Options.Inject.SideCarImage,
		Ports: []corev1.ContainerPort{
			{
				Name:          "http-xxl-agent",
				ContainerPort: 9999,
			},
		},
		Env: []corev1.EnvVar{
			{
				Name: agentConst.CONTAINER_NAME, Value: agentOpt.TargetContainer,
			},
			{
				Name: agentConst.XXL_JOB_NAME, Value: agentOpt.JobName,
			},
			{
				Name: agentConst.XXL_JOB_ADDR, Value: agentOpt.XxlJobAdminAddr,
			},
			{
				Name: agentConst.NAMESPACE, ValueFrom: &corev1.EnvVarSource{
					FieldRef: &corev1.ObjectFieldSelector{
						APIVersion: "v1",
						FieldPath:  "metadata.namespace",
					},
				},
			},
			{
				Name: agentConst.POD_NAME, ValueFrom: &corev1.EnvVarSource{
					FieldRef: &corev1.ObjectFieldSelector{
						APIVersion: "v1",
						FieldPath:  "metadata.name",
					},
				},
			},
		},
	}

  //  add a container to the pods 
	return json.Marshal([]map[string]interface{}{
		{
			"op":    "add",
			"path":  "/spec/containers/-",
			"value": value,
		},
	})
}

上面这段代码就是从请求中解析出pods, 然后从pods的注解中解析出配置,根据配置来生成 container ,然后生成对应的响应。

关于响应,请参考 https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/extensible-admission-controllers/#response

这里编写webhook的时候请注意,response的allow值一定要是true,不然所有的pods更新都会被拒绝。当然 如果你确实有拒绝pods更新的需求,也可以是false

然后patch 是多维数组 [["op" : "add","path" :","value":"" ],[...] ]


所有的operator,webhook 都建议在本地调试好之后再去集群中部署。

Licensed under CC BY-NC-SA 4.0
Built with Hugo
主题 StackJimmy 设计