Agent Loop:一个被现实逼出来的设计
Agent Loop:一个被现实逼出来的设计
大多数技术设计文档告诉你"应该怎么做"。这篇文章想说的是"为什么会变成这样"。
我第一次写 agent loop 的时候,大概是这样的:
async function agentLoop(userMessage: string): Promise<string> {
const messages = [{ role: "user", content: userMessage }];
while (true) {
const response = await callLLM(messages);
const toolCalls = extractToolCalls(response);
if (toolCalls.length === 0) return extractText(response);
for (const call of toolCalls) {
messages.push({ role: "tool", content: await executeTool(call) });
}
}
}五十行,跑起来了。
然后用户开始用,然后开始抱怨,然后代码开始膨胀,然后我开始理解为什么现实中的 agent loop 长那个样子。
这篇文章就是这个过程的复盘:从最笨的 V0 开始,每一步被一个具体的痛点推着走,直到你理解那些复杂度存在的理由。
V0:能跑就行
async function agentLoop(userMessage: string): Promise<string> {
const messages = [{ role: "user", content: userMessage }];
while (true) {
const response = await callLLM(messages);
messages.push({ role: "assistant", content: response.content });
const toolCalls = extractToolCalls(response);
if (toolCalls.length === 0) {
return extractText(response);
}
for (const call of toolCalls) {
const result = await executeTool(call);
messages.push({ role: "tool", toolCallId: call.id, content: result });
}
}
}这个版本的心智模型很简单:
用户输入 → [LLM → 工具 → LLM → 工具 → ...] → 最终文字输出中括号里的一切都是黑盒。调用方塞进去一个字符串,等一会儿,拿出来另一个字符串。
这在脚本里完全够用。npm run summarize -- --url https://... 这种 CLI 工具,用户等几秒拿结果,体验可以接受。
但我很快发现,这个接口形状有一个根本性的问题——它假设"过程"不重要,只有结果重要。
第一个真实的痛
我把 agent 接入了一个 Web UI,问题立刻出现:
用户看着一个白屏等了 40 秒,然后页面上突然出现了五段文字。
这不是性能问题。40 秒的等待是合理的——agent 在调用三个不同的 API,LLM 在做复杂的推理。问题是用户不知道正在发生什么。它在想吗?卡住了吗?还是根本没有发送请求?
更具体的需求冒出来了:
- 当 AI 开始打字,立刻在页面上流式渲染——不能等整段话说完
- 当工具在执行,显示"正在搜索 Wikipedia..."之类的状态
- 当一个工具完成,立刻显示进度,而不是等所有工具都完成
V0 的 Promise<string> 接口满足不了这些。要满足这些,过程必须可以被观察。
V1:让外界能"看见"——EventStream
最直觉的修法是加个回调:
async function agentLoop(
userMessage: string,
onEvent: (event: AgentEvent) => void
): Promise<string>这能用,但有个隐患:回调是 push 模型。生产方决定什么时候推,消费方被动接收。如果你想在"收到某个事件后,做完一件事,再让 loop 继续",回调做不到——你没有办法在回调里暂停 loop。
所以更好的设计是 EventStream,一个可以被 for await 的异步迭代器:
function agentLoop(
prompts: AgentMessage[],
context: AgentContext,
config: AgentLoopConfig,
): EventStream<AgentEvent, AgentMessage[]> {
const stream = new EventStream<AgentEvent, AgentMessage[]>(...);
// loop 在后台异步跑,不阻塞返回
void runAgentLoop(..., async (event) => {
stream.push(event); // 产生事件
}).then((messages) => {
stream.end(messages); // loop 结束,携带最终结果
});
return stream; // 立刻返回
}调用方可以这样消费:
const stream = agentLoop(prompts, context, config);
for await (const event of stream) {
switch (event.type) {
case "message_update":
ui.streamText(event.message);
break;
case "tool_execution_start":
ui.showSpinner(`正在执行 ${event.toolName}...`);
break;
case "tool_execution_end":
ui.hideSpinner(event.toolCallId);
break;
}
}
const finalMessages = await stream.result();第一个 token 出现,UI 立刻开始渲染。工具开始执行,状态立刻更新。过程从黑盒变成了透明的。
这里有一个值得停下来想的设计决定:为什么不直接返回 AsyncIterator,而是包一个 EventStream?
因为 EventStream 携带两种信息:流式的事件序列,和一个最终结果(AgentMessage[]——这次 loop 新增的所有消息)。这两种信息的消费方式不同,放在同一个对象里,接口更干净。普通的 AsyncIterator 只有流,没有"最终结果"的概念。
V1 的本质:把函数从"同步黑盒"变成"实时可观察的过程"。接口形状的选择比实现细节更重要,而且要尽早做——这是改动成本最高的决策。
第二个真实的痛
V1 跑了一段时间,新的抱怨出现了:有些 agent 任务慢得离谱。
追了一下发现:AI 经常一次说"我需要查三个数据源",但执行是顺序的——
查 数据源A (3秒)
↓
查 数据源B (2秒)
↓
查 数据源C (4秒)
↓
合计:9秒这三个查询之间没有依赖关系,完全可以并行:
查 数据源A (3秒)
查 数据源B (2秒) ← 同时
查 数据源C (4秒)
↓
合计:4秒(最慢的那个)V2:并行工具执行——但顺序不能乱
并行执行听起来简单,加个 Promise.all 就好了:
// 天真版本
const results = await Promise.all(toolCalls.map(call => executeTool(call)));但马上撞到一个 LLM 的约束:toolResult 消息必须和 toolCall 的顺序严格对应。
LLM API 会校验:如果 AI 说了 [call_A, call_B, call_C],你返回的 toolResult 序列必须对应 [result_A, result_B, result_C],不能因为 B 先完成就把它排在第一个。
Promise.all 本身就是保顺序的——它返回的数组顺序和输入数组顺序一致,不管哪个 Promise 先 resolve。所以这个约束其实自动满足了。
但还有另一个复杂度:有些工具根本不能并行。
想象这样的指令:"先把内容写入文件,然后读取这个文件做处理。"如果并行执行,读文件可能在写文件完成前就开始——结果要么是空文件,要么是竞争条件。
所以需要一个标记,让特定工具强制顺序执行:
// 工具定义时声明
const writeFileTool: AgentTool = {
name: "write_file",
executionMode: "sequential", // 我必须顺序执行
execute: async (id, args) => { ... }
};// loop 里检查
const hasSequentialToolCall = toolCalls.some(
(tc) => currentContext.tools?.find((t) => t.name === tc.name)
?.executionMode === "sequential",
);
if (config.toolExecution === "sequential" || hasSequentialToolCall) {
return executeToolCallsSequential(...);
}
return executeToolCallsParallel(...);并行版本还有一个微妙的实现细节,值得单独说:
// 并行执行的实际做法
const pendingCalls: (FinalizedToolCallOutcome | (() => Promise<FinalizedToolCallOutcome>))[] = [];
for (const toolCall of toolCalls) {
const preparation = await prepareToolCall(toolCall, ...);
if (preparation.kind === "immediate") {
// 准备阶段就能出结果的(比如 tool not found):直接是值
pendingCalls.push({ toolCall, result: preparation.result, isError: true });
continue;
}
// 需要真正执行的:变成一个函数(闭包),但不立刻调用
pendingCalls.push(async () => {
const executed = await executePreparedToolCall(preparation, signal, emit);
return finalizeExecutedToolCall(preparation, executed, ...);
});
}
// 一起启动,Promise.all 保顺序
const finalizedCalls = await Promise.all(
pendingCalls.map(entry =>
typeof entry === "function" ? entry() : Promise.resolve(entry)
)
);这里有个巧妙的地方:prepareToolCall(查找工具、验证参数、调 beforeToolCall 钩子)是顺序执行的——因为 beforeToolCall 可能有副作用,顺序执行语义更清晰。真正耗时的 executePreparedToolCall 才是并发的。
V2 的本质:在"结果顺序正确"的约束下榨出并行性能。"并发"和"无序"是两件不同的事。
第三个真实的痛
V2 发布以后,开始有不同团队把 agent loop 用在完全不同的场景里:
- A 团队:需要在某些工具调用前做安全审计,高风险操作要记录日志
- B 团队:有成本预算,如果当前 turn 已经花了太多 token,要自动切换成便宜的模型
- C 团队:上下文会变得很长,需要在发给 LLM 前先压缩历史消息
- D 团队:某些工具的结果需要做二次处理才能给 LLM 看
每个团队都想改 loop,但他们想改的地方不一样。如果每个团队都在 loop 内部加 if-else,很快这个文件就会变成一个谁也不敢动的怪物。
V3:钩子系统——控制权回归业务层
解法不是把所有场景的逻辑写进 loop,而是在关键节点暴露拦截点,让业务逻辑自己决定要干什么。
interface AgentLoopConfig {
// 工具执行前的拦截
beforeToolCall?: (context: BeforeToolCallContext, signal?: AbortSignal) => Promise<{
block?: boolean;
reason?: string;
} | void>;
// 工具执行后的处理
afterToolCall?: (context: AfterToolCallContext, signal?: AbortSignal) => Promise<{
content?: ToolResultContent[];
isError?: boolean;
terminate?: boolean;
} | void>;
// 每轮结束后,可以修改下一轮的配置
prepareNextTurn?: (context: NextTurnContext) => Promise<{
model?: Model;
thinkingLevel?: ThinkingLevel;
context?: AgentContext;
} | void>;
// 自定义停止条件
shouldStopAfterTurn?: (context: NextTurnContext) => Promise<boolean>;
// 发给 LLM 前,对消息列表做变换(压缩、过滤等)
transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
}各个团队现在可以这样用,完全不碰 loop 本身:
// A 团队:安全审计
const securityConfig: Partial<AgentLoopConfig> = {
beforeToolCall: async ({ toolCall, args }) => {
if (isHighRisk(toolCall.name)) {
await auditLog.record({ toolCall, args, timestamp: Date.now() });
if (requiresApproval(toolCall.name)) {
return { block: true, reason: "此操作需要人工审批" };
}
}
},
};
// B 团队:成本控制
const costConfig: Partial<AgentLoopConfig> = {
prepareNextTurn: async ({ context, newMessages }) => {
const tokenCount = estimateTokens(context.messages);
if (tokenCount > BUDGET_THRESHOLD) {
return { model: CHEAP_MODEL };
}
},
};
// C 团队:上下文压缩
const compressionConfig: Partial<AgentLoopConfig> = {
transformContext: async (messages) => {
if (messages.length > 50) {
return await compressHistory(messages);
}
return messages;
},
};
// D 团队:工具结果处理
const processingConfig: Partial<AgentLoopConfig> = {
afterToolCall: async ({ toolCall, result }) => {
if (toolCall.name === "web_search") {
return {
content: [{ type: "text", text: extractRelevantParts(result) }],
};
}
},
};这里有一个设计哲学值得说透:钩子的参数应该是只读的,钩子的作用是"返回指令",而不是"直接修改状态"。
比如 beforeToolCall 返回 { block: true },而不是直接 toolCall.cancelled = true。afterToolCall 返回新的 content,而不是直接修改 result 对象。
这样做的好处:loop 始终是状态的单一所有者,钩子只是"顾问",不是"执行者"。调试时你只需要看 loop 内部的状态转换,不需要追踪是哪个钩子在什么时机改了什么字段。
V3 的本质:控制反转。loop 从"干所有事情"变成"提供扩展点"。复杂度不消失,但被隔离到了各自的业务模块里。
第四个真实的痛
V3 跑得很好,直到有人问了一个问题:
"用户在 agent 跑任务的时候,可以跟它说话吗?"
比如:AI 正在帮用户研究一个技术方案,执行了三个工具调用,还没说完。用户看到中间结果,想说"等等,那个方向不对,重点放在性能而不是功能"。
在 V3 里,这做不到。只有两个选项:
- 等 agent 跑完再说:等待时间可能很长,而且结果出来用户发现方向完全不对,白等了
- 中断 agent 重新开始:状态全丢了,之前收集的信息也没了
两个选项都很差。
V4:持续对话——Steering 和 Follow-up
真正需要的是在 agent 运行过程中插入消息,而且要区分两种不同的时机:
Steering(引导):AI 正在思考或执行工具,用户这时说话。这条消息要在AI 下次开口之前注入,让 AI 看到最新的用户意图。
Follow-up(跟进):AI 刚说完,准备退出了,但用户立刻有新消息。这条消息要让 agent 不退出,而是继续跑新的 turn。
Steering 的时序:
用户消息① → [AI 执行中] → 用户消息②(steering) → AI 说话(同时看到①和②) → 工具 → ...
Follow-up 的时序:
用户消息① → AI 说话 → 没有工具调用 → agent 准备退出 → 用户消息②(follow-up) → agent 继续 → AI 说话 → ...实现上,这对应了双层循环:
async function runLoop(...): Promise<void> {
let pendingMessages = await config.getSteeringMessages?.() || [];
// 外层循环:处理 follow-up(用户在 agent 停止后的新消息)
while (true) {
let hasMoreToolCalls = true;
// 内层循环:处理工具调用和 steering 消息
while (hasMoreToolCalls || pendingMessages.length > 0) {
// 在 AI 说话前,注入 steering 消息
if (pendingMessages.length > 0) {
for (const message of pendingMessages) {
currentContext.messages.push(message);
await emit({ type: "message_start", message });
await emit({ type: "message_end", message });
}
pendingMessages = [];
}
// AI 说话
const message = await streamAssistantResponse(currentContext, config, ...);
// 执行工具调用
if (toolCalls.length > 0) {
const batch = await executeToolCalls(...);
hasMoreToolCalls = !batch.terminate;
} else {
hasMoreToolCalls = false;
}
// 内层结束前检查新的 steering 消息
pendingMessages = await config.getSteeringMessages?.() || [];
}
// agent 要停了——但先检查有没有 follow-up
const followUpMessages = await config.getFollowUpMessages?.() || [];
if (followUpMessages.length > 0) {
pendingMessages = followUpMessages; // 当成 pending 继续跑
continue;
}
break; // 真的没有了,退出
}
}这里有一个容易忽略的细节:getSteeringMessages 在每次内层循环开始时都会调用——这意味着调用方需要维护一个"待注入消息队列",agent loop 轮询这个队列。这是一个 pull 模式,而不是 push 模式。
为什么不用 push(让用户直接往 context 里塞消息)?因为 push 模式有竞争条件:如果用户在 AI 正在说话的中间往 context 里插消息,消息顺序可能变乱。Pull 模式让 loop 自己决定"什么时候拿新消息",时机总是明确的(turn 开始前),不存在并发问题。
V4 的本质:把 agent 从"批处理任务"变成"持续对话"。用户和 agent 之间的交互边界从"提交-等待-拿结果"变成了真正的实时协作。
把所有复杂度放在一起看
现在回头看最终的代码,每一层复杂度都有来处:
| 代码里的东西 | 是为了解决什么 |
|---|---|
EventStream 而非 Promise<string> | UI 需要流式渲染,过程需要可观察 |
executeToolCallsParallel | 多工具顺序执行太慢 |
executionMode: "sequential" | 有依赖关系的工具不能并发 |
Promise.all 保顺序 | LLM API 要求 toolResult 顺序对应 toolCall |
beforeToolCall / afterToolCall | 不同业务需要在工具前后干预,但不能改 loop |
prepareNextTurn | 不同 turn 可能需要不同模型/配置 |
transformContext | 上下文太长需要压缩,但不影响原始消息记录 |
getSteeringMessages | 用户需要在 agent 执行中途引导方向 |
getFollowUpMessages | 用户需要在 agent 停止后继续对话 |
双层 while 循环 | steering 和 follow-up 的时机语义不同 |
prepareToolCall 顺序执行 | beforeToolCall 有副作用,顺序语义更安全 |
没有一行代码是为了"看起来高级"加进去的。
什么时候你不需要这些
读这篇文章不是为了照抄这个设计。很多复杂度是有条件的:
如果你在写 CLI 脚本:V0 就够了。黑盒函数,等待结果,用户不在乎中间状态。
如果你的 agent 只有一个工具:并行执行这块可以删掉,executeToolCallsSequential 直接用,不需要 parallel 版本。
如果你只有一个使用场景:钩子系统可以不要。把业务逻辑直接写进 loop,代码更直白,也更好追踪。
如果用户不需要实时干预:Steering 和 Follow-up 可以不要,去掉双层循环,代码少一半。
一个判断原则:如果你在加一个复杂度,但说不出它解决了哪个用户的哪个具体抱怨,那它可能是过度设计。
最后一件事
这篇文章里有一个隐含的教训,比任何具体的设计决策都重要:
接口形状的选择要尽早做,其余的可以推迟。
V0 → V1 是最贵的改动——Promise<string> 变成 EventStream,上游所有调用方都要跟着改。如果这件事做晚了,你要么重构整个调用链,要么带着技术债继续跑。
V1 → V4 的所有改动都是"加法":新的参数、新的配置项、新的执行路径。它们不改变已有接口,已有调用方不需要任何修改。
所以真正需要在 V0 阶段想清楚的,只有一个问题:
这个函数的调用方,需要在过程中"看到"什么吗?
如果答案是"需要",从第一天就用 EventStream,哪怕你暂时只有两个事件类型。
如果答案是"不需要",Promise<string> 完全够用,其余的复杂度等你真的被痛到了再加。
被现实逼出来的设计,往往是最好的设计。
本文基于对 pi-ai agent loop 实现的分析与重构思考。代码片段经过简化,仅保留说明关键设计点所需的部分。