强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

14 - AI Agent 架构

第 14 章 · AI Agent 架构设计

AI Agent 能够自主决策、调用工具、记忆上下文并完成复杂任务。本章详解 Agent 设计模式、工具链、记忆系统和多 Agent 协作。


14.1 什么是 AI Agent

AI Agent = LLM(大脑) + Tools(工具) + Memory(记忆) + Planning(规划)

用户目标 → Agent 思考 → 选择工具 → 执行 → 观察结果 → 继续思考 → ... → 完成

Agent vs Chat 对比

特性ChatAgent
输入输出单轮问答多步自主执行
工具使用用户指定自主选择
决策能力被动响应主动规划
记忆上下文窗口短期+长期记忆
适用场景简单对话复杂任务自动化

14.2 基础 Agent 实现

ReAct 模式(Reasoning + Acting)

from openai import OpenAI
import json
from typing import Callable

client = OpenAI()

class ReActAgent:
    """基于 ReAct 模式的 Agent"""

    def __init__(self, model: str = "gpt-4o", tools: dict[str, Callable] = None):
        self.model = model
        self.tools = tools or {}
        self.messages: list[dict] = []
        self.max_iterations = 10

    def register_tool(self, name: str, func: Callable, description: str, parameters: dict):
        """注册工具"""
        self.tools[name] = {
            "function": func,
            "definition": {
                "type": "function",
                "function": {
                    "name": name,
                    "description": description,
                    "parameters": parameters,
                }
            }
        }

    def run(self, task: str, system_prompt: str = "") -> str:
        """执行任务"""
        self.messages = []
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})
        self.messages.append({"role": "user", "content": task})

        for iteration in range(self.max_iterations):
            # 调用 LLM
            response = client.chat.completions.create(
                model=self.model,
                messages=self.messages,
                tools=[t["definition"] for t in self.tools.values()],
                tool_choice="auto",
            )

            message = response.choices[0].message
            self.messages.append(message)

            # 没有工具调用,任务完成
            if not message.tool_calls:
                return message.content

            # 执行工具调用
            for tool_call in message.tool_calls:
                name = tool_call.function.name
                args = json.loads(tool_call.function.arguments)

                print(f"  [工具调用] {name}({json.dumps(args, ensure_ascii=False)[:100]})")

                try:
                    if name in self.tools:
                        result = self.tools[name]["function"](**args)
                        result_str = json.dumps(result, ensure_ascii=False) if not isinstance(result, str) else result
                    else:
                        result_str = json.dumps({"error": f"未知工具: {name}"})
                except Exception as e:
                    result_str = json.dumps({"error": str(e)})

                self.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result_str[:3000],  # 限制结果长度
                })

        return "达到最大迭代次数,任务未完成。"


# 使用示例
agent = ReActAgent()

# 注册工具
import math
from datetime import datetime

agent.register_tool(
    name="calculate",
    func=lambda expression: {"result": eval(expression, {"__builtins__": {}}, {"math": math})},
    description="执行数学计算",
    parameters={
        "type": "object",
        "properties": {"expression": {"type": "string", "description": "数学表达式"}},
        "required": ["expression"],
    }
)

agent.register_tool(
    name="get_current_time",
    func=lambda: {"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
    description="获取当前时间",
    parameters={"type": "object", "properties": {}},
)

# 运行
result = agent.run(
    task="现在几点了?另外计算一下 2 的 10 次方。",
    system_prompt="你是一个智能助手,可以使用工具来帮助用户。",
)
print(f"\n最终结果: {result}")

14.3 工具链设计

设计原则

原则说明
单一职责每个工具只做一件事
描述清晰description 要让 LLM 准确理解
参数校验工具内部验证输入
错误处理返回错误而非抛异常
超时控制外部调用设置超时

常见工具分类

# 数据查询类
tools_data = [
    {"name": "query_database", "desc": "查询数据库"},
    {"name": "search_web", "desc": "搜索网页"},
    {"name": "get_weather", "desc": "获取天气"},
]

# 操作执行类
tools_action = [
    {"name": "send_email", "desc": "发送邮件"},
    {"name": "create_task", "desc": "创建任务"},
    {"name": "update_record", "desc": "更新记录"},
]

# 信息处理类
tools_process = [
    {"name": "calculate", "desc": "数学计算"},
    {"name": "translate", "desc": "文本翻译"},
    {"name": "summarize", "desc": "文本摘要"},
]

安全工具包装器

import functools
import time

def safe_tool(timeout: int = 10, max_retries: int = 2):
    """工具安全装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    start = time.time()
                    result = func(*args, **kwargs)
                    elapsed = time.time() - start
                    if elapsed > timeout:
                        return {"error": "执行超时", "elapsed": elapsed}
                    return result
                except Exception as e:
                    if attempt == max_retries:
                        return {"error": str(e), "retries": attempt}
                    time.sleep(0.5 * (attempt + 1))
        return wrapper
    return decorator

@safe_tool(timeout=5)
def query_database(sql: str) -> dict:
    """安全的数据库查询"""
    # 实际实现...
    return {"rows": [], "count": 0}

14.4 记忆系统

14.4.1 短期记忆(对话历史)

class ShortTermMemory:
    """对话历史管理"""

    def __init__(self, max_tokens: int = 8000):
        self.messages: list[dict] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._trim()

    def _trim(self):
        """裁剪历史,保持在 token 限制内"""
        total = sum(len(m["content"]) for m in self.messages)
        while total > self.max_tokens and len(self.messages) > 2:
            removed = self.messages.pop(0)
            total -= len(removed["content"])

    def get_messages(self) -> list[dict]:
        return self.messages.copy()

    def clear(self):
        self.messages.clear()

14.4.2 长期记忆(向量存储)

class LongTermMemory:
    """基于向量的长期记忆"""

    def __init__(self):
        self.client = OpenAI()
        self.memories: list[dict] = []
        self.embeddings: list[list[float]] = []

    def store(self, content: str, metadata: dict = None):
        """存储记忆"""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=content,
        )
        self.memories.append({
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat(),
        })
        self.embeddings.append(response.data[0].embedding)

    def recall(self, query: str, top_k: int = 5) -> list[dict]:
        """检索相关记忆"""
        if not self.memories:
            return []

        import numpy as np

        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=query,
        )
        query_vec = np.array(response.data[0].embedding)
        mem_vecs = np.array(self.embeddings)

        # 余弦相似度
        norms = np.linalg.norm(mem_vecs, axis=1)
        similarities = np.dot(mem_vecs, query_vec) / (norms * np.linalg.norm(query_vec))

        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [
            {**self.memories[i], "score": float(similarities[i])}
            for i in top_indices
        ]

14.4.3 工作记忆(任务上下文)

class WorkingMemory:
    """当前任务的工作记忆"""

    def __init__(self):
        self.goal: str = ""
        self.plan: list[str] = []
        self.current_step: int = 0
        self.observations: list[dict] = []
        self.scratchpad: dict = {}  # 临时变量存储

    def set_goal(self, goal: str):
        self.goal = goal
        self.plan = []
        self.current_step = 0
        self.observations = []

    def set_plan(self, steps: list[str]):
        self.plan = steps
        self.current_step = 0

    def add_observation(self, step: str, result: str):
        self.observations.append({
            "step": step,
            "result": result,
            "timestamp": datetime.now().isoformat(),
        })

    def get_context(self) -> str:
        """获取当前任务上下文"""
        ctx = f"目标: {self.goal}\n"
        if self.plan:
            ctx += f"计划: {' → '.join(self.plan)}\n"
            ctx += f"当前步骤: {self.current_step + 1}/{len(self.plan)}\n"
        if self.observations:
            ctx += "已执行:\n"
            for obs in self.observations[-5:]:
                ctx += f"  - {obs['step']}: {obs['result'][:100]}\n"
        return ctx

14.5 完整 Agent 框架

class AIAgent:
    """完整的 AI Agent 实现"""

    def __init__(self, name: str, model: str = "gpt-4o"):
        self.name = name
        self.model = model
        self.client = OpenAI()
        self.tools: dict[str, dict] = {}
        self.short_memory = ShortTermMemory()
        self.long_memory = LongTermMemory()
        self.working_memory = WorkingMemory()
        self.system_prompt = ""

    def set_system_prompt(self, prompt: str):
        self.system_prompt = prompt

    def register_tool(self, name: str, func: Callable, description: str, parameters: dict):
        self.tools[name] = {
            "function": func,
            "definition": {
                "type": "function",
                "function": {
                    "name": name,
                    "description": description,
                    "parameters": parameters,
                }
            }
        }

    def execute(self, task: str) -> str:
        """执行任务"""
        # 1. 从长期记忆召回相关信息
        relevant_memories = self.long_memory.recall(task, top_k=3)
        memory_context = ""
        if relevant_memories:
            memory_context = "\n相关记忆:\n" + "\n".join(
                [f"- {m['content']}" for m in relevant_memories]
            )

        # 2. 构建消息
        messages = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt + memory_context})
        messages.extend(self.short_memory.get_messages())
        messages.append({"role": "user", "content": task})

        # 3. 执行循环
        for _ in range(10):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=[t["definition"] for t in self.tools.values()],
                tool_choice="auto",
            )

            message = response.choices[0].message
            messages.append(message)

            if not message.tool_calls:
                # 4. 存储到记忆
                self.short_memory.add("user", task)
                self.short_memory.add("assistant", message.content)
                self.long_memory.store(f"任务: {task}\n结果: {message.content}")
                return message.content

            # 执行工具
            for tc in message.tool_calls:
                name = tc.function.name
                args = json.loads(tc.function.arguments)
                try:
                    result = self.tools[name]["function"](**args)
                    result_str = json.dumps(result, ensure_ascii=False) if not isinstance(result, str) else result
                except Exception as e:
                    result_str = json.dumps({"error": str(e)})

                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": result_str[:2000],
                })

        return "任务未完成,达到最大迭代次数。"

14.6 多 Agent 协作

协作模式

模式说明适用场景
主从模式一个主 Agent 调度多个子 Agent通用任务分解
对等模式Agent 之间平等协作讨论、辩论
流水线前一个 Agent 的输出作为下一个的输入多阶段处理
辩论模式多个 Agent 对同一问题给出不同观点决策支持

主从模式实现

class OrchestratorAgent:
    """主控 Agent:分解任务并协调子 Agent"""

    def __init__(self, model: str = "gpt-4o"):
        self.client = OpenAI()
        self.model = model
        self.agents: dict[str, AIAgent] = {}

    def register_agent(self, name: str, agent: AIAgent, description: str):
        self.agents[name] = {"agent": agent, "description": description}

    def run(self, task: str) -> str:
        """分解任务并调度执行"""
        # 构建可用 Agent 列表
        agent_list = "\n".join([
            f"- {name}: {info['description']}"
            for name, info in self.agents.items()
        ])

        # 规划任务分解
        plan_response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": f"""你是一个任务规划器。
可用的子 Agent:
{agent_list}

将用户任务分解为子任务,分配给合适的 Agent。
输出 JSON: {{"tasks": [{{"agent": "name", "task": "description"}}]}}"""},
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
            temperature=0.0,
        )

        import json
        plan = json.loads(plan_response.choices[0].message.content)

        # 执行子任务
        results = []
        for subtask in plan.get("tasks", []):
            agent_name = subtask["agent"]
            agent_task = subtask["task"]

            if agent_name in self.agents:
                print(f"[调度] {agent_name}: {agent_task}")
                result = self.agents[agent_name]["agent"].execute(agent_task)
                results.append({"agent": agent_name, "task": agent_task, "result": result})

        # 汇总结果
        summary_input = "\n\n".join([
            f"Agent [{r['agent']}] 执行: {r['task']}\n结果: {r['result']}"
            for r in results
        ])

        summary = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "汇总各子任务的结果,给出最终回答。"},
                {"role": "user", "content": f"原始任务: {task}\n\n子任务结果:\n{summary_input}"},
            ],
        )

        return summary.choices[0].message.content

14.7 Planning 策略

计划-执行-反思

class PlanAndExecuteAgent:
    """计划-执行-反思 Agent"""

    def plan(self, task: str) -> list[str]:
        """制定执行计划"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "为任务制定详细的执行步骤。输出JSON: {\"steps\": [\"step1\", ...]}"},
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)["steps"]

    def reflect(self, task: str, steps: list[str], results: list[str]) -> dict:
        """反思执行结果,决定是否需要调整"""
        reflection = "\n".join([f"步骤: {s}\n结果: {r}" for s, r in zip(steps, results)])

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "评估执行结果。输出JSON: {\"complete\": bool, \"next_action\": \"...\"}"},
                {"role": "user", "content": f"任务: {task}\n\n执行记录:\n{reflection}"},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)

14.8 业务场景

场景Agent 类型工具组合
数据分析助手ReActSQL查询 + 图表生成 + 报告撰写
客服自动化主从模式知识库检索 + 工单系统 + 情感分析
内容创作流水线调研 + 写作 + 审核 + 发布
代码审查ReAct代码分析 + Bug检测 + 安全扫描
旅行规划Plan-Execute机票查询 + 酒店查询 + 行程优化

14.9 注意事项

  1. 迭代次数限制:设置 max_iterations 防止死循环
  2. 工具结果截断:过长的工具结果会撑爆上下文
  3. 成本控制:Agent 的多轮调用会消耗大量 token
  4. 安全边界:敏感操作(删除、支付)需要人工确认
  5. 错误恢复:工具失败时 Agent 应能尝试替代方案
  6. 日志记录:记录每一步的思考和工具调用,便于调试
  7. 测试验证:Agent 行为不确定性高,需要充分测试

14.10 扩展阅读


下一章15 - 最佳实践 — 错误处理、成本控制、安全防护、限流。