Files
agcore/src/llm/stream.rs
T
徐涛 32f3edaf19 feat(llm): 实现 Phase 0 剩余四个模块
实现 ProviderRegistry、HookExecutor、StreamEvents 和 Auto-compaction 模块,并集成到 LlmCycle 中
2026-06-02 08:51:42 +08:00

109 lines
3.6 KiB
Rust

//! 流式事件系统 —— 将 LLM 流式响应解析为语义化事件。
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::stream::Stream;
use futures_util::future::poll_fn;
use futures_util::FutureExt;
use serde_json::Value;

use crate::llm::error::LlmError;
use crate::llm::types::{FinishReason, OpenaiChatChunk, OpenaiToolCall, Usage};
/// Stream event — a semantic event emitted over the full lifecycle of an LLM call.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    /// Incremental piece of assistant reply text.
    AssistantTextDelta { text: String },
    /// A tool call has started.
    ToolExecutionStarted {
        /// Name of the tool being called.
        tool_name: String,
        /// Tool arguments as JSON. NOTE(review): `parse_chunk_stream` in this file
        /// substitutes `Value::Null` when the streamed arguments are not valid JSON.
        input: Value,
        /// Identifier of this tool call.
        tool_call_id: String,
    },
    /// A tool call has completed.
    ///
    /// Not emitted by `parse_chunk_stream` in this file; presumably produced by the
    /// tool-execution layer — TODO confirm.
    ToolExecutionCompleted {
        tool_name: String,
        output: Value,
        /// Presumably `true` when the tool reported a failure — TODO confirm with the producer.
        is_error: bool,
    },
    /// Token usage update.
    CostUpdate { usage: Usage },
    /// A conversation turn has completed.
    TurnComplete { reason: FinishReason },
    /// Error event; in this file the message is built from the upstream `LlmError`'s
    /// `Display` output.
    Error { message: String },
}
impl StreamEvent {
fn error(message: impl Into<String>) -> Self {
Self::Error {
message: message.into(),
}
}
}
/// 将原始 OpenaiChatChunk 流解析为 StreamEvent 流。
pub fn parse_chunk_stream(
chunks: Pin<Box<dyn futures_core::Stream<Item = Result<OpenaiChatChunk, LlmError>> + Send>>,
) -> Pin<Box<dyn futures_core::Stream<Item = StreamEvent> + Send>> {
Box::pin(ChunkToEventStream { chunks })
}
struct ChunkToEventStream {
chunks: Pin<Box<dyn futures_core::Stream<Item = Result<OpenaiChatChunk, LlmError>> + Send>>,
}
impl Stream for ChunkToEventStream {
type Item = StreamEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
poll_fn(|cx| match Pin::new(&mut this.chunks).poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
for choice in &chunk.choices {
let delta = &choice.delta;
if let Some(content) = &delta.content {
return Poll::Ready(Some(StreamEvent::AssistantTextDelta {
text: content.clone(),
}));
}
if let Some(tool_calls) = &delta.tool_calls
&& let Some(tc) = tool_calls.first()
{
let OpenaiToolCall::Function { id, function } = tc;
let args: Value =
serde_json::from_str(&function.arguments).unwrap_or(Value::Null);
return Poll::Ready(Some(StreamEvent::ToolExecutionStarted {
tool_name: function.name.clone(),
input: args,
tool_call_id: id.clone(),
}));
}
if let Some(finish_reason) = &choice.finish_reason {
return Poll::Ready(Some(StreamEvent::TurnComplete {
reason: *finish_reason,
}));
}
}
if let Some(usage) = &chunk.usage {
return Poll::Ready(Some(StreamEvent::CostUpdate {
usage: *usage,
}));
}
Poll::Ready(None)
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(StreamEvent::error(e.to_string()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
})
.poll_unpin(cx)
}
}