4795 字
24 分钟
Agent + K8s 初步开发
Agent + K8s 初步开发
引言
你是否曾想过用自然语言来管理Kubernetes集群?比如说”帮我在生产环境创建3个nginx的副本”,系统就能自动完成部署?本文将带你从零开始,构建一个能够理解自然语言并自动执行K8s操作的智能Agent系统。
即使你是K8s初学者,通过本文你也能理解:
- Kubernetes是如何通过API进行管理的
- 如何用Go语言操作K8s集群
- 如何构建一个高性能的资源监控系统
- 如何设计RESTful API来封装K8s操作
一、项目架构概览
在开始编码之前,让我们先了解整体架构:
┌─────────────────────────────────────────┐│ 用户 (自然语言输入) │└─────────────────┬───────────────────────┘ ↓┌─────────────────────────────────────────┐│ RESTful API层 (Gin框架) ││ • 接收HTTP请求 ││ • 参数验证 ││ • 返回JSON响应 │└─────────────────┬───────────────────────┘ ↓┌─────────────────────────────────────────┐│ 服务层 (业务逻辑) ││ • 资源映射 (RESTMapper) ││ • CRUD操作 ││ • 缓存管理 │└─────────────────┬───────────────────────┘ ↓┌─────────────────────────────────────────┐│ K8s客户端层 (client-go) ││ ┌──────────────────────────────┐ ││ │ • Clientset (类型化客户端) │ ││ │ • Dynamic (动态客户端) │ ││ │ • RESTClient (REST客户端) │ ││ │ • Discovery (发现客户端) │ ││ └──────────────────────────────┘ │└─────────────────┬───────────────────────┘ ↓┌─────────────────────────────────────────┐│ Informer机制 (缓存+监听) ││ • 本地缓存 ││ • 事件监听 ││ • 减少API调用 │└─────────────────┬───────────────────────┘ ↓┌─────────────────────────────────────────┐│ Kubernetes API Server │└─────────────────────────────────────────┘
二、核心组件详解
2.1 四种客户端 - 不同场景的不同选择
Kubernetes提供了多种客户端,就像工具箱里的不同工具,各有用途。让我们先了解这四种客户端,然后再看如何连接K8s集群。
2.1.1 Clientset - 最常用的类型化客户端
这是最友好的客户端,适合操作K8s的标准资源(Pod、Service、Deployment等):
// 创建Clientset客户端func CreateClientset() (*kubernetes.Clientset, error) { // 1. 先加载配置(后面会详细介绍) config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { return nil, err }
// 2. 基于配置创建Clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err }
return clientset, nil}
// 使用示例:列出所有运行中的Podfunc ListRunningPods(clientset *kubernetes.Clientset) { // CoreV1() - 返回CoreV1Interface接口 // 功能:访问核心API组的v1版本资源(Pod、Service、ConfigMap等) // 其他组:AppsV1() - Deployment、StatefulSet // BatchV1() - Job、CronJob
// Pods("default") - 返回PodInterface接口 // 参数:namespace字符串,指定要操作的命名空间 // 功能:获取该命名空间下Pod资源的操作接口
// List() - 执行列表查询 // 参数: // - context.TODO():上下文,可控制超时、取消等 // - metav1.ListOptions:查询选项 // - FieldSelector:字段选择器,按资源字段过滤 // - LabelSelector:标签选择器,按标签过滤 // - Limit:限制返回数量 // - Continue:分页标记 pods, err := clientset. CoreV1(). Pods("default"). List(context.TODO(), metav1.ListOptions{ FieldSelector: "status.phase=Running", // 只获取运行中的Pod })
if err != nil { log.Fatal(err) }
// pods.Items - []v1.Pod类型,强类型数组 // 每个Pod对象都有完整的类型定义,IDE可以自动提示 fmt.Println("运行中的Pod列表:") for _, pod := range pods.Items { fmt.Printf("- %s (创建时间: %s)\n", pod.Name, // 直接访问字段 pod.CreationTimestamp.Format("2006-01-02 15:04:05")) }}
优势:
- ✅ 代码有自动提示,不容易写错
- ✅ 编译时就能发现错误
- ✅ 返回的是强类型数据,使用方便
劣势:
- ❌ 只能操作K8s预定义的资源
- ❌ 不支持自定义资源(CRD)
2.1.2 Dynamic Client - 灵活的动态客户端
当你需要操作自定义资源(CRD)或者资源类型在运行时才确定时,使用动态客户端:
// 创建Dynamic客户端func CreateDynamicClient() (dynamic.Interface, error) { // 1. 加载配置 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { return nil, err }
// 2. 创建动态客户端 dynamicClient, err := dynamic.NewForConfig(config) if err != nil { return nil, err }
return dynamicClient, nil}
// 使用示例:操作自定义资源func OperateCustomResource(dynamicClient dynamic.Interface) { // 定义GVR - 这是动态客户端的核心概念 // GVR (GroupVersionResource) 唯一标识一种资源类型 gvr := schema.GroupVersionResource{ Group: "mycompany.com", // API组,CRD通常使用域名 Version: "v1", // API版本 Resource: "myapps", // 资源名称(必须是复数) }
// 创建一个自定义资源对象 // unstructured.Unstructured - 非结构化数据 // 本质是 map[string]interface{} 的封装 // 可以表示任意K8s资源,无需预定义Go结构体 myApp := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "mycompany.com/v1", // 必需:API版本 "kind": "MyApp", // 必需:资源种类 "metadata": map[string]interface{}{ // 必需:元数据 "name": "example-app", "namespace": "default", }, "spec": map[string]interface{}{ // 自定义字段 "replicas": 3, "image": "nginx:latest", }, }, }
// Resource(gvr) - 指定要操作的资源类型 // Namespace("default") - 指定命名空间 // Create() - 执行创建操作 result, err := dynamicClient. Resource(gvr). Namespace("default"). Create(context.TODO(), myApp, metav1.CreateOptions{})
if err != nil { log.Fatal(err) }
// result也是 *unstructured.Unstructured 类型 // 使用 GetXXX 方法访问字段 fmt.Printf("创建成功: %s\n", result.GetName())}
使用场景:
- 操作CRD(自定义资源)
- 编写通用的K8s工具
- 资源类型由用户输入决定
2.1.3 RESTClient - 底层HTTP客户端
需要完全控制HTTP请求时使用,比如调试或特殊API:
// 创建RESTClientfunc CreateRESTClient() (*rest.RESTClient, error) { // 1. 加载基础配置 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { return nil, err }
// 2. RESTClient需要额外配置 // 必须指定API路径和Group/Version信息 config.APIPath = "/api" // API路径前缀 config.GroupVersion = &corev1.SchemeGroupVersion // 指定要访问的API组版本 config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() // 序列化器
// 3. 创建RESTClient restClient, err := rest.RESTClientFor(config) if err != nil { return nil, err }
return restClient, nil}
// 使用示例:获取Pod日志(这是Clientset做不到的)func GetPodLogs(restClient *rest.RESTClient) { // Get() - 创建GET请求构建器 // AbsPath() - 使用绝对路径(从根/开始) // 路径格式:/api/v1/namespaces/{namespace}/pods/{name}/log req := restClient. Get(). AbsPath("/api/v1/namespaces/default/pods/my-pod/log"). Param("follow", "true"). // 添加查询参数:实时跟踪 Param("container", "nginx") // 指定容器
// Stream() - 获取响应流(适合日志等流式数据) // 也可以用 Do() 获取一次性响应 stream, err := req.Stream(context.TODO()) if err != nil { log.Fatal(err) } defer stream.Close()
// 读取日志流 buf := make([]byte, 1024) for { n, err := stream.Read(buf) if err != nil { break } fmt.Print(string(buf[:n])) }}
其他RESTClient操作:
// POST请求 - 创建资源restClient.Post().AbsPath("/api/v1/namespaces/default/pods").Body(podObj).Do(ctx)
// DELETE请求 - 删除资源restClient.Delete().AbsPath("/api/v1/namespaces/default/pods/my-pod").Do(ctx)
// PATCH请求 - 部分更新restClient.Patch(types.MergePatchType).AbsPath("...").Body(patchData).Do(ctx)
2.1.4 Discovery Client - 探索集群能力
用于发现集群支持哪些API和资源,是实现智能Agent的关键:
// 创建Discovery客户端func CreateDiscoveryClient() (*discovery.DiscoveryClient, error) { config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { return nil, err }
// 创建Discovery客户端 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { return nil, err }
return discoveryClient, nil}
// 使用示例:发现集群支持的所有资源func DiscoverClusterResources(discoveryClient *discovery.DiscoveryClient) { // ServerGroups() - 获取所有API组信息 // 返回 *metav1.APIGroupList apiGroups, err := discoveryClient.ServerGroups() if err != nil { log.Fatal(err) }
fmt.Println("集群支持的API组:") for _, group := range apiGroups.Groups { // 空字符串表示核心组(v1) if group.Name == "" { fmt.Println("\n核心组 (core):") } else { fmt.Printf("\n组名: %s\n", group.Name) }
fmt.Println(" 支持的版本:") for _, version := range group.Versions { // GroupVersion格式: "apps/v1", "batch/v1" fmt.Printf(" - %s\n", version.Version) }
// PreferredVersion - 该组的首选版本 fmt.Printf(" 首选版本: %s\n", group.PreferredVersion.Version) }
// ServerResourcesForGroupVersion() - 获取特定组版本的所有资源 resources, err := discoveryClient.ServerResourcesForGroupVersion("apps/v1") if err == nil { fmt.Println("\napps/v1 组的资源:") for _, r := range resources.APIResources { // Name: 资源名称(复数) // Kind: 资源种类 // Namespaced: 是否是命名空间级资源 // Verbs: 支持的操作 ["create", "delete", "get", "list", "patch", "update", "watch"] fmt.Printf(" - %s (Kind: %s, Namespaced: %v)\n", r.Name, r.Kind, r.Namespaced) } }
// ServerVersion() - 获取K8s版本信息 version, _ := discoveryClient.ServerVersion() fmt.Printf("\nKubernetes版本: %s\n", version.GitVersion)}
2.2 GVR vs GVK - K8s资源的两种标识方式
在深入RESTMapper之前,我们需要理解K8s中两个核心概念:
GVR (GroupVersionResource)
用途:标识API中的资源路径,用于实际的API调用
// GVR示例gvr := schema.GroupVersionResource{ Group: "apps", // API组 Version: "v1", // 版本 Resource: "deployments", // 资源(复数形式)}// 对应的API路径:/apis/apps/v1/deployments// 如果Group为空(核心组):/api/v1/pods
GVK (GroupVersionKind)
用途:标识资源的类型,用于YAML/JSON中的apiVersion和kind字段
// GVK示例gvk := schema.GroupVersionKind{ Group: "apps", // API组 Version: "v1", // 版本 Kind: "Deployment", // 种类(单数形式,首字母大写)}// 对应YAML中:// apiVersion: apps/v1// kind: Deployment
GVR与GVK的区别和联系
// 区别对照表// ┌─────────────┬──────────────────┬─────────────────┐// │ 属性 │ GVR │ GVK │// ├─────────────┼──────────────────┼─────────────────┤// │ 用途 │ API路径/URL构建 │ 对象类型标识 │// │ 资源名称 │ 复数(deployments) │ 单数(Deployment) │// │ 使用场景 │ 客户端API调用 │ YAML/JSON定义 │// │ 示例 │ pods、services │ Pod、Service │// └─────────────┴──────────────────┴─────────────────┘
// 转换示例func ShowGVRAndGVK() { // 用户视角:操作Deployment // GVK - 用户在YAML中写的 gvk := schema.GroupVersionKind{ Group: "apps", Version: "v1", Kind: "Deployment", }
// GVR - 实际API调用使用的 gvr := schema.GroupVersionResource{ Group: "apps", Version: "v1", Resource: "deployments", // 注意:复数 }
// RESTMapper的作用就是在两者之间转换 // Kind -> Resource: Deployment -> deployments // Resource -> Kind: pods -> Pod}
2.3 配置管理器 - 连接K8s的第一步
了解了客户端类型后,让我们看看如何建立与K8s集群的连接:
package config
import ( "path/filepath" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir")
// K8sConfig 管理所有K8s相关的配置和客户端type K8sConfig struct { *rest.Config // K8s连接配置 *kubernetes.Clientset // K8s客户端 e error // 错误信息}
// NewK8sConfig 创建配置管理器func NewK8sConfig() *K8sConfig { return &K8sConfig{}}
// InitRestConfig 自动寻找并加载kubeconfig文件// kubeconfig文件包含了连接K8s集群所需的所有信息:// - API Server地址// - 认证信息(证书、token等)// - 默认命名空间等func (k *K8sConfig) InitRestConfig() *K8sConfig { // 自动定位到 ~/.kube/config 文件 kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
// 加载配置文件 config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { k.e = err return k }
k.Config = config return k}
// InitClientSet 基于配置创建K8s客户端func (k *K8sConfig) InitClientSet() *kubernetes.Clientset { if k.Config == nil { k.e = errors.New("请先初始化配置") return nil }
clientSet, err := kubernetes.NewForConfig(k.Config) if err != nil { k.e = err return nil }
k.Clientset = clientSet return clientSet}
使用方式:
// 链式调用,简洁优雅config := NewK8sConfig().InitRestConfig()clientSet := config.InitClientSet()
if config.Error() != nil { log.Fatal("初始化失败:", config.Error())}
// 现在可以创建各种客户端了dynamicClient, _ := dynamic.NewForConfig(config.Config)discoveryClient, _ := discovery.NewDiscoveryClientForConfig(config.Config)
2.4 RESTMapper - 智能资源映射器
RESTMapper是Agent理解用户输入的关键组件。它解决了GVR和GVK之间的转换问题。
为什么需要Discovery客户端?
// 初始化RESTMapperfunc InitRestMapper(clientSet *kubernetes.Clientset) meta.RESTMapper { // 这里看起来传入的是Clientset,但实际使用的是其Discovery()方法 // clientSet.Discovery() 返回 discovery.DiscoveryInterface // 原因:Clientset内部包含了Discovery客户端
// GetAPIGroupResources使用Discovery客户端来: // 1. 调用 /api 和 /apis 端点 // 2. 获取所有API组的信息 // 3. 获取每个组支持的资源类型 // 4. 构建资源映射关系 gr, err := restmapper.GetAPIGroupResources(clientSet.Discovery()) if err != nil { panic(err) }
// 创建映射器,内部构建了多个映射表: // - Resource -> Kind (pods -> Pod) // - Kind -> Resource (Service -> services) // - 资源的作用域信息(命名空间级/集群级) // - 资源的首选版本信息 mapper := restmapper.NewDiscoveryRESTMapper(gr) return mapper}
RESTMapper的工作流程
// RESTMapper简洁工作流程:// 1. 初始化阶段(启动时执行一次)// Discovery客户端 -> 获取所有API信息 -> 构建映射表//// 2. 运行时映射(处理用户请求)// 用户输入 -> 解析 -> 查询映射表 -> 返回完整资源信息
func MappingWorkflow() { // 步骤1: 用户输入各种格式 userInputs := []string{ "pod", // 简单资源名 "pods", // 复数形式 "deployment", // 简写 "deploy", // 更短的简写 "pods.v1", // 带版本 "deployments.apps", // 带组名 }
// 步骤2: RESTMapper处理流程 // parseInput -> 解析输入格式 // lookupResource -> 查找资源定义 // resolveVersion -> 确定最佳API版本 // buildMapping -> 构建完整映射信息
// 步骤3: 返回RESTMapping对象,包含: // - GVR: 用于API调用 // - GVK: 资源类型信息 // - Scope: 命名空间级还是集群级 // - 其他元信息}
智能映射函数实现
// 智能映射函数 - 处理各种用户输入func MappingFor(userInput string, restMapper meta.RESTMapper) (*meta.RESTMapping, error) { // 尝试解析为Resource(资源名称) // ParseResourceArg处理格式:pods、pods.v1、deployments.apps.v1 fullySpecifiedGVR, groupResource := schema.ParseResourceArg(userInput)
// 尝试解析为Kind(种类名称) // ParseKindArg处理格式:Pod、Pod.v1、Deployment.apps fullySpecifiedGVK, groupKind := schema.ParseKindArg(userInput)
var mapping *meta.RESTMapping var err error
// 策略1:如果是完整的GVR,先转换为GVK,再获取映射 if fullySpecifiedGVR != nil { gvk, _ := restMapper.KindFor(*fullySpecifiedGVR) if !gvk.Empty() { mapping, err = restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) } }
// 策略2:如果是GVK格式,直接获取映射 if mapping == nil && fullySpecifiedGVK != nil { mapping, err = restMapper.RESTMapping( fullySpecifiedGVK.GroupKind(), fullySpecifiedGVK.Version, ) }
// 策略3:只有组和种类,使用首选版本 if mapping == nil && !groupKind.Empty() { mapping, err = restMapper.RESTMapping(groupKind) }
return mapping, err}
RESTMapper的核心价值:
// 用户输入 -> K8s资源"pod" -> GVR{Group:"", Version:"v1", Resource:"pods"}"deploy" -> GVR{Group:"apps", Version:"v1", Resource:"deployments"}"svc" -> GVR{Group:"", Version:"v1", Resource:"services"}"node" -> GVR{Group:"", Version:"v1", Resource:"nodes"} + Scope:Cluster"ns" -> GVR{Group:"", Version:"v1", Resource:"namespaces"} + Scope:Cluster
2.5 Informer机制 - 高性能的缓存系统
Informer是K8s客户端的核心组件,它通过本地缓存和事件监听机制,大幅提升性能:
// 创建SharedInformerFactory - 推荐方式func InitInformerFactory(clientSet *kubernetes.Clientset) informers.SharedInformerFactory { // 创建工厂,0表示不自动重新同步 factory := informers.NewSharedInformerFactoryWithOptions( clientSet, 0, informers.WithNamespace("default"), // 只监听default命名空间 )
// 注册Pod事件处理器 podInformer := factory.Core().V1().Pods() podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*v1.Pod) fmt.Printf("✨ 新Pod创建: %s\n", pod.Name) }, UpdateFunc: func(oldObj, newObj interface{}) { newPod := newObj.(*v1.Pod) fmt.Printf("🔄 Pod更新: %s\n", newPod.Name) }, DeleteFunc: func(obj interface{}) { pod := obj.(*v1.Pod) fmt.Printf("🗑️ Pod删除: %s\n", pod.Name) }, })
// 启动Informer stopCh := make(chan struct{}) factory.Start(stopCh)
// 等待缓存同步完成 factory.WaitForCacheSync(stopCh)
return factory}
// 使用Lister从缓存读取数据(超快!)func ListPodsFromCache(factory informers.SharedInformerFactory) { // 从本地缓存获取,不会访问API Server pods, err := factory.Core().V1().Pods().Lister(). Pods("default"). List(labels.Everything())
if err != nil { log.Fatal(err) }
fmt.Printf("缓存中有 %d 个Pod\n", len(pods))}
Informer工作原理:
1. List: 首次启动时获取所有资源2. Watch: 监听资源变化事件3. Cache: 在本地维护资源副本4. 事件分发: 将变化通知给所有处理器
优势:- 减少API调用(从缓存读取)- 实时感知变化- 多个组件共享同一份数据
2.6 构建RESTful API - 对外提供服务
现在让我们将所有组件整合,构建一个完整的API服务:
服务层实现
package services
import ( "context" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic")
type ResourceService struct { dynamicClient dynamic.Interface restMapper meta.RESTMapper informerFactory informers.SharedInformerFactory}
// 创建资源 - 支持YAML输入func (s *ResourceService) CreateResource( resourceType string, // 如 "deployment", "service" namespace string, yamlContent string,) error { // 1. 解析YAML obj := &unstructured.Unstructured{} _, _, err := scheme.Codecs.UniversalDeserializer(). Decode([]byte(yamlContent), nil, obj) if err != nil { return fmt.Errorf("YAML解析失败: %v", err) }
// 2. 获取资源映射 mapping, err := MappingFor(resourceType, s.restMapper) if err != nil { return fmt.Errorf("未知的资源类型: %s", resourceType) }
// 3. 判断资源作用域 var resourceInterface dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { // 命名空间级资源 resourceInterface = s.dynamicClient. Resource(mapping.Resource). Namespace(namespace) } else { // 集群级资源 resourceInterface = s.dynamicClient. Resource(mapping.Resource) }
// 4. 创建资源 _, err = resourceInterface.Create( context.Background(), obj, metav1.CreateOptions{}, )
return err}
// 列出资源 - 从缓存读取func (s *ResourceService) ListResources( resourceType string, namespace string,) ([]unstructured.Unstructured, error) { // 获取资源映射 mapping, err := MappingFor(resourceType, s.restMapper) if err != nil { return nil, err }
// 从Informer缓存获取 informer, err := s.informerFactory.ForResource(mapping.Resource) if err != nil { return nil, err }
// 列出资源 var items []unstructured.Unstructured if mapping.Scope.Name() == meta.RESTScopeNameNamespace { objs, _ := informer.Lister().ByNamespace(namespace).List(labels.Everything()) for _, obj := range objs { items = append(items, *obj.(*unstructured.Unstructured)) } } else { objs, _ := informer.Lister().List(labels.Everything()) for _, obj := range objs { items = append(items, *obj.(*unstructured.Unstructured)) } }
return items, nil}
// 删除资源func (s *ResourceService) DeleteResource( resourceType string, namespace string, name string,) error { mapping, err := MappingFor(resourceType, s.restMapper) if err != nil { return err }
var resourceInterface dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { resourceInterface = s.dynamicClient. Resource(mapping.Resource). Namespace(namespace) } else { resourceInterface = s.dynamicClient. Resource(mapping.Resource) }
return resourceInterface.Delete( context.Background(), name, metav1.DeleteOptions{}, )}
HTTP控制器实现
package controllers
import ( "github.com/gin-gonic/gin" "your-project/services")
type ResourceController struct { service *services.ResourceService}
// POST /:resource - 创建资源func (c *ResourceController) Create() gin.HandlerFunc { return func(ctx *gin.Context) { resourceType := ctx.Param("resource") namespace := ctx.DefaultQuery("namespace", "default")
var body struct { Yaml string `json:"yaml" binding:"required"` }
if err := ctx.ShouldBindJSON(&body); err != nil { ctx.JSON(400, gin.H{ "error": "请提供有效的YAML内容", }) return }
err := c.service.CreateResource(resourceType, namespace, body.Yaml) if err != nil { ctx.JSON(500, gin.H{ "error": fmt.Sprintf("创建失败: %v", err), }) return }
ctx.JSON(201, gin.H{ "message": fmt.Sprintf("%s 创建成功", resourceType), }) }}
// GET /:resource - 列出资源func (c *ResourceController) List() gin.HandlerFunc { return func(ctx *gin.Context) { resourceType := ctx.Param("resource") namespace := ctx.DefaultQuery("namespace", "default")
items, err := c.service.ListResources(resourceType, namespace) if err != nil { ctx.JSON(500, gin.H{ "error": fmt.Sprintf("查询失败: %v", err), }) return }
// 简化输出,只返回名称列表 var names []string for _, item := range items { names = append(names, item.GetName()) }
ctx.JSON(200, gin.H{ "namespace": namespace, "resource": resourceType, "count": len(names), "items": names, }) }}
// DELETE /:resource/:name - 删除资源func (c *ResourceController) Delete() gin.HandlerFunc { return func(ctx *gin.Context) { resourceType := ctx.Param("resource") name := ctx.Param("name") namespace := ctx.DefaultQuery("namespace", "default")
err := c.service.DeleteResource(resourceType, namespace, name) if err != nil { ctx.JSON(500, gin.H{ "error": fmt.Sprintf("删除失败: %v", err), }) return }
ctx.JSON(200, gin.H{ "message": fmt.Sprintf("%s/%s 删除成功", resourceType, name), }) }}
主程序 - 启动服务
package main
import ( "log" "github.com/gin-gonic/gin" "k8s.io/client-go/dynamic" "your-project/config" "your-project/services" "your-project/controllers")
func main() { // 1. 初始化K8s配置 log.Println("正在初始化K8s配置...") k8sConfig := config.NewK8sConfig().InitRestConfig() if k8sConfig.Error() != nil { log.Fatal("配置加载失败:", k8sConfig.Error()) }
// 2. 创建客户端 clientSet := k8sConfig.InitClientSet() dynamicClient, err := dynamic.NewForConfig(k8sConfig.Config) if err != nil { log.Fatal("创建动态客户端失败:", err) }
// 3. 初始化RESTMapper log.Println("正在初始化资源映射器...") restMapper := InitRestMapper(clientSet)
// 4. 初始化Informer log.Println("正在初始化缓存系统...") informerFactory := InitInformerFactory(clientSet)
// 5. 创建服务层 resourceService := &services.ResourceService{ DynamicClient: dynamicClient, RestMapper: restMapper, InformerFactory: informerFactory, }
// 6. 创建控制器 resourceController := &controllers.ResourceController{ Service: resourceService, }
// 7. 设置路由 router := gin.Default()
// 资源操作路由 router.POST("/:resource", resourceController.Create()) router.GET("/:resource", resourceController.List()) router.DELETE("/:resource/:name", resourceController.Delete())
// 健康检查 router.GET("/health", func(c *gin.Context) { c.JSON(200, gin.H{"status": "healthy"}) })
// 8. 启动服务 log.Println("K8s Agent服务启动在 http://localhost:8080") if err := router.Run(":8080"); err != nil { log.Fatal("服务启动失败:", err) }}
三、实战演示
3.1 API使用示例
启动服务后,你可以通过以下方式使用API:
# 1. 创建一个Nginx Podcurl -X POST http://localhost:8080/pods?namespace=default \ -H "Content-Type: application/json" \ -d '{ "yaml": "apiVersion: v1\nkind: Pod\nmetadata:\n name: nginx-demo\nspec:\n containers:\n - name: nginx\n image: nginx:latest\n ports:\n - containerPort: 80" }'
# 响应: {"message": "pods 创建成功"}
# 2. 列出所有Podscurl http://localhost:8080/pods?namespace=default
# 响应:# {# "namespace": "default",# "resource": "pods",# "count": 3,# "items": ["nginx-demo", "app-1", "app-2"]# }
# 3. 创建一个Deploymentcurl -X POST http://localhost:8080/deployments \ -H "Content-Type: application/json" \ -d '{ "yaml": "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: nginx-deployment\nspec:\n replicas: 3\n selector:\n matchLabels:\n app: nginx\n template:\n metadata:\n labels:\n app: nginx\n spec:\n containers:\n - name: nginx\n image: nginx:1.14.2" }'
# 4. 删除资源curl -X DELETE http://localhost:8080/pods/nginx-demo?namespace=default
# 响应: {"message": "pods/nginx-demo 删除成功"}
# 5. 操作不同类型的资源curl http://localhost:8080/services # 列出服务curl http://localhost:8080/deployments # 列出部署curl http://localhost:8080/nodes # 列出节点(集群级资源)curl http://localhost:8080/namespaces # 列出命名空间
3.2 观察Informer事件
当你创建、修改或删除资源时,控制台会实时显示事件:
✨ 新Pod创建: nginx-demo🔄 Pod更新: nginx-demo (状态: Pending -> Running)🗑️ Pod删除: nginx-demo
四、集成自然语言处理
现在我们有了完整的K8s操作API,下一步是添加自然语言理解:
// NLP处理示例(伪代码)type NLPProcessor struct { resourceService *services.ResourceService}
func (p *NLPProcessor) ProcessCommand(input string) string { // 意图识别 intent := p.detectIntent(input)
// 实体提取 entities := p.extractEntities(input)
// 执行操作 switch intent { case "CREATE": // "创建3个nginx的副本" resourceType := entities["resource"] // deployment replicas := entities["count"] // 3 image := entities["image"] // nginx
yaml := p.generateYAML(resourceType, replicas, image) err := p.resourceService.CreateResource( resourceType, "default", yaml, )
if err != nil { return fmt.Sprintf("创建失败: %v", err) } return "已成功创建部署"
case "LIST": // "显示所有运行的服务" items, _ := p.resourceService.ListResources( entities["resource"], entities["namespace"], ) return fmt.Sprintf("找到 %d 个资源", len(items))
case "DELETE": // "删除测试环境的mysql" err := p.resourceService.DeleteResource( entities["resource"], entities["namespace"], entities["name"], )
if err != nil { return fmt.Sprintf("删除失败: %v", err) } return "删除成功" }
return "抱歉,我不理解这个命令"}
参考资源
Agent + K8s 初步开发
https://fuwari.vercel.app/posts/agent_k8s_initial_development/