32f3edaf19
实现 ProviderRegistry、HookExecutor、StreamEvents 和 Auto-compaction 模块,并集成到 LlmCycle 中
11 KiB
11 KiB
Phase 0 剩余模块 — 实施方案
定稿日期:2026-06-02
背景与目标
AG Core Phase 0(Foundation)已完成核心数据类型、错误体系、Provider 接口、OpenAI 实现、生命周期引擎等基础设施。剩余 4 个子项尚未实现:ProviderRegistry、HookExecutor、StreamEvents、Auto-compaction。它们均被 Roadmap 标记为 P0(Must Have),是本阶段不可或缺的底层能力。
目标:完成这 4 个模块的设计与实现,使 Phase 0 全面交付。
需求分析
功能需求
| 模块 | 需求 | 验收条件 |
|---|---|---|
| ProviderRegistry | 支持注册命名 Provider、按名称查找、设置默认 | 3 个公开方法 + 工厂辅助 |
| HookExecutor | 4 个事件点:PreRequest / PostRequest / OnRetry / OnError | Hook trait + HookExecutor 触发 |
| StreamEvents | 流式事件枚举 + Provider 流式方法 + Cycle 流式入口 | 6 种事件类型 + SSE 解析 |
| Auto-compaction | Token 估算 + 微压缩 + 断路器 | 触发后释放 token 且不改变语义 |
非功能需求
- 所有公开 API 必须带
///文档注释 - 无新增
unwrap()调用 - 与现有
LlmCycle集成时保持向后兼容(全为可选/增量添加) - 错误统一使用
LlmError枚举
方案设计
1. ProviderRegistry (src/llm/provider/registry.rs)
职责:管理多个 LLM Provider 实例,支持按名称注册、发现、切换。
// src/llm/provider/registry.rs
use std::collections::HashMap;
use crate::llm::error::LlmError;
use crate::llm::provider::{LlmProvider, ProviderConfig, ProviderType, create_provider};
/// Provider 注册表——管理多个 LLM Provider 实例。
pub struct ProviderRegistry {
providers: HashMap<String, Box<dyn LlmProvider>>,
default_name: Option<String>,
}
impl ProviderRegistry {
pub fn new() -> Self;
/// 注册一个已初始化的 Provider 实例。
pub fn register(&mut self, name: impl Into<String>, provider: Box<dyn LlmProvider>);
/// 通过 ProviderType + ProviderConfig 创建并注册。
pub fn register_with_config(
&mut self,
name: impl Into<String>,
provider_type: ProviderType,
config: ProviderConfig,
) -> Result<(), LlmError>;
/// 设置默认 Provider。
pub fn set_default(&mut self, name: &str) -> Result<(), LlmError>;
/// 按名称查找 Provider。
pub fn get(&self, name: &str) -> Option<&dyn LlmProvider>;
/// 获取默认 Provider。
pub fn get_default(&self) -> Option<&dyn LlmProvider>;
}
无新增依赖。
2. HookExecutor (src/llm/hooks.rs)
职责:在 LLM 调用生命周期的关键节点插入自定义逻辑。
// src/llm/hooks.rs
use async_trait::async_trait;
use crate::llm::error::LlmError;
use crate::llm::types::ChatRequest;
/// 生命周期钩子事件点。
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HookEvent {
/// LLM 请求发起之前(可阻断)。
PreRequest,
/// 成功响应之后。
PostRequest,
/// 重试之前(仅可重试错误时触发)。
OnRetry,
/// 不可恢复错误返回之前。
OnError,
}
/// 此次钩子调用的上下文。
#[derive(Debug, Clone)]
pub struct HookContext<'a> {
pub event: HookEvent,
pub request: Option<&'a ChatRequest>,
pub error: Option<&'a LlmError>,
pub attempt: u32,
}
/// 钩子执行结果。
#[derive(Debug, Clone)]
pub struct HookResult {
/// 是否阻断后续操作(仅 PreRequest 有效)。
pub should_block: bool,
/// 阻断/备注原因。
pub reason: Option<String>,
}
/// 生命周期钩子 trait。
#[async_trait]
pub trait Hook: Send + Sync {
async fn execute(&self, ctx: &HookContext<'_>) -> HookResult;
}
/// 钩子执行器——管理注册与触发。
pub struct HookExecutor {
hooks: Vec<(HookEvent, Box<dyn Hook>)>,
}
impl HookExecutor {
pub fn new() -> Self;
pub fn register(&mut self, event: HookEvent, hook: Box<dyn Hook>);
pub async fn execute(&self, event: HookEvent, ctx: &HookContext<'_>) -> Vec<HookResult>;
}
与 LlmCycle 集成:
LlmCycle新增字段hook_executor: Option<HookExecutor>- 新增 builder 方法
with_hook_executor() submit()中在 4 个点触发(PreRequest 若阻断则提前返回)
无新增依赖(async-trait 已存在)。
3. StreamEvents (src/llm/stream.rs)
职责:提供流式 LLM 调用的事件抽象,将原始 SSE chunk 解析为语义化事件。
// src/llm/stream.rs
use std::pin::Pin;
use tokio_stream::Stream;
use crate::llm::error::LlmError;
use crate::llm::types::{FinishReason, Usage, OpenaiChatChunk, ToolDefinition};
use serde_json::Value;
/// 流式事件——LLM 调用全生命周期的语义化事件。
#[derive(Debug, Clone)]
pub enum StreamEvent {
/// 助手回复文本增量。
AssistantTextDelta { text: String },
/// 工具调用开始。
ToolExecutionStarted { tool_name: String, input: Value },
/// 工具调用完成。
ToolExecutionCompleted { tool_name: String, output: Value, is_error: bool },
/// Token 用量更新。
CostUpdate { usage: Usage },
/// 一轮会话完成。
TurnComplete { reason: FinishReason },
/// 可恢复的错误事件。
Error { message: String },
}
/// 将原始 OpenaiChatChunk 流解析为 StreamEvent 流。
pub fn parse_chunk_stream(
chunks: Pin<Box<dyn Stream<Item = Result<OpenaiChatChunk, LlmError>> + Send>>,
) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;
Provider 层扩展(src/llm/provider.rs):
#[async_trait]
pub trait LlmProvider: Send + Sync {
async fn chat(&self, request: ChatRequest) -> Result<ChatResponse, LlmError>;
/// 流式聊天请求——返回原始 SSE chunk 流。
/// 默认实现回退到非流式调用。
async fn chat_stream(
&self,
request: ChatRequest,
) -> Result<
Pin<Box<dyn Stream<Item = Result<OpenaiChatChunk, LlmError>> + Send>>,
LlmError,
> {
let response = self.chat(request).await?;
let chunk = OpenaiChatChunk::from(response);
Ok(Box::pin(tokio_stream::once(Ok(chunk))))
}
}
LlmCycle 扩展:
impl LlmCycle {
/// 提交请求并返回语义事件流。
pub async fn submit_stream(
&mut self,
prompt: String,
tools: Vec<ToolDefinition>,
) -> Result<
Pin<Box<dyn Stream<Item = StreamEvent> + Send>>,
LlmError,
>;
}
新增依赖:
tokio-stream = "0.1"
4. Auto-compaction (src/llm/compact.rs)
职责:在上下文过长时自动压缩历史消息,避免 ContextLength 错误。
// src/llm/compact.rs
use crate::llm::types::{ContentField, Message, OpenaiChatMessage, OpenaiContentPart};
// === 常量 ===
const AUTOCOMPACT_BUFFER_TOKENS: u32 = 13_000;
const RESERVED_OUTPUT_TOKENS: u32 = 20_000;
const MAX_CONSECUTIVE_FAILURES: u32 = 3;
const KEEP_RECENT: usize = 6;
/// 上下文压缩配置。
#[derive(Debug, Clone)]
pub struct CompactConfig {
/// 模型上下文窗口大小(token 数)。
pub context_window: u32,
/// 为输出预留的 token 数。
pub reserved_tokens: u32,
/// 微压缩保留的最近消息数。
pub keep_recent: usize,
}
impl Default for CompactConfig {
fn default() -> Self {
Self {
context_window: 128_000,
reserved_tokens: RESERVED_OUTPUT_TOKENS,
keep_recent: KEEP_RECENT,
}
}
}
impl CompactConfig {
pub fn threshold(&self) -> u32 {
self.context_window
.saturating_sub(self.reserved_tokens)
.saturating_sub(AUTOCOMPACT_BUFFER_TOKENS)
}
}
/// 压缩状态——跟踪连续失败次数(断路器模式)。
#[derive(Debug, Clone)]
pub struct CompactState {
consecutive_failures: u32,
}
impl CompactState {
pub fn new() -> Self;
pub fn record_success(&mut self);
/// 记录失败,返回 true 表示已达断路器上限。
pub fn record_failure(&mut self) -> bool;
}
/// 粗略估计消息列表的 token 数(基于字符数,4 字符 ≈ 1 token)。
pub fn estimate_message_tokens(messages: &[Message]) -> u32;
/// 判断是否需要触发自动压缩。
pub fn should_compact(messages: &[Message], config: &CompactConfig, state: &CompactState) -> bool;
/// 执行微压缩——用占位符替换旧的 tool result 内容。
/// 返回释放的 token 数。
pub fn microcompact(messages: &mut Vec<Message>, keep_recent: usize) -> u32;
与 LlmCycle 集成:
LlmCycle新增字段compact_config: Option<CompactConfig>,compact_state: CompactState- 新增 builder 方法
with_compact_config() submit()开始时调用should_compact()→microcompact()
完整 LLM 摘要压缩留占位,Phase 2 实现(需要循环内调用 LLM 的能力)。
无新增依赖。
实现计划
Step 1: 先写方案文档
创建 docs/3-phase0-remaining.md(即本文档)。
Step 2: ProviderRegistry
- 创建
src/llm/provider/registry.rs provider.rs添加pub mod registry;cargo check验证
Step 3: HookExecutor
- 创建
src/llm/hooks.rs llm.rs添加pub mod hooks;LlmCycle新增字段和方法submit()中插入钩子触发点cargo check验证
Step 4: StreamEvents
Cargo.toml添加tokio-stream- 创建
src/llm/stream.rs llm.rs添加pub mod stream;LlmProvider添加chat_stream()(默认回退)OpenaiProvider实现 SSE 解析LlmCycle添加submit_stream()cargo check验证
Step 5: Auto-compaction
- 创建
src/llm/compact.rs llm.rs添加pub mod compact;LlmCycle新增字段和方法submit()开头插入压缩检查cargo check验证
Step 6: 收尾
cargo clippy— 无警告cargo build --release— 完整构建- 检查所有新公开 API 有
///注释
风险评估
| 风险 | 概率 | 影响 | 缓解措施 |
|---|---|---|---|
| SSE 解析边界情况 | 中 | 中 | 参考 reqwest 的 chunked 响应处理;先用简单的 lines() 方式逐行读取 |
| token 估算不准 | 中 | 低 | 仅用于触发微压缩的阈值判断,保守估算即可;后续可接入 tiktoken-rs |
| 钩子阻断语义复杂 | 低 | 中 | PreRequest 阻断后返回明确错误消息;其他事件点只读不阻断 |
| 与后续 Phase 冲突 | 低 | 高 | 保持接口向后兼容,全用可选集成(Option) |
验收标准
cargo check编译通过cargo clippy无警告- 4 个模块文件存在且路径正确
ProviderRegistry支持注册/查找/默认 ProviderHookExecutor支持 4 个事件点注册与触发StreamEvents支持从 SSE chunk 解析为语义事件Auto-compaction支持 token 估算与微压缩LlmCycle向后兼容(新增字段全为 Option,不影响现有代码)