背景

业务中有使用k8s部署和管理微服务,微服务的数量多了之后偶尔会出现因人工操作疏漏而导致版本错乱的现象。比如某个微服务临时要切到灰度版本,结果灰度执行完成后忘记切换回正式版本;再比如一些cronjob所使用的镜像版本可能落后于deployment中运行的版本(cronjob配置忘记更新镜像版本)。这些状况都可能给业务带来损害,为此我编写了一个检查规则可配置的工具来兜底提醒,工具通过k8s的client-go客户端获取微服务的信息跟配置中的信息做比对,对于异常信息做出预警。

插件模式

这里使用到上一篇中的插件组件,插件组件的介绍可以查阅这篇内容《一个插件引擎,让自己的Go程序支持插件式扩展

go-tagdog

配置文件

目前实现了tag检查告警忽略以及告警三个插件,其中告警插件偷了个懒只是直接Print了,其实可以按需扩展邮件、企业微信、钉钉等告警方式。各插件可配置的内容参考配置文件,均有注释说明。

# 配置kube config 路径
kube_config: "/Users/kcat/.kube/config"
# 配置目标集群中的namespace
namespace: "default"
#配置调用k8s接口的超时时间,单位:秒
timeout: 600

# 配置需要启用的规则插件,alarm建议配置在最前,ignore建议配置在最后
# 每个插件都可以定制(前序逻辑)和(后续逻辑),系统将按照插件顺序执行(前序逻辑)再逆序执行(后续逻辑)
# 其中alarm在(后续逻辑)中实现了告警输出
plugins:
  - alarm
  - tagcheck
  - ignore

# 以下是各插件的自主配置,由各插件自主解析和使用
alarm:
  enable: true

ignore:
  enable: true
  list: # 类型::pod名称.container名称:屏蔽告警截止时间(在此时间之前 将忽略该container的告警)
    deployment::testpod.testcontainer0: "2022-03-10 00:00:00"
    deployment::testpod.testcontainer3: "2022-03-10 00:00:00"
    deployment::testpod.testcontainer4: ""
    deployment::testpod.testcontainer5: "2022-03-10 00:00:00"
    deployment::testpod.testcontainer7: "2022-03-07 00:00:00"
    deployment::testpod.testcontainer9: "2022-03-10 00:00:00"
    cronjob::testpod.testcontainer: "" # 也可以不设置时间 将无限期忽略

tagcheck:
  enable: true
  list: # 检查使用以下镜像的容器tag是否正确
    ccr.ccs.tencentyun.com/kcat/nginx: "latest"
    ccr.ccs.tencentyun.com/kcat/typecho: "latest"
  cronjob: true #检查cronjob所采用的镜像tag是否与deployment一致

TagCheck插件

做Tag验证的主要是TagCheck插件,配置文件中需要配置启用tagcheck并且tagcheck中配置镜像和版本

package plugins

import (
    "context"
    "fmt"
    "github.com/kcatcat/go-plugin/plugin"
    "gopkg.in/yaml.v3"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "log"
    "strings"
)

func init() {
    plugin.Register("tagcheck", TagCheck())
}

// TagConfig 配置文件结构
type TagConfig struct {
    Root TagOption `yaml:"tagcheck"`
}

// TagOption 配置文件结构
type TagOption struct {
    Enable       bool              `yaml:"enable"`
    List         map[string]string `yaml:"list,flow"`
    CheckCronjob bool              `yaml:"cronjob"`
}

// TagCheck 按配置校验指定的镜像版本是否正确
// 自动校验cronjob中镜像tag与deployment是否一致
func TagCheck() plugin.Plugin {
    return func(ctx context.Context, in, out interface{}, next plugin.NextHandle) (err error) {
        log.Printf("enter tagcheck")

        isEnabled := false
        c := &TagConfig{}
        if input, ok := in.(*Input); ok {
            //解析配置文件,不使用err,避免污染链路返回
            localErr := yaml.Unmarshal(input.ConfigContent, c)
            if localErr != nil {
                log.Printf("ignore config decode err:%v", localErr)
            }
            isEnabled = c.Root.Enable == true
        }
        pIn := in.(*Input)
        pOut := out.(*Output)
        if isEnabled && (c.Root.CheckCronjob || len(c.Root.List) > 0) {
            // 获取deployment列表
            deploymentList, localErr := (*pIn).ClientSet.AppsV1().Deployments((*pIn).Namespace).List(ctx, metav1.ListOptions{})
            if localErr != nil {
                log.Printf("get deployment list err:%v", localErr)
            }

            imageMap := make(map[string]string)
            for _, d := range deploymentList.Items {
                // 获取deployment详情
                deployment, localErr := (*pIn).ClientSet.AppsV1().Deployments((*pIn).Namespace).Get(ctx, d.Name, metav1.GetOptions{})
                if err != nil {
                    log.Printf("deployment get error:%v", localErr)
                    continue
                }

                containers := &deployment.Spec.Template.Spec.Containers
                for _, container := range *containers {
                    // 提取镜像名称和Tag
                    image := strings.Split(container.Image, ":")[0]
                    tag := strings.Split(container.Image, ":")[1]
                    if _, ok := imageMap[image]; !ok {
                        // 存储镜像与tag以备检查cronjob中tag的一致性
                        imageMap[image] = tag
                    }
                    if targetTag, ok := c.Root.List[image]; ok && targetTag != tag {
                        // 指定镜像的tag不匹配
                        o := Result{}
                        o.Msg = fmt.Sprintf("tag not match, need(%s) ,got(%s)", targetTag, tag)
                        o.Field = map[string]string{
                            "kind":      "deployment",
                            "pod":       d.Name,
                            "container": container.Name,
                            "image":     image,
                            "tag":       tag,
                        }
                        *pOut = append(*pOut, o)
                    }
                }
            }

            // 检查cronjob中tag的一致性
            if c.Root.CheckCronjob && len(imageMap) > 0 {
                cronjobList, localErr := (*pIn).ClientSet.BatchV1beta1().CronJobs((*pIn).Namespace).List(ctx, metav1.ListOptions{})
                if localErr != nil {
                    log.Printf("get cronjob list err:%v", localErr)
                }

                for _, cj := range cronjobList.Items {
                    cronjob, localErr := (*pIn).ClientSet.BatchV1beta1().CronJobs((*pIn).Namespace).Get(ctx, cj.Name, metav1.GetOptions{})
                    if localErr != nil {
                        log.Printf("deployment get error:%v", localErr)
                        continue
                    }

                    if *cronjob.Spec.Suspend {
                        // 跳过暂停的任务
                        continue
                    }
                    containers := &cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers
                    for _, container := range *containers {
                        image := strings.Split(container.Image, ":")[0]
                        tag := strings.Split(container.Image, ":")[1]
                        if _, ok := imageMap[image]; ok && tag != imageMap[image] {
                            // 指定镜像的tag不匹配
                            o := Result{}
                            o.Msg = fmt.Sprintf("tag not match, need(%s) ,got(%s)", imageMap[image], tag)
                            o.Field = map[string]string{
                                "kind":      "cronjob",
                                "pod":       cj.Name,
                                "container": container.Name,
                                "image":     image,
                                "tag":       tag,
                            }
                            *pOut = append(*pOut, o)
                        }
                    }
                }
            }
        }

        // before the next request
        if next != nil {
            err = next(ctx, in, out)
        }
        // after the last response
        log.Printf("exit from tagcheck")
        return err
    }
}

附录

完整源代码:https://github.com/kcatcat/go-tagdog

目录结构

.
├── config.yaml # 主配置文件,可在配置中启用和关闭插件
├── go.mod
├── go.sum
├── main.go # 入口文件
└── plugins # 插件目录
    ├── alarm.go # 告警插件,可按需修改为邮件、企业微信等通知方式
    ├── entity.go # 实体定义
    ├── ignore.go # 忽略插件,可配置免告警策略
    └── tagcheck.go # Tag校验插件,校验目标镜像的Tag是否符合预期