4795 字
24 分钟
Agent + K8s 初步开发

Agent + K8s 初步开发#

引言#

你是否曾想过用自然语言来管理Kubernetes集群?比如说”帮我在生产环境创建3个nginx的副本”,系统就能自动完成部署?本文将带你从零开始,构建一个能够理解自然语言并自动执行K8s操作的智能Agent系统。

即使你是K8s初学者,通过本文你也能理解:

  • Kubernetes是如何通过API进行管理的
  • 如何用Go语言操作K8s集群
  • 如何构建一个高性能的资源监控系统
  • 如何设计RESTful API来封装K8s操作

一、项目架构概览#

在开始编码之前,让我们先了解整体架构:

┌─────────────────────────────────────────┐
│ 用户 (自然语言输入) │
└─────────────────┬───────────────────────┘
┌─────────────────────────────────────────┐
│ RESTful API层 (Gin框架) │
│ • 接收HTTP请求 │
│ • 参数验证 │
│ • 返回JSON响应 │
└─────────────────┬───────────────────────┘
┌─────────────────────────────────────────┐
│ 服务层 (业务逻辑) │
│ • 资源映射 (RESTMapper) │
│ • CRUD操作 │
│ • 缓存管理 │
└─────────────────┬───────────────────────┘
┌─────────────────────────────────────────┐
│ K8s客户端层 (client-go) │
│ ┌──────────────────────────────┐ │
│ │ • Clientset (类型化客户端) │ │
│ │ • Dynamic (动态客户端) │ │
│ │ • RESTClient (REST客户端) │ │
│ │ • Discovery (发现客户端) │ │
│ └──────────────────────────────┘ │
└─────────────────┬───────────────────────┘
┌─────────────────────────────────────────┐
│ Informer机制 (缓存+监听) │
│ • 本地缓存 │
│ • 事件监听 │
│ • 减少API调用 │
└─────────────────┬───────────────────────┘
┌─────────────────────────────────────────┐
│ Kubernetes API Server │
└─────────────────────────────────────────┘

二、核心组件详解#

2.1 四种客户端 - 不同场景的不同选择#

Kubernetes提供了多种客户端,就像工具箱里的不同工具,各有用途。让我们先了解这四种客户端,然后再看如何连接K8s集群。

2.1.1 Clientset - 最常用的类型化客户端#

这是最友好的客户端,适合操作K8s的标准资源(Pod、Service、Deployment等):

// 创建Clientset客户端
func CreateClientset() (*kubernetes.Clientset, error) {
// 1. 先加载配置(后面会详细介绍)
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
return nil, err
}
// 2. 基于配置创建Clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientset, nil
}
// 使用示例:列出所有运行中的Pod
func ListRunningPods(clientset *kubernetes.Clientset) {
// CoreV1() - 返回CoreV1Interface接口
// 功能:访问核心API组的v1版本资源(Pod、Service、ConfigMap等)
// 其他组:AppsV1() - Deployment、StatefulSet
// BatchV1() - Job、CronJob
// Pods("default") - 返回PodInterface接口
// 参数:namespace字符串,指定要操作的命名空间
// 功能:获取该命名空间下Pod资源的操作接口
// List() - 执行列表查询
// 参数:
// - context.TODO():上下文,可控制超时、取消等
// - metav1.ListOptions:查询选项
// - FieldSelector:字段选择器,按资源字段过滤
// - LabelSelector:标签选择器,按标签过滤
// - Limit:限制返回数量
// - Continue:分页标记
pods, err := clientset.
CoreV1().
Pods("default").
List(context.TODO(), metav1.ListOptions{
FieldSelector: "status.phase=Running", // 只获取运行中的Pod
})
if err != nil {
log.Fatal(err)
}
// pods.Items - []v1.Pod类型,强类型数组
// 每个Pod对象都有完整的类型定义,IDE可以自动提示
fmt.Println("运行中的Pod列表:")
for _, pod := range pods.Items {
fmt.Printf("- %s (创建时间: %s)\n",
pod.Name, // 直接访问字段
pod.CreationTimestamp.Format("2006-01-02 15:04:05"))
}
}

优势

  • ✅ 代码有自动提示,不容易写错
  • ✅ 编译时就能发现错误
  • ✅ 返回的是强类型数据,使用方便

劣势

  • ❌ 只能操作K8s预定义的资源
  • ❌ 不支持自定义资源(CRD)

2.1.2 Dynamic Client - 灵活的动态客户端#

当你需要操作自定义资源(CRD)或者资源类型在运行时才确定时,使用动态客户端:

// 创建Dynamic客户端
func CreateDynamicClient() (dynamic.Interface, error) {
// 1. 加载配置
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
return nil, err
}
// 2. 创建动态客户端
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return dynamicClient, nil
}
// 使用示例:操作自定义资源
func OperateCustomResource(dynamicClient dynamic.Interface) {
// 定义GVR - 这是动态客户端的核心概念
// GVR (GroupVersionResource) 唯一标识一种资源类型
gvr := schema.GroupVersionResource{
Group: "mycompany.com", // API组,CRD通常使用域名
Version: "v1", // API版本
Resource: "myapps", // 资源名称(必须是复数)
}
// 创建一个自定义资源对象
// unstructured.Unstructured - 非结构化数据
// 本质是 map[string]interface{} 的封装
// 可以表示任意K8s资源,无需预定义Go结构体
myApp := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "mycompany.com/v1", // 必需:API版本
"kind": "MyApp", // 必需:资源种类
"metadata": map[string]interface{}{ // 必需:元数据
"name": "example-app",
"namespace": "default",
},
"spec": map[string]interface{}{ // 自定义字段
"replicas": 3,
"image": "nginx:latest",
},
},
}
// Resource(gvr) - 指定要操作的资源类型
// Namespace("default") - 指定命名空间
// Create() - 执行创建操作
result, err := dynamicClient.
Resource(gvr).
Namespace("default").
Create(context.TODO(), myApp, metav1.CreateOptions{})
if err != nil {
log.Fatal(err)
}
// result也是 *unstructured.Unstructured 类型
// 使用 GetXXX 方法访问字段
fmt.Printf("创建成功: %s\n", result.GetName())
}

使用场景

  • 操作CRD(自定义资源)
  • 编写通用的K8s工具
  • 资源类型由用户输入决定

2.1.3 RESTClient - 底层HTTP客户端#

需要完全控制HTTP请求时使用,比如调试或特殊API:

// 创建RESTClient
func CreateRESTClient() (*rest.RESTClient, error) {
// 1. 加载基础配置
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
return nil, err
}
// 2. RESTClient需要额外配置
// 必须指定API路径和Group/Version信息
config.APIPath = "/api" // API路径前缀
config.GroupVersion = &corev1.SchemeGroupVersion // 指定要访问的API组版本
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() // 序列化器
// 3. 创建RESTClient
restClient, err := rest.RESTClientFor(config)
if err != nil {
return nil, err
}
return restClient, nil
}
// 使用示例:获取Pod日志(这是Clientset做不到的)
func GetPodLogs(restClient *rest.RESTClient) {
// Get() - 创建GET请求构建器
// AbsPath() - 使用绝对路径(从根/开始)
// 路径格式:/api/v1/namespaces/{namespace}/pods/{name}/log
req := restClient.
Get().
AbsPath("/api/v1/namespaces/default/pods/my-pod/log").
Param("follow", "true"). // 添加查询参数:实时跟踪
Param("container", "nginx") // 指定容器
// Stream() - 获取响应流(适合日志等流式数据)
// 也可以用 Do() 获取一次性响应
stream, err := req.Stream(context.TODO())
if err != nil {
log.Fatal(err)
}
defer stream.Close()
// 读取日志流
buf := make([]byte, 1024)
for {
n, err := stream.Read(buf)
if err != nil {
break
}
fmt.Print(string(buf[:n]))
}
}

其他RESTClient操作

// POST请求 - 创建资源
restClient.Post().AbsPath("/api/v1/namespaces/default/pods").Body(podObj).Do(ctx)
// DELETE请求 - 删除资源
restClient.Delete().AbsPath("/api/v1/namespaces/default/pods/my-pod").Do(ctx)
// PATCH请求 - 部分更新
restClient.Patch(types.MergePatchType).AbsPath("...").Body(patchData).Do(ctx)

2.1.4 Discovery Client - 探索集群能力#

用于发现集群支持哪些API和资源,是实现智能Agent的关键:

// 创建Discovery客户端
func CreateDiscoveryClient() (*discovery.DiscoveryClient, error) {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
return nil, err
}
// 创建Discovery客户端
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
return discoveryClient, nil
}
// 使用示例:发现集群支持的所有资源
func DiscoverClusterResources(discoveryClient *discovery.DiscoveryClient) {
// ServerGroups() - 获取所有API组信息
// 返回 *metav1.APIGroupList
apiGroups, err := discoveryClient.ServerGroups()
if err != nil {
log.Fatal(err)
}
fmt.Println("集群支持的API组:")
for _, group := range apiGroups.Groups {
// 空字符串表示核心组(v1)
if group.Name == "" {
fmt.Println("\n核心组 (core):")
} else {
fmt.Printf("\n组名: %s\n", group.Name)
}
fmt.Println(" 支持的版本:")
for _, version := range group.Versions {
// GroupVersion格式: "apps/v1", "batch/v1"
fmt.Printf(" - %s\n", version.Version)
}
// PreferredVersion - 该组的首选版本
fmt.Printf(" 首选版本: %s\n", group.PreferredVersion.Version)
}
// ServerResourcesForGroupVersion() - 获取特定组版本的所有资源
resources, err := discoveryClient.ServerResourcesForGroupVersion("apps/v1")
if err == nil {
fmt.Println("\napps/v1 组的资源:")
for _, r := range resources.APIResources {
// Name: 资源名称(复数)
// Kind: 资源种类
// Namespaced: 是否是命名空间级资源
// Verbs: 支持的操作 ["create", "delete", "get", "list", "patch", "update", "watch"]
fmt.Printf(" - %s (Kind: %s, Namespaced: %v)\n",
r.Name, r.Kind, r.Namespaced)
}
}
// ServerVersion() - 获取K8s版本信息
version, _ := discoveryClient.ServerVersion()
fmt.Printf("\nKubernetes版本: %s\n", version.GitVersion)
}

2.2 GVR vs GVK - K8s资源的两种标识方式#

在深入RESTMapper之前,我们需要理解K8s中两个核心概念:

GVR (GroupVersionResource)#

用途:标识API中的资源路径,用于实际的API调用

// GVR示例
gvr := schema.GroupVersionResource{
Group: "apps", // API组
Version: "v1", // 版本
Resource: "deployments", // 资源(复数形式)
}
// 对应的API路径:/apis/apps/v1/deployments
// 如果Group为空(核心组):/api/v1/pods

GVK (GroupVersionKind)#

用途:标识资源的类型,用于YAML/JSON中的apiVersion和kind字段

// GVK示例
gvk := schema.GroupVersionKind{
Group: "apps", // API组
Version: "v1", // 版本
Kind: "Deployment", // 种类(单数形式,首字母大写)
}
// 对应YAML中:
// apiVersion: apps/v1
// kind: Deployment

GVR与GVK的区别和联系#

// 区别对照表
// ┌─────────────┬──────────────────┬─────────────────┐
// │ 属性 │ GVR │ GVK │
// ├─────────────┼──────────────────┼─────────────────┤
// │ 用途 │ API路径/URL构建 │ 对象类型标识 │
// │ 资源名称 │ 复数(deployments) │ 单数(Deployment) │
// │ 使用场景 │ 客户端API调用 │ YAML/JSON定义 │
// │ 示例 │ pods、services │ Pod、Service │
// └─────────────┴──────────────────┴─────────────────┘
// 转换示例
func ShowGVRAndGVK() {
// 用户视角:操作Deployment
// GVK - 用户在YAML中写的
gvk := schema.GroupVersionKind{
Group: "apps",
Version: "v1",
Kind: "Deployment",
}
// GVR - 实际API调用使用的
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments", // 注意:复数
}
// RESTMapper的作用就是在两者之间转换
// Kind -> Resource: Deployment -> deployments
// Resource -> Kind: pods -> Pod
}

2.3 配置管理器 - 连接K8s的第一步#

了解了客户端类型后,让我们看看如何建立与K8s集群的连接:

package config
import (
"path/filepath"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// K8sConfig 管理所有K8s相关的配置和客户端
type K8sConfig struct {
*rest.Config // K8s连接配置
*kubernetes.Clientset // K8s客户端
e error // 错误信息
}
// NewK8sConfig 创建配置管理器
func NewK8sConfig() *K8sConfig {
return &K8sConfig{}
}
// InitRestConfig 自动寻找并加载kubeconfig文件
// kubeconfig文件包含了连接K8s集群所需的所有信息:
// - API Server地址
// - 认证信息(证书、token等)
// - 默认命名空间等
func (k *K8sConfig) InitRestConfig() *K8sConfig {
// 自动定位到 ~/.kube/config 文件
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
// 加载配置文件
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
k.e = err
return k
}
k.Config = config
return k
}
// InitClientSet 基于配置创建K8s客户端
func (k *K8sConfig) InitClientSet() *kubernetes.Clientset {
if k.Config == nil {
k.e = errors.New("请先初始化配置")
return nil
}
clientSet, err := kubernetes.NewForConfig(k.Config)
if err != nil {
k.e = err
return nil
}
k.Clientset = clientSet
return clientSet
}

使用方式

// 链式调用,简洁优雅
config := NewK8sConfig().InitRestConfig()
clientSet := config.InitClientSet()
if config.Error() != nil {
log.Fatal("初始化失败:", config.Error())
}
// 现在可以创建各种客户端了
dynamicClient, _ := dynamic.NewForConfig(config.Config)
discoveryClient, _ := discovery.NewDiscoveryClientForConfig(config.Config)

2.4 RESTMapper - 智能资源映射器#

RESTMapper是Agent理解用户输入的关键组件。它解决了GVR和GVK之间的转换问题。

为什么需要Discovery客户端?#

// 初始化RESTMapper
func InitRestMapper(clientSet *kubernetes.Clientset) meta.RESTMapper {
// 这里看起来传入的是Clientset,但实际使用的是其Discovery()方法
// clientSet.Discovery() 返回 discovery.DiscoveryInterface
// 原因:Clientset内部包含了Discovery客户端
// GetAPIGroupResources使用Discovery客户端来:
// 1. 调用 /api 和 /apis 端点
// 2. 获取所有API组的信息
// 3. 获取每个组支持的资源类型
// 4. 构建资源映射关系
gr, err := restmapper.GetAPIGroupResources(clientSet.Discovery())
if err != nil {
panic(err)
}
// 创建映射器,内部构建了多个映射表:
// - Resource -> Kind (pods -> Pod)
// - Kind -> Resource (Service -> services)
// - 资源的作用域信息(命名空间级/集群级)
// - 资源的首选版本信息
mapper := restmapper.NewDiscoveryRESTMapper(gr)
return mapper
}

RESTMapper的工作流程#

// RESTMapper简洁工作流程:
// 1. 初始化阶段(启动时执行一次)
// Discovery客户端 -> 获取所有API信息 -> 构建映射表
//
// 2. 运行时映射(处理用户请求)
// 用户输入 -> 解析 -> 查询映射表 -> 返回完整资源信息
func MappingWorkflow() {
// 步骤1: 用户输入各种格式
userInputs := []string{
"pod", // 简单资源名
"pods", // 复数形式
"deployment", // 简写
"deploy", // 更短的简写
"pods.v1", // 带版本
"deployments.apps", // 带组名
}
// 步骤2: RESTMapper处理流程
// parseInput -> 解析输入格式
// lookupResource -> 查找资源定义
// resolveVersion -> 确定最佳API版本
// buildMapping -> 构建完整映射信息
// 步骤3: 返回RESTMapping对象,包含:
// - GVR: 用于API调用
// - GVK: 资源类型信息
// - Scope: 命名空间级还是集群级
// - 其他元信息
}

智能映射函数实现#

// 智能映射函数 - 处理各种用户输入
func MappingFor(userInput string, restMapper meta.RESTMapper) (*meta.RESTMapping, error) {
// 尝试解析为Resource(资源名称)
// ParseResourceArg处理格式:pods、pods.v1、deployments.apps.v1
fullySpecifiedGVR, groupResource := schema.ParseResourceArg(userInput)
// 尝试解析为Kind(种类名称)
// ParseKindArg处理格式:Pod、Pod.v1、Deployment.apps
fullySpecifiedGVK, groupKind := schema.ParseKindArg(userInput)
var mapping *meta.RESTMapping
var err error
// 策略1:如果是完整的GVR,先转换为GVK,再获取映射
if fullySpecifiedGVR != nil {
gvk, _ := restMapper.KindFor(*fullySpecifiedGVR)
if !gvk.Empty() {
mapping, err = restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
}
}
// 策略2:如果是GVK格式,直接获取映射
if mapping == nil && fullySpecifiedGVK != nil {
mapping, err = restMapper.RESTMapping(
fullySpecifiedGVK.GroupKind(),
fullySpecifiedGVK.Version,
)
}
// 策略3:只有组和种类,使用首选版本
if mapping == nil && !groupKind.Empty() {
mapping, err = restMapper.RESTMapping(groupKind)
}
return mapping, err
}

RESTMapper的核心价值

// 用户输入 -> K8s资源
"pod" -> GVR{Group:"", Version:"v1", Resource:"pods"}
"deploy" -> GVR{Group:"apps", Version:"v1", Resource:"deployments"}
"svc" -> GVR{Group:"", Version:"v1", Resource:"services"}
"node" -> GVR{Group:"", Version:"v1", Resource:"nodes"} + Scope:Cluster
"ns" -> GVR{Group:"", Version:"v1", Resource:"namespaces"} + Scope:Cluster

2.5 Informer机制 - 高性能的缓存系统#

Informer是K8s客户端的核心组件,它通过本地缓存和事件监听机制,大幅提升性能:

// 创建SharedInformerFactory - 推荐方式
func InitInformerFactory(clientSet *kubernetes.Clientset) informers.SharedInformerFactory {
// 创建工厂,0表示不自动重新同步
factory := informers.NewSharedInformerFactoryWithOptions(
clientSet,
0,
informers.WithNamespace("default"), // 只监听default命名空间
)
// 注册Pod事件处理器
podInformer := factory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("✨ 新Pod创建: %s\n", pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
newPod := newObj.(*v1.Pod)
fmt.Printf("🔄 Pod更新: %s\n", newPod.Name)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("🗑️ Pod删除: %s\n", pod.Name)
},
})
// 启动Informer
stopCh := make(chan struct{})
factory.Start(stopCh)
// 等待缓存同步完成
factory.WaitForCacheSync(stopCh)
return factory
}
// 使用Lister从缓存读取数据(超快!)
func ListPodsFromCache(factory informers.SharedInformerFactory) {
// 从本地缓存获取,不会访问API Server
pods, err := factory.Core().V1().Pods().Lister().
Pods("default").
List(labels.Everything())
if err != nil {
log.Fatal(err)
}
fmt.Printf("缓存中有 %d 个Pod\n", len(pods))
}

Informer工作原理

1. List: 首次启动时获取所有资源
2. Watch: 监听资源变化事件
3. Cache: 在本地维护资源副本
4. 事件分发: 将变化通知给所有处理器
优势:
- 减少API调用(从缓存读取)
- 实时感知变化
- 多个组件共享同一份数据

2.6 构建RESTful API - 对外提供服务#

现在让我们将所有组件整合,构建一个完整的API服务:

服务层实现#

package services
import (
"context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)
type ResourceService struct {
dynamicClient dynamic.Interface
restMapper meta.RESTMapper
informerFactory informers.SharedInformerFactory
}
// 创建资源 - 支持YAML输入
func (s *ResourceService) CreateResource(
resourceType string, // 如 "deployment", "service"
namespace string,
yamlContent string,
) error {
// 1. 解析YAML
obj := &unstructured.Unstructured{}
_, _, err := scheme.Codecs.UniversalDeserializer().
Decode([]byte(yamlContent), nil, obj)
if err != nil {
return fmt.Errorf("YAML解析失败: %v", err)
}
// 2. 获取资源映射
mapping, err := MappingFor(resourceType, s.restMapper)
if err != nil {
return fmt.Errorf("未知的资源类型: %s", resourceType)
}
// 3. 判断资源作用域
var resourceInterface dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
// 命名空间级资源
resourceInterface = s.dynamicClient.
Resource(mapping.Resource).
Namespace(namespace)
} else {
// 集群级资源
resourceInterface = s.dynamicClient.
Resource(mapping.Resource)
}
// 4. 创建资源
_, err = resourceInterface.Create(
context.Background(),
obj,
metav1.CreateOptions{},
)
return err
}
// 列出资源 - 从缓存读取
func (s *ResourceService) ListResources(
resourceType string,
namespace string,
) ([]unstructured.Unstructured, error) {
// 获取资源映射
mapping, err := MappingFor(resourceType, s.restMapper)
if err != nil {
return nil, err
}
// 从Informer缓存获取
informer, err := s.informerFactory.ForResource(mapping.Resource)
if err != nil {
return nil, err
}
// 列出资源
var items []unstructured.Unstructured
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
objs, _ := informer.Lister().ByNamespace(namespace).List(labels.Everything())
for _, obj := range objs {
items = append(items, *obj.(*unstructured.Unstructured))
}
} else {
objs, _ := informer.Lister().List(labels.Everything())
for _, obj := range objs {
items = append(items, *obj.(*unstructured.Unstructured))
}
}
return items, nil
}
// 删除资源
func (s *ResourceService) DeleteResource(
resourceType string,
namespace string,
name string,
) error {
mapping, err := MappingFor(resourceType, s.restMapper)
if err != nil {
return err
}
var resourceInterface dynamic.ResourceInterface
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
resourceInterface = s.dynamicClient.
Resource(mapping.Resource).
Namespace(namespace)
} else {
resourceInterface = s.dynamicClient.
Resource(mapping.Resource)
}
return resourceInterface.Delete(
context.Background(),
name,
metav1.DeleteOptions{},
)
}

HTTP控制器实现#

package controllers
import (
"github.com/gin-gonic/gin"
"your-project/services"
)
type ResourceController struct {
service *services.ResourceService
}
// POST /:resource - 创建资源
func (c *ResourceController) Create() gin.HandlerFunc {
return func(ctx *gin.Context) {
resourceType := ctx.Param("resource")
namespace := ctx.DefaultQuery("namespace", "default")
var body struct {
Yaml string `json:"yaml" binding:"required"`
}
if err := ctx.ShouldBindJSON(&body); err != nil {
ctx.JSON(400, gin.H{
"error": "请提供有效的YAML内容",
})
return
}
err := c.service.CreateResource(resourceType, namespace, body.Yaml)
if err != nil {
ctx.JSON(500, gin.H{
"error": fmt.Sprintf("创建失败: %v", err),
})
return
}
ctx.JSON(201, gin.H{
"message": fmt.Sprintf("%s 创建成功", resourceType),
})
}
}
// GET /:resource - 列出资源
func (c *ResourceController) List() gin.HandlerFunc {
return func(ctx *gin.Context) {
resourceType := ctx.Param("resource")
namespace := ctx.DefaultQuery("namespace", "default")
items, err := c.service.ListResources(resourceType, namespace)
if err != nil {
ctx.JSON(500, gin.H{
"error": fmt.Sprintf("查询失败: %v", err),
})
return
}
// 简化输出,只返回名称列表
var names []string
for _, item := range items {
names = append(names, item.GetName())
}
ctx.JSON(200, gin.H{
"namespace": namespace,
"resource": resourceType,
"count": len(names),
"items": names,
})
}
}
// DELETE /:resource/:name - 删除资源
func (c *ResourceController) Delete() gin.HandlerFunc {
return func(ctx *gin.Context) {
resourceType := ctx.Param("resource")
name := ctx.Param("name")
namespace := ctx.DefaultQuery("namespace", "default")
err := c.service.DeleteResource(resourceType, namespace, name)
if err != nil {
ctx.JSON(500, gin.H{
"error": fmt.Sprintf("删除失败: %v", err),
})
return
}
ctx.JSON(200, gin.H{
"message": fmt.Sprintf("%s/%s 删除成功", resourceType, name),
})
}
}

主程序 - 启动服务#

package main
import (
"log"
"github.com/gin-gonic/gin"
"k8s.io/client-go/dynamic"
"your-project/config"
"your-project/services"
"your-project/controllers"
)
func main() {
// 1. 初始化K8s配置
log.Println("正在初始化K8s配置...")
k8sConfig := config.NewK8sConfig().InitRestConfig()
if k8sConfig.Error() != nil {
log.Fatal("配置加载失败:", k8sConfig.Error())
}
// 2. 创建客户端
clientSet := k8sConfig.InitClientSet()
dynamicClient, err := dynamic.NewForConfig(k8sConfig.Config)
if err != nil {
log.Fatal("创建动态客户端失败:", err)
}
// 3. 初始化RESTMapper
log.Println("正在初始化资源映射器...")
restMapper := InitRestMapper(clientSet)
// 4. 初始化Informer
log.Println("正在初始化缓存系统...")
informerFactory := InitInformerFactory(clientSet)
// 5. 创建服务层
resourceService := &services.ResourceService{
DynamicClient: dynamicClient,
RestMapper: restMapper,
InformerFactory: informerFactory,
}
// 6. 创建控制器
resourceController := &controllers.ResourceController{
Service: resourceService,
}
// 7. 设置路由
router := gin.Default()
// 资源操作路由
router.POST("/:resource", resourceController.Create())
router.GET("/:resource", resourceController.List())
router.DELETE("/:resource/:name", resourceController.Delete())
// 健康检查
router.GET("/health", func(c *gin.Context) {
c.JSON(200, gin.H{"status": "healthy"})
})
// 8. 启动服务
log.Println("K8s Agent服务启动在 http://localhost:8080")
if err := router.Run(":8080"); err != nil {
log.Fatal("服务启动失败:", err)
}
}

三、实战演示#

3.1 API使用示例#

启动服务后,你可以通过以下方式使用API:

Terminal window
# 1. 创建一个Nginx Pod
curl -X POST http://localhost:8080/pods?namespace=default \
-H "Content-Type: application/json" \
-d '{
"yaml": "apiVersion: v1\nkind: Pod\nmetadata:\n name: nginx-demo\nspec:\n containers:\n - name: nginx\n image: nginx:latest\n ports:\n - containerPort: 80"
}'
# 响应: {"message": "pods 创建成功"}
# 2. 列出所有Pods
curl http://localhost:8080/pods?namespace=default
# 响应:
# {
# "namespace": "default",
# "resource": "pods",
# "count": 3,
# "items": ["nginx-demo", "app-1", "app-2"]
# }
# 3. 创建一个Deployment
curl -X POST http://localhost:8080/deployments \
-H "Content-Type: application/json" \
-d '{
"yaml": "apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: nginx-deployment\nspec:\n replicas: 3\n selector:\n matchLabels:\n app: nginx\n template:\n metadata:\n labels:\n app: nginx\n spec:\n containers:\n - name: nginx\n image: nginx:1.14.2"
}'
# 4. 删除资源
curl -X DELETE http://localhost:8080/pods/nginx-demo?namespace=default
# 响应: {"message": "pods/nginx-demo 删除成功"}
# 5. 操作不同类型的资源
curl http://localhost:8080/services # 列出服务
curl http://localhost:8080/deployments # 列出部署
curl http://localhost:8080/nodes # 列出节点(集群级资源)
curl http://localhost:8080/namespaces # 列出命名空间

3.2 观察Informer事件#

当你创建、修改或删除资源时,控制台会实时显示事件:

✨ 新Pod创建: nginx-demo
🔄 Pod更新: nginx-demo (状态: Pending -> Running)
🗑️ Pod删除: nginx-demo

四、集成自然语言处理#

现在我们有了完整的K8s操作API,下一步是添加自然语言理解:

// NLP处理示例(伪代码)
type NLPProcessor struct {
resourceService *services.ResourceService
}
func (p *NLPProcessor) ProcessCommand(input string) string {
// 意图识别
intent := p.detectIntent(input)
// 实体提取
entities := p.extractEntities(input)
// 执行操作
switch intent {
case "CREATE":
// "创建3个nginx的副本"
resourceType := entities["resource"] // deployment
replicas := entities["count"] // 3
image := entities["image"] // nginx
yaml := p.generateYAML(resourceType, replicas, image)
err := p.resourceService.CreateResource(
resourceType,
"default",
yaml,
)
if err != nil {
return fmt.Sprintf("创建失败: %v", err)
}
return "已成功创建部署"
case "LIST":
// "显示所有运行的服务"
items, _ := p.resourceService.ListResources(
entities["resource"],
entities["namespace"],
)
return fmt.Sprintf("找到 %d 个资源", len(items))
case "DELETE":
// "删除测试环境的mysql"
err := p.resourceService.DeleteResource(
entities["resource"],
entities["namespace"],
entities["name"],
)
if err != nil {
return fmt.Sprintf("删除失败: %v", err)
}
return "删除成功"
}
return "抱歉,我不理解这个命令"
}

参考资源#

Agent + K8s 初步开发
https://fuwari.vercel.app/posts/agent_k8s_initial_development/
作者
Jarrett
发布于
2025-09-04
许可协议
CC BY-NC-SA 4.0