diff --git a/docs/5-tool-system.md b/docs/5-tool-system.md index 26e170a..20bb3a8 100644 --- a/docs/5-tool-system.md +++ b/docs/5-tool-system.md @@ -85,10 +85,22 @@ pub use registry::{ToolEntry, ToolInvocation, ToolRegistry}; use async_trait::async_trait; use serde_json::Value; +use tokio_util::sync::CancellationToken; use crate::tools::error::ToolError; use crate::tools::permission::Permission; +/// 工具执行上下文 —— 携带每次执行的运行时信息。 +/// 新增字段时提供默认值,不破坏已有工具实现。 +pub struct ToolContext<'a> { + /// 当前对话/会话 ID,用于关联性追踪。 + pub session_id: &'a str, + /// 链路追踪 ID,用于跨工具调用的耗时分布。 + pub trace_id: &'a str, + /// 取消令牌,用于优雅取消正在执行的工具。 + pub cancellation_token: CancellationToken, +} + /// 工具抽象接口 —— 所有工具(自定义或 MCP)最终都实现此 trait。 #[async_trait] pub trait BaseTool: Send + Sync { @@ -107,7 +119,8 @@ pub trait BaseTool: Send + Sync { } /// 执行工具调用。 - async fn execute(&self, args: Value) -> Result; + /// `ctx` 携带执行上下文(session_id、trace_id 等),Phase 3/4 可扩展字段而不破坏 trait 签名。 + async fn execute(&self, args: Value, ctx: &ToolContext<'_>) -> Result; } ``` @@ -115,7 +128,8 @@ pub trait BaseTool: Send + Sync { - `name()` 返回 `&str` 而非 `String`,避免每次调用克隆 - `parameters()` 返回 `serde_json::Value`,与现有 `OpenaiToolDefinition.parameters` 类型一致 - `required_permissions()` 提供默认空实现,简化无敏感操作的工具定义 -- `execute()` 接收 `Value`(JSON 对象)作为参数,返回 `Value` 作为结果,与 OpenAI API 的 arguments/output 格式一致 +- `execute()` 接收 `Value`(JSON 对象)+ `ToolContext` 作为参数,返回 `Value` 作为结果,与 OpenAI API 的 arguments/output 格式一致 +- `ToolContext` 从 Phase 2 即注入 `execute()` 签名,防止后续 breaking change;新增字段用 `Option` 包裹或提供默认值 ### 2. ToolRegistry — 工具注册表 @@ -308,6 +322,15 @@ pub enum McpTransport { }, } +/// MCP 子进程运行时状态(connect() 后创建)。 +struct ChildProcessState { + child: tokio::process::Child, + stdin: tokio::io::BufWriter, + /// 等待响应的请求映射(id → oneshot sender)。 + pending: HashMap>>, + next_id: u64, +} + /// MCP 客户端 —— 与 MCP 服务器通信。 pub struct McpClient { transport: McpTransport, @@ -318,6 +341,8 @@ pub struct McpClient { initialized: AtomicBool, /// 超时时间(秒)。 timeout_secs: u64, + /// 子进程运行时状态(connect() 后创建,close() 后取回)。 + process: Option>, } /// MCP 服务器暴露的工具(缓存结构)。 @@ -335,15 +360,19 @@ impl McpClient { pub fn with_timeout(mut self, secs: u64) -> Self; /// 连接并初始化(发送 initialize 请求,获取服务器能力声明)。 + /// 启动子进程,创建 ChildProcessState(含 reader task)。 pub async fn connect(&mut self) -> Result<(), ToolError>; /// 列出服务器支持的工具(调用 tools/list)。 pub async fn list_tools(&mut self) -> Result, ToolError>; /// 调用一个工具(调用 tools/call)。 + /// 通过 Mutex 获取 stdin 写入权限,发送 JSON-RPC 请求,通过 id 匹配响应。 + /// reader task 持续读取 stdout,解析 JSON-RPC 响应,通过 oneshot 通知调用方。 pub async fn call_tool(&self, name: &str, args: Value) -> Result; /// 关闭连接(终止子进程)。 + /// 发送 shutdown → 等待 5s 优雅退出 → 超时则 child.kill()。 pub async fn close(&mut self) -> Result<(), ToolError>; /// 将 MCP 客户端转换为 BaseTool 适配器列表(用于注册到 ToolRegistry)。 @@ -447,18 +476,22 @@ impl LlmCycle { /// 流程: /// 1. 发送请求(含工具定义) /// 2. 检查响应中的 finish_reason - /// 3. 如果是 ToolCalls → 执行工具 → 回传结果 → 重复 1 - /// 4. 如果是 Stop/Length → 返回最终响应 + /// 3. 如果是 ToolCalls → 先 push Assistant 消息 → 执行工具 → 回传结果 → 重复 1 + /// 4. 如果是 Stop/Length → push Assistant 消息 → 返回最终响应 + /// + /// 注意:OpenAI API 要求 tool 消息必须紧跟在对应的 Assistant(tool_calls)消息之后。 + /// 因此 push 工具结果前必须先 push Assistant 响应,否则 API 拒绝请求。 pub async fn submit_with_tools( &mut self, prompt: String, registry: &ToolRegistry, ) -> Result { let tools = registry.definitions(); - let max_turns = self.config.max_turns.unwrap_or(10); // 注:CycleConfig.max_turns 默认值为 None,实现时需修改 Default 为 Some(10) + let max_turns = self.config.max_tool_turns.unwrap_or(10); let mut turn = 0; self.messages.push(OpenaiChatMessage::user_text(prompt)); + self.maybe_compact(); loop { turn += 1; @@ -475,9 +508,12 @@ impl LlmCycle { // 检查是否需要执行工具 let should_execute = matches!( response.stop_reason, - Some(FinishReason::ToolCalls) | None + Some(FinishReason::ToolCalls) ) && has_tool_calls(&response.message); + // 将 Assistant 响应(含 tool_calls 或最终文本)追加到消息历史 + self.messages.push(response.message.clone()); + if !should_execute { return Ok(response); } @@ -490,7 +526,10 @@ impl LlmCycle { for result in results { let content = match &result.output { Ok(value) => serde_json::to_string(value) - .unwrap_or_else(|_| "{}".to_string()), + .unwrap_or_else(|e| { + tracing::warn!("工具结果序列化失败: {}", e); + "{}".to_string() + }), Err(e) => format!("错误: {}", e), }; @@ -498,6 +537,21 @@ impl LlmCycle { OpenaiChatMessage::tool_result(result.tool_name.clone(), content) ); } + + // 每轮工具执行后触发 compaction,防止 token 快速膨胀 + self.maybe_compact(); + } + } + + /// 在接近上下文窗口时压缩历史消息。 + fn maybe_compact(&mut self) { + if let Some(ref config) = self.compact_config + && should_compact(&self.messages, config, &self.compact_state) + { + let freed = microcompact(&mut self.messages, config.keep_recent); + if freed > 0 { + self.compact_state.record_success(); + } } } @@ -516,7 +570,7 @@ impl LlmCycle { | 决策 | 选择 | 理由 | |------|------|------| | 循环方式 | 同步循环(单线程串行) | 工具执行依赖前一轮结果,串行更安全 | -| 最大轮次 | `CycleConfig.max_turns`,默认 `Some(10)` | 防止无限循环(LLM 反复调用工具)。**注意**:当前 `CycleConfig` 默认值为 `None`,实现时需将 `Default` 改为 `Some(10)` | +| 最大轮次 | `CycleConfig.max_tool_turns`,独立于 `max_turns`,默认 `Some(10)` | 防止无限循环(LLM 反复调用工具)。使用独立字段避免影响现有 `submit()`/`submit_messages()` 的 `max_turns` 语义 | | 工具并行 | `invoke_all()` 互不依赖的工具并行 | LLM 可能一次发出多个 tool_calls(parallel_tool_calls) | | 工具超时 | `CycleConfig::tool_timeout_secs`,默认 60 | 防止单个工具长时间阻塞循环。`invoke_all()` 使用 `tokio::time::timeout` 包装 | | 错误处理 | 工具执行错误以文本回传 LLM,而非终止循环 | LLM 可自行从错误中恢复 | @@ -580,6 +634,10 @@ impl LlmCycle { `submit_stream()` 的增强方案:新增 `submit_stream_with_tools()`,在流式事件层面支持自动 tool 循环。 +> **实现复杂度提示**:流式 tool 循环需要自定义 `Stream` 实现 + 内部状态机(`Streaming` → `ExecutingTools` → `Finished`)。每一轮需要:消费当前流 → 收集事件 → 检测 `TurnComplete(ToolCalls)` → 执行工具 → 发射 `ToolExecutionCompleted` → 发起新流 → 继续 yield。不能用简单的 `stream!` 宏实现。 +> +> 建议 Phaes 3 再实现 `submit_stream_with_tools()`,Phase 2 只实现非流式的 `submit_with_tools()`。如果 Phase 2 需要可先返回 "not yet implemented" 错误。 + ```rust impl LlmCycle { pub async fn submit_stream_with_tools( @@ -830,36 +888,46 @@ Phase 4(Agent + Skill + 编排) - 创建 `src/tools/registry.rs` - 定义 `ToolInvocation` 结构体 + `ToolEntry` 元数据包装(tool + tags + category + stats)+ `ToolRegistry` - 实现核心方法:register / get / list / definitions / invoke / invoke_all / find_by_tag / find_by_category -- `invoke_all()` 使用 `futures::future::join_all` 并行执行互不依赖的工具 +- `invoke_all()` 使用 `futures::future::join_all` + `tokio::time::timeout` 并行执行互不依赖的工具(每工具独立超时) - `definitions()` 将 `HashMap` 中的工具转换为 `Vec` +- `ToolRegistry` 不支持运行时并发注册(setup 阶段一次性构建),如需热注册由调用方通过 `Arc>` 包装 - 编写 8+ 测试覆盖:注册冲突、空注册表查找、单次调用、批量并行调用、工具执行失败 - 运行 `cargo test` 验证 ### Step 6: LlmCycle 扩展(自动 Tool 循环) - 新增 `cycle_submit.rs` 子模块(或直接在 `cycle.rs` 中扩增,取决于代码量) -- 提取 `submit_request()` 内部方法(将 submit() 中的 request→response 逻辑独立) +- 提取 `submit_request()` 内部方法(将 submit() 中的 request→response 逻辑独立),同时重构 `submit_messages()` 以复用同一路径 - 实现 `submit_with_tools()` 方法: - - 循环:submit_request → 检查 finish_reason → 调用 registry.invoke_all → 回传结果 - - `max_turns` 控制,达到上限返回错误 - - 工具执行错误以文本回传(LLM 可恢复) -- 实现 `submit_stream_with_tools()` 方法: - - 组合流式事件流和自动 tool 循环 - - 在 TurnComplete(ToolCalls) 后发射 ToolExecutionCompleted -- 更新 `CycleConfig` 文档注释,新增 `tool_timeout_secs` 字段,默认值 60 -- 将 `CycleConfig::max_turns` 默认值由 `None` 改为 `Some(10)` -- 编写 3+ 集成测试:单轮 tool 调用、多轮 tool 调用、达到 max_turns 终止 + - 循环:submit_request → push Assistant 消息 → 检查 finish_reason → 调用 registry.invoke_all → push tool_results → 重复 + - 在 push tool_results **之前**先 push Assistant(tool_calls)消息(OpenAI API 要求) + - `max_tool_turns` 控制(独立于 `max_turns`),达到上限返回错误 + - 不可恢复的错误(NotFound、PermissionDenied、McpError)终止循环 + - 可恢复的错误(ExecutionFailed、InvalidArguments)以文本回传 LLM + - 每轮执行后触发 `maybe_compact()` 防止 token 膨胀 +- `submit_stream_with_tools()` 方法: + - Phase 2 标记为未实现(返回 `LlmError::Other("流式 tool 循环将在后续版本中支持")`) + - 实际实现推迟到 Phase 3(需要自定义 `ToolStream` 状态机) +- 更新 `CycleConfig`: + - 新增 `max_tool_turns: Option`,默认 `Some(10)`(不影响 `max_turns` 语义) + - 新增 `tool_timeout_secs: u64`,默认值 60 + - 新增 `max_tool_result_bytes: Option`,默认 `Some(65536)`(限制单次工具结果大小) +- 编写 3+ 集成测试:单轮 tool 调用、多轮 tool 调用、达到 max_tool_turns 终止 - 运行 `cargo test` 验证 ### Step 7: McpClient(MCP 协议客户端) - 创建 `src/tools/mcp.rs` - 实现 JSON-RPC 消息结构(Request / Response / Error / Notification) +- 定义 `ChildProcessState` 结构体,包含运行时字段:`child`/`stdin`/`pending: HashMap`/`next_id: u64` + - reader task 使用 `tokio::select!` 同时监听 stdout 和 cancellation token + - `call_tool()` 通过 Mutex 获取 stdin 写入权限,通过 id 匹配响应 + - 子进程意外退出时通知所有 pending 请求 - 实现 stdio transport: - - `connect()`:启动子进程,发送 initialize 请求 + - `connect()`:启动子进程,创建 ChildProcessState,发送 initialize 请求 - `list_tools()`:调用 tools/list,缓存结果 - `call_tool()`:调用 tools/call,解析响应 - - `close()`:发送 shutdown 请求,终止子进程 + - `close()`:发送 shutdown → 等待 5s 优雅退出 → 超时则 child.kill() - `StreamableHttp` transport 预留枚举变体,当前返回 "not implemented" 错误,不在 Phase 2 实现 - 实现 `into_tools()`:将 MCP 工具转换为 `Vec>` 适配器 - 设置 30 秒默认超时