10.1 流式传输架构
10.1.1 Pi Agent Core 事件流 → OpenClaw 事件桥接
// src/agents/pi-embedded-subscribe.ts(简化)
export function subscribeEmbeddedPiSession(
params: SubscribeEmbeddedPiSessionParams
) {
// 1. 初始化状态
const state: EmbeddedPiSubscribeState = {
assistantTexts: [], // 最终收集的助手文本片段
toolMetas: [], // 工具调用元数据
deltaBuffer: "", // delta 累积缓冲
blockBuffer: "", // 块流缓冲
blockState: { thinking: false, final: false, ... },
compactionInFlight: false,
messagingToolSentTexts: [],
// ...更多状态
};
// 2. 创建块分块器(可选)
const blockChunker = params.blockReplyChunking
? new EmbeddedBlockChunker(params.blockReplyChunking)
: null;
// 3. 构建上下文对象
const ctx: EmbeddedPiSubscribeContext = {
params, state, blockChunker,
emitBlockChunk, flushBlockReplyBuffer,
stripBlockTags, emitReasoningStream,
// ...更多方法
};
// 4. 注册事件处理器
const unsubscribe = params.session.subscribe(
createEmbeddedPiSessionEventHandler(ctx)
);
// 5. 返回控制句柄
return {
assistantTexts, // 可读取的文本片段数组
toolMetas, // 工具元数据
unsubscribe, // 取消订阅
isCompacting: () => state.compactionInFlight,
waitForCompactionRetry: () => { ... },
didSendViaMessagingTool: () => messagingToolSentTexts.length > 0,
};
}订阅参数
10.1.2 流事件类型:lifecycle / assistant / tool
lifecycle / assistant / toollifecycle 流:生命周期事件
事件
含义
触发时机
assistant 流:助手文本事件
tool 流:工具执行事件
compaction 流:压缩事件
10.1.3 原始流处理(src/agents/pi-embedded-subscribe.raw-stream.ts)
src/agents/pi-embedded-subscribe.raw-stream.ts)事件处理器路由
本节小结
Last updated