10.3 合并(Coalescing)与人性化节奏

生成模型:Claude Opus 4.6 (anthropic/claude-opus-4-6) Token 消耗:输入 ~500,000 tokens,输出 ~40,000 tokens(本章合计)


块流解决了"把连续文本切成合适大小的块"的问题,但还有一个体验问题——如果 Agent 快速生成了多个短块,用户会收到一连串"叮叮叮"的消息通知。合并(Coalescing) 机制将短时间内产生的多个块合并为一条消息;人性化延迟(Human Delay) 则模拟人类的打字节奏,让 Agent 的回复不那么"机器感"。

10.3.1 连续块合并策略:空闲间隙、最小/最大字符数

块发射控制

subscribeEmbeddedPiSession 中,emitBlockChunk 是块发射的统一出口。它在发射前执行多重过滤:

// src/agents/pi-embedded-subscribe.ts(简化)
const emitBlockChunk = (text: string) => {
  if (state.suppressBlockChunks) return;     // 压制中(已有最终文本)
  
  // 1. 剥离 <think>/<final> 标签
  const chunk = stripBlockTags(text, state.blockState).trimEnd();
  if (!chunk) return;
  
  // 2. 去重:与上一次发射的相同 → 跳过
  if (chunk === state.lastBlockReplyText) return;
  
  // 3. 消息工具去重:已通过 messaging tool 发送 → 跳过
  const normalizedChunk = normalizeTextForComparison(chunk);
  if (isMessagingToolDuplicateNormalized(
    normalizedChunk, messagingToolSentTextsNormalized
  )) return;
  
  // 4. 助手文本去重:与最近的 assistantText 相同 → 跳过
  if (shouldSkipAssistantText(chunk)) return;
  
  // 5. 记录并发射
  state.lastBlockReplyText = chunk;
  assistantTexts.push(chunk);
  
  if (!params.onBlockReply) return;
  
  // 6. 解析回复指令(媒体 URL、音频标记、引用标签)
  const splitResult = replyDirectiveAccumulator.consume(chunk);
  if (!splitResult) return;
  
  void params.onBlockReply({
    text: splitResult.text,
    mediaUrls: splitResult.mediaUrls,
    audioAsVoice: splitResult.audioAsVoice,
    replyToId: splitResult.replyToId,
    replyToCurrent: splitResult.replyToCurrent,
  });
};

块边界模式

块的边界由 blockReplyBreak 参数控制:

模式
行为
适用场景

"text_end"

每个 text_end 事件触发块刷新

实时流(Telegram 草稿模式)

"message_end"

仅在 message_end 时刷新

完整消息(电子邮件、短信)

text_end 模式下,每次收到文本结束事件时都会强制刷新块分块器:

工具执行前的强制刷新

当工具开始执行时,需要先刷新已有的文本块——确保用户在看到工具操作之前,已经看到了 Agent 的所有文字解释:

消息边界重置

每个新的 assistant 消息开始时,所有块状态会被重置:

10.3.2 人性化延迟(Human Delay):natural / custom 模式

在 9.3 节中我们提到了 resolveHumanDelayConfig,它返回人性化延迟的配置。在消息发送层面,这个延迟会被应用到块回复的发射间隔上。

配置

工作原理

人性化延迟并不在 subscribeEmbeddedPiSession 内部实现——它在消息发送层(如 Telegram bot 适配器、Signal 适配器)中应用。当 onBlockReply 回调被触发时,适配器会:

  1. 发送"正在输入"(typing)状态指示

  2. 等待一个随机的延迟时间(minMs ~ maxMs

  3. 发送实际的消息内容

这种设计将流式传输的内部机制(subscribeEmbeddedPiSession)与外部表现(通道适配器)解耦——内部总是尽快处理,延迟由外层按需添加。

压缩期间的缓冲重置

当自动压缩触发时,所有已缓冲的块和状态需要被清除:

这确保压缩后的重试从完全干净的状态开始——用户不会看到压缩前的部分文本紧接着压缩后的新文本。


本节小结

  1. 块发射emitBlockChunk)在发射前执行五重过滤:压制检查、标签剥离、文本去重、消息工具去重、助手文本去重。

  2. 两种边界模式text_end / message_end)适应不同通道的需求——实时通道倾向频繁刷新,批量通道倾向完整消息。

  3. 工具执行前刷新确保用户在看到工具操作前已收到所有解释文本。

  4. 人性化延迟在消息发送层(通道适配器)实现,与流式传输内部解耦。

  5. 压缩重置确保自动压缩后,流式传输从干净状态重新开始。

Last updated