背景
上一次我们写了一个 xxl-job-agent,旨为在k8s的pod中运行一个sidecar注册执行器在xxl-job-admin,并可以进入业务容器执行命令。
后面发现每一次手动配置deploy,修改clusterroleBinding太麻烦,需要做到简单的自动化,所以这一次尝试使用Operator和Webhook来简化我们的操作。
Operator是什么
引用官方文档的一段话:
operator 旨在记述(正在管理一个或一组服务的)运维人员的关键目标。 这些运维人员负责一些特定的应用和 Service,他们需要清楚地知道系统应该如何运行、如何部署以及出现问题时如何处理。
在 Kubernetes 上运行工作负载的人们都喜欢通过自动化来处理重复的任务。 Operator 模式会封装你编写的(Kubernetes 本身提供功能以外的)任务自动化代码。
可以理解为 Operator 是运维开发人员用来辅助自动化的一个工具,例如:
webhook是什么
引用维基百科的解释:网页开发中的网络钩子(Webhook)是一种通过自定义回调函数来增加或更改网页表现的方法。这些回调可被可能与原始网站或应用相关的第三方用户及开发者保存、修改与管理
k8s中的webhook被官方称为准入 Webhook,引用官方解释:
准入 Webhook 是一种用于接收准入请求并对其进行处理的 HTTP 回调机制。 可以定义两种类型的准入 webhook,即 验证性质的准入 Webhook 和 修改性质的准入 Webhook。 修改性质的准入 Webhook 会先被调用。它们可以更改发送到 API 服务器的对象以执行自定义的设置默认值操作。
在完成了所有对象修改并且 API 服务器也验证了所传入的对象之后, 验证性质的 Webhook 会被调用,并通过拒绝请求的方式来强制实施自定义的策略。
编写Operator
在之前编写的xxl-job-agent 中,如果想要在新的命名空间中使用agent,每次都要给命名空间的default serviceAccount添加对应的clusterroleBinding,这次我们将使用Operator自动添加clusterrole。
快速编写operator可以使用https://book.kubebuilder.io/
1
2
3
4
5
|
mkdir xxl-agent-operator
cd xxl-agent-operator && kubebuilder init --domain xxxx.com --repo gitlab.xxx.cn/kubematrix/operators/xxl-agent-operator
kubebuilder create api --group xxl-agent --version v1 --kind Pod
|
注意,这里我们不需要创建Resource
,只需要创建Controller
,Resource
是CRD, 我们目前不需要CRD。
这里会生成一个controllers/pod_controller.go
文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// the Pod object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *PodReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
...
}
|
修改 Reconcile上面的注解,监听pods的变动,一旦pods发生变动,就会通知到这里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
func (r *PodReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
r.log = ctrl.Log.WithName("PodController").WithValues("Pod", req.Name)
// found the pod
pod := &v1.Pod{}
if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil {
return ctrl.Result{}, nil
}
// Ignore event of deleting pod
if !pod.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}
// get enable annotation
enable, ok := pod.Annotations[constant.XXL_AGENT_ANNOTATION_ENABLE]
if !ok {
return ctrl.Result{}, nil
}
// parse annotation enable
if enable, err := strconv.ParseBool(enable); !enable || err != nil {
return ctrl.Result{}, nil
}
if err := r.syncClusterRole(ctx, pod); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
func (r *PodReconciler) syncClusterRole(ctx context.Context, pod *v1.Pod) error {
role := &rbacV1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE,
},
Rules: []rbacV1.PolicyRule{
{
APIGroups: []string{""},
Resources: []string{"pods/exec"},
Verbs: []string{"create"},
},
},
}
// create role
if err := r.Client.Get(ctx, client.ObjectKey{Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE}, role); err != nil {
if errors.IsNotFound(err) {
if err := r.Create(ctx, role); err != nil {
return err
}
}
}
// async roleBinding
roleBinding := &rbacV1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE_BINDING},
RoleRef: rbacV1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "ClusterRole",
Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE,
},
Subjects: []rbacV1.Subject{},
}
// get or create roleBinding
if err := r.Client.Get(ctx, client.ObjectKey{Name: constant.XXL_AGENT_EXEC_CLUSTER_ROLE_BINDING}, roleBinding); err != nil {
if errors.IsNotFound(err) {
if err := r.Create(ctx, roleBinding); err != nil {
return err
}
}
}
// check the roleBinding already has the pod namespace
if res := utils.ArrFirst(roleBinding.Subjects, func(i rbacV1.Subject) bool {
return i.Namespace == pod.Namespace && i.Name == constant.XXL_AGENT_EXEC_SERVICE_ACCOUNT
}); res == nil {
origin := roleBinding.DeepCopy()
roleBinding.Subjects = append(roleBinding.Subjects, rbacV1.Subject{
Kind: rbacV1.ServiceAccountKind,
Name: constant.XXL_AGENT_EXEC_SERVICE_ACCOUNT,
Namespace: pod.Namespace,
})
desired, err := json.Marshal(roleBinding)
if err != nil {
return err
}
return r.Client.Patch(ctx, origin, client.RawPatch(types.MergePatchType, desired))
}
return nil
}
|
上面的代码也很简单,监听pods的变动,从pods的annotations中解析xxl-agent/enable注解,并且parse为布尔值,如果注解值是true,自动同步clusterrole。
笔者的理解是operator有点类似一个监听器,监听某些操作和变动,然后做出相应的调整,来保证服务或者工作负载能完好的运转。有点类似半自动化的k8s运维。
注意,这里我们不可以在operator中进行边车的注入,因为这个时候的pods已经准备好了,再进行边车注入会提示报错,需要在webhook中进行边车注入。
编写webhook
参考: https://github.com/kubernetes/kubernetes/blob/release-1.21/test/images/agnhost/webhook/main.go
webhook 动态准入,有点类似过滤器 / 中间件,在完成某个资源的请求之前通过webhook过滤一遍,可以实现动态调整配置,拒绝某些请求。
编写一个webhook在pods创建的时候拦截过滤,这个时候pods还没有创建好,所以我们可以注入边车。
可以创建一个 webhook.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
labels:
app: xxl-agent-injector
name: xxl-agent-injector
webhooks:
- admissionReviewVersions:
- v1beta1
clientConfig:
caBundle: ${CA_BUNDLE}
service:
name: xxl-agent-injector-webhook
namespace: matrixio
path: /inject
port: 443
failurePolicy: Ignore
matchPolicy: Exact
name: xxl-agent-injector.duomai.com
reinvocationPolicy: Never
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
resources:
- pods
scope: '*'
sideEffects: None
timeoutSeconds: 30
|
上面就是一个webhook的配置文件,解释一下一些比较关键的配置:
-
rules 匹配请求-规则
rules可以是多个,每个rule都必须有一个或多个 operations、apiGroups、apiVersions 和 resources 以及资源的 scope
-
operations
匹配的操作,CREATE
、UPDATE
、DELETE
、CONNECT
或 *
-
apiGroups
匹配的API组,“”
是核心(core)组 , "*"
是匹配所有
-
resources
是匹配的资源
“*”
匹配所有资源,但不包括子资源
"*/*"
匹配所有资源,包括子资源。
"pods/*"
匹配 pod 的所有子资源。
"*/status"
匹配所有 status 子资源。
-
scope
指定要匹配的范围。有效值为"Cluster"
、"Namespaced"
和 "*"
。 子资源匹配其父资源的范围。默认值为"*"
-
"Cluster"
表示只有集群作用域的资源才能匹配此规则(API 对象 Namespace 是集群作用域的)。
"Namespaced"
意味着仅具有名字空间的资源才符合此规则。
-
"*"
表示没有作用域限制。
- clientConfig 服务引用
当前匹配请求之后,请求将转发至某个服务,这个服务可以是内部,也可以是外部。内部服务使用service
,外部服务使用url
-
caBundle 根证书,可以使用export caBundle=$(kubectl get configmap -n kube-system extension-apiserver-authentication -o=jsonpath='{.data.client-ca-file}' | base64 | tr -d '\n')
命令获取k8s集群的根证书
-
service 集群内部服务, 当选择了内部服务时,namespace
和name
是必填的, port
默认是443
,path
默认是/
。 注意其中name
是serviceName
-
url 类似这种 https://xxx.xxx:443/inject url
所以上面那个配置文件中的rule意思就是,匹配所有pods
的创建请求,在pods
创建之前,请求matrixio
命名空间下的xxl-agent-injector-webhook
服务, 请求的端口是443
,路径 是/inject
注意:webhook服务必须是tls加密过的,这里的服务端证书可以使用脚本生成 ssl.sh 在使用脚本的时候注意修改自己的APP
和 NAMESPACE
, 如果使用本地url调试,可以手动修改 IP 至本机 ip
当一切都准备好之后,开始编写服务端的代码
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// 创建路由
router := mux.NewRouter()
// 定义 Webhook 处理函数
router.HandleFunc("/inject", func(writer http.ResponseWriter, request *http.Request) {
f := handler.Inject
server(writer, request, admitHandler{
v1beta1: delegateV1beta1AdmitToV1(f),
v1: f,
})
})
// 启动 Webhook 服务
srv := &http.Server{
Addr: ":8443",
Handler: router,
}
klog.Info("Webhook server is running...")
if err := srv.ListenAndServeTLS(config.Options.Ssl.CertFile, config.Options.Ssl.KeyFile); err != nil {
panic(err)
}
|
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package config
import (
"github.com/spf13/viper"
)
var Options OptionConfig
func init() {
if err := InitConfig(); err != nil {
panic(err)
}
}
func InitConfig() error {
v := viper.New()
v.SetConfigFile("config/application.yaml")
if err := v.ReadInConfig(); err != nil {
return err
}
return v.Unmarshal(&Options)
}
type InjectConfig struct {
SideCarImage string `json:"sidecar-image" mapstructure:"sidecar-image" `
}
type SslConfig struct {
CertFile string `json:"cert" mapstructure:"cert"`
KeyFile string `json:"key" mapstructure:"key"`
}
type OptionConfig struct {
Ssl SslConfig `json:"ssl" mapstructure:"ssl"`
Inject InjectConfig `json:"inject" mapstructure:"inject"`
}
|
上面的代码就是,从config/application.yaml
中读取边车镜像,证书文件路径,然后启动一个http服务,绑定inject
路由。
然后就是自己的逻辑代码,inject-handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
func Inject(ar v1.AdmissionReview) *v1.AdmissionResponse {
raw := ar.Request.Object.Raw
pod := corev1.Pod{}
deserializer := Codecs.UniversalDeserializer()
// parse pods
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
klog.Error(err)
return utils.ToV1AdmissionResponse(err)
}
klog.Infof("admitting pod %s", pod.GenerateName)
// get enable annotation
enable, ok := pod.Annotations[constant.XXL_AGENT_ANNOTATION_ENABLE]
if !ok {
return nil
}
// parse annotation enable
if enable, err := strconv.ParseBool(enable); !enable || err != nil {
return nil
}
// 解析注解
agentOpt, err := parseAnnotation(&pod)
if err != nil {
return nil
}
patch, err := createPatch(agentOpt)
if err != nil {
return utils.ToV1AdmissionResponse(err)
}
klog.Infof("success patch pod %s, %s", pod.GenerateName, patch)
// 构造 AdmissionResponse 对象
uid := string(uuid.NewUUID())
pt := v1.PatchTypeJSONPatch
return &v1.AdmissionResponse{
UID: types.UID(uid),
Allowed: true,
Patch: patch,
PatchType: &pt,
}
}
// parseAnnotation parse pod annotation
func parseAnnotation(pod *corev1.Pod) (agentOpt *options.AgentOptions, err error) {
annotations := []string{
constant.XXL_AGENT_JOB_ADDR,
constant.XXL_AGENT_TARGET_CONTAINER,
constant.XXL_AGENT_JOB_NAME,
}
agentOpt = &options.AgentOptions{}
for _, annotation := range annotations {
res, ok := pod.GetAnnotations()[annotation]
if !ok {
return nil, errors.PodAnnotationNotFoundError{Annotation: annotation}
}
if annotation == constant.XXL_AGENT_JOB_ADDR {
agentOpt.XxlJobAdminAddr = res
}
if annotation == constant.XXL_AGENT_TARGET_CONTAINER {
agentOpt.TargetContainer = res
}
if annotation == constant.XXL_AGENT_JOB_NAME {
agentOpt.JobName = res
}
}
return
}
// createPatch
func createPatch(agentOpt *options.AgentOptions) ([]byte, error) {
value := corev1.Container{
Name: constant.SIDE_CAR_CONTAINER_NAME,
Image: config.Options.Inject.SideCarImage,
Ports: []corev1.ContainerPort{
{
Name: "http-xxl-agent",
ContainerPort: 9999,
},
},
Env: []corev1.EnvVar{
{
Name: agentConst.CONTAINER_NAME, Value: agentOpt.TargetContainer,
},
{
Name: agentConst.XXL_JOB_NAME, Value: agentOpt.JobName,
},
{
Name: agentConst.XXL_JOB_ADDR, Value: agentOpt.XxlJobAdminAddr,
},
{
Name: agentConst.NAMESPACE, ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
},
{
Name: agentConst.POD_NAME, ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
},
}
// add a container to the pods
return json.Marshal([]map[string]interface{}{
{
"op": "add",
"path": "/spec/containers/-",
"value": value,
},
})
}
|
上面这段代码就是从请求中解析出pods, 然后从pods的注解中解析出配置,根据配置来生成 container
,然后生成对应的响应。
关于响应,请参考 https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/extensible-admission-controllers/#response
这里编写webhook的时候请注意,response的allow值一定要是true,不然所有的pods更新都会被拒绝。当然 如果你确实有拒绝pods更新的需求,也可以是false
然后patch 是多维数组 [["op" : "add","path" :","value":"" ],[...] ]
所有的operator,webhook 都建议在本地调试好之后再去集群中部署。