目录

  1. 核心查询引擎 (query.ts) 深度解析

  2. 工具编排系统 (toolOrchestration.ts) 深度解析

  3. 工具执行管道 (toolExecution.ts) 深度解析

  4. API 通信层 (claude.ts) 深度解析

  5. 权限系统 (permissions.ts) 深度解析

  6. 上下文压缩系统 (compact.ts) 深度解析

  7. 会话持久化 (sessionStorage.ts) 深度解析

  8. 应用状态管理 (AppStateStore.ts) 深度解析

  9. 关键时序图汇总

  10. 设计模式与架构亮点总结


1. 核心查询引擎 (query.ts) 深度解析

1.1 文件概览

属性

文件路径

src/query.ts

代码行数

~1730 行

核心导出

query() 异步生成器函数

职责

管理整个对话轮次的主循环

1.2 query() 函数签名

export async function* query(
  params: QueryParams,
): AsyncGenerator<
  StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage,
  Terminal  // 返回值类型
>

设计精妙之处:使用 AsyncGenerator 实现「边迭代边产出」,调用方(REPL)可以在工具执行、API 流式响应等各个阶段实时拿到更新并渲染 UI,而不必等整个轮次结束。

1.3 核心状态机 (State 类型)

type State = {
  messages: Message[]                    // 当前对话历史
  toolUseContext: ToolUseContext          // 工具执行上下文(可变)
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number    // max_output_tokens 重试计数
  hasAttemptedReactiveCompact: boolean   // 是否已尝试响应式压缩
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<...> | undefined
  stopHookActive: boolean | undefined
  turnCount: number                      // 当前轮次计数
  transition: Continue | undefined       // 上一轮的延续原因
}

1.4 主循环伪代码流程

function* queryLoop(params, consumedCommandUuids):
    1. 初始化 State(messages, toolUseContext 等)
    2. 创建 budgetTracker(TOKEN_BUDGET 特性门控)
    3. 构建 QueryConfig(一次性快照,避免每轮重复读取)
    ↓
    LOOP(无限循环,直到 return 或 throw):
    ├── 4. 启动预取:
    │   ├── startRelevantMemoryPrefetch()  — 记忆文件预取
    │   └── startSkillDiscoveryPrefetch()  — 技能发现预取
    │
    ├── 5. 上下文预处理(压缩三部曲):
    │   ├── [a] snipCompactIfNeeded()     — HISTORY_SNIP:裁剪旧消息
    │   ├── [b] microcompact()            — CACHED_MICROCOMPACT:缓存编辑压缩
    │   └── [c] applyCollapsesIfNeeded() — CONTEXT_COLLAPSE:上下文折叠
    │
    ├── 6. autoCompact()                  — 自动压缩(超 token 阈值时触发)
    │
    ├── 7. 构建 API 请求参数:
    │   ├── messagesForQuery = getMessagesAfterCompactBoundary(messages)
    │   ├── applyToolResultBudget()       — 限制单条消息的工具结果大小
    │   ├── fullSystemPrompt = appendSystemContext(systemPrompt, systemContext)
    │   └── 确定 currentModel(考虑 plan 模式 + 200k 阈值)
    │
    ├── 8. 阻塞限制检查:
    │   └── calculateTokenWarningState()  — 超阈值则拒绝,要求手动 /compact
    │
    ├── 9. 调用 API(callModel):
    │   └── for await (const message of deps.callModel(...)):
    │       ├── text_delta    → 累积文本,yield 给 UI
    │       ├── tool_use      → 收集 toolUseBlocks,加入 streamingToolExecutor
    │       ├── thinking      → 累积思考过程
    │       └── error         →  withholding(延迟到恢复路径确定后再 yield)
    │
    ├── 10. 工具执行阶段:
    │   └── yield* runTools(toolUseBlocks, assistantMessages, ...)
    │       ├── 并发安全工具(read-only)→ 并发执行
    │       └── 非并发安全工具(write)→ 串行执行
    │
    ├── 11. 后处理:
    │   ├── 执行 stop hooks
    │   ├── 检查 needsFollowUp(是否有待处理工具调用)
    │   └── 如果 needsFollowUp = false → break(轮次结束)
    │
    └── 12. yield tombstone 消息,清理,进入下一轮

1.5 错误恢复机制

query.ts 实现了多层错误恢复:

恢复机制

触发条件

行为

FallbackTriggeredError

当前模型不可用

切换到fallbackModel,重新发送整个请求

max_output_tokens

输出 token 超限

maxOutputTokensRecoveryCount < 3 时截断并重试

prompt_too_long

输入超长

触发 snipCompact / reactiveCompact 恢复路径

APIUserAbortError

用户取消(Ctrl+C)

清理并退出循环

Withheld Error

CONTEXT_COLLAPSE / REACTIVE_COMPACT 启用

延迟 yield 错误消息,等恢复路径完成

1.6 Task Budget 跟踪

// task_budget 跨压缩边界跟踪
let taskBudgetRemaining: number | undefined = undefined

// 每次压缩前捕获「压缩前的 context window」
// 压缩后,服务器只看到摘要,会少算 spend
// remaining 告诉服务器「被压缩掉的那部分 context 也占 budget」
if (params.taskBudget) {
  const preCompactContext = finalContextTokensFromLastResponse(messagesForQuery)
  taskBudgetRemaining = Math.max(
    0,
    (taskBudgetRemaining ?? params.taskBudget.total) - preCompactContext,
  )
}

1.7 设计亮点

  1. AsyncGenerator 双向流:API 流 → 工具执行 → UI 渲染,全部在一个生成器内完成,调用方通过 for await 驱动

  2. State 对象集中管理可变状态:每次迭代解构 statecontinue 站点统一写回 state = { ... },避免散落各处的赋值

  3. withheld 错误延迟机制:可恢复错误先不 yield,等恢复路径(collapse / reactive compact)完成再决定是 yield 错误还是 yield 恢复结果

  4. QueryConfig 一次性快照buildQueryConfig() 在循环外调用一次,避免每轮重复读取 GrowthBook / env / session,减少不必要的重新渲染


2. 工具编排系统 (toolOrchestration.ts) 深度解析

2.1 文件概览

属性

文件路径

src/services/tools/toolOrchestration.ts

代码行数

~190 行

核心导出

runTools() 异步生成器函数

职责

将工具调用分批,决定并发/串行执行策略

2.2 runTools() 函数签名

export async function* runTools(
  toolUseMessages: ToolUseBlock[],
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void>

2.3 并发安全分区算法 (partitionToolCalls)

这是整个工具系统的核心调度算法:

type Batch = {
  isConcurrencySafe: boolean    // true = 可并发(read-only)
  blocks: ToolUseBlock[]        // 本批的工具调用块
}

function partitionToolCalls(
  toolUseMessages: ToolUseBlock[],
  toolUseContext: ToolUseContext,
): Batch[] {
  return toolUseMessages.reduce((acc: Batch[], toolUse) => {
    const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
    const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
    const isConcurrencySafe = parsedInput?.success
      ? (() => {
          try {
            return tool!.isConcurrencySafe(parsedInput.data, toolUseContext)
          } catch {
            return false  // 保守:异常时假定不安全
          }
        })()
      : false

    const lastBatch = acc[acc.length - 1]

    if (isConcurrencySafe) {
      // 连续多个 read-only 工具合并到同一批
      if (lastBatch?.isConcurrencySafe) {
        lastBatch.blocks.push(toolUse)
      } else {
        acc.push({ isConcurrencySafe: true, blocks: [toolUse] })
      }
    } else {
      // 每个非并发安全工具独占一批(保证串行)
      acc.push({ isConcurrencySafe: false, blocks: [toolUse] })
    }
    return acc
  }, [])
}

分区规则总结

  • Read-only 工具(如 FileReadToolGlobToolGrepTool)→ isConcurrencySafe = true → 合并并发

  • Write 工具(如 FileEditToolFileWriteToolBashTool)→ isConcurrencySafe = false → 每个独占一批,串行执行

  • 并发上限:CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY(默认 10)

2.4 并发执行路径 (runToolsConcurrently)

async function* runToolsConcurrently(...) {
  // 使用 Promise.all 并发执行所有工具
  const results = await Promise.all(
    blocks.map(block =>
      runToolUse(block, assistantMessage, ...)
        .next()  // 取第一个 yield(最终结果)
    )
  )
  // 收集 contextModifier(用于跨工具上下文传递)
  for (const result of results) {
    if (result.value.contextModifier) {
      // 排队,等所有并发完成后统一应用
      queuedContextModifiers[block.id].push(modifyContext)
    }
  }
  // 统一应用所有 contextModifier
  for (const block of blocks) {
    for (const modifier of queuedContextModifiers[block.id]) {
      currentContext = modifier(currentContext)
    }
  }
}

为什么 contextModifier 要排队后统一应用?
并发执行时,工具 A 和工具 B 同时修改 context,如果各自直接修改,会产生竞态条件。排队后在所有并发完成后按固定顺序应用,保证确定性。

2.5 串行执行路径 (runToolsSerially)

async function* runToolsSerially(...) {
  for (const block of blocks) {
    // 逐个执行,每次 yield 中间状态
    for await (const update of runToolUse(block, ...)) {
      if (update.newContext) {
        currentContext = update.newContext  // 串行:立即应用上下文变更
      }
      yield update
    }
  }
}

2.6 设计亮点

  1. 分区算法保证正确性:read-only 并发,write 串行,无需分布式锁

  2. contextModifier 排队机制:并发工具的上下文修改延迟到所有工具完成后统一应用,避免竞态

  3. MessageUpdateLazy 惰性更新:工具执行产生的中间状态通过生成器流式 yield,UI 可以实时渲染进度


3. 工具执行管道 (toolExecution.ts) 深度解析

3.1 文件概览

属性

文件路径

src/services/tools/toolExecution.ts

代码行数

~1747 行

核心导出

runToolUse() 生成器函数

职责

单个工具的完整生命周期管理

3.2 runToolUse() 完整生命周期

runToolUse(toolUse, assistantMessage, canUseTool, toolUseContext)
│
├── 1. 查找工具
│   ├── findToolByName(tools, toolName)  → 在可用工具列表中查找
│   └── 未找到?→ 检查别名(aliases)→ 仍找不到 → yield 错误信息
│
├── 2. 检查取消
│   └── abortController.signal.aborted?→ yield CANCEL_MESSAGE,返回
│
├── 3. 权限检查 + 执行(streamedCheckPermissionsAndCallTool)
│   ├── 3a. pre-tool-use hooks(runPreToolUseHooks)
│   ├── 3b. canUseTool() → 权限决策
│   │   ├── allow → 继续执行
│   │   ├── deny  → yield 错误结果,触发 denial tracking
│   │   └── ask   → 暂停,等待用户批准(通过 Stream 桥接)
│   ├── 3c. 启动 OTel tracing span(isBetaTracingEnabled)
│   ├── 3d. 执行工具:tool.call(input, context, canUseTool, assistantMessage)
│   ├── 3e. post-tool-use hooks(runPostToolUseHooks)
│   └── 3f. 后处理 hooks(runPostToolUseFailureHooks / runPostToolUseSuccessHooks)
│
├── 4. 处理工具结果
│   ├── 成功:yield { message: UserMessage (tool_result), newContext }
│   └── 失败:yield { message: UserMessage (is_error: true), newContext }
│
└── 5. 返回 (done)

3.3 权限检查详细流程 (checkPermissionsAndCallTool 内部)

async function checkPermissionsAndCallTool(...) {
  // 1. 解析工具输入(Zod schema 校验)
  const parseResult = tool.inputSchema.safeParse(toolInput)
  if (!parseResult.success) {
    // 输入校验失败 → 返回 Zod 错误信息
    return yield { message: createUserMessage({ content: formatZodValidationError(...) }) }
  }

  // 2. 检查工具是否被禁用(isEnabled)
  if (!tool.isEnabled?.(toolInput, toolUseContext.options)) {
    return yield { message: createUserMessage({ content: 'Tool is disabled' }) }
  }

  // 3. 权限决策(核心)
  const permissionResult: PermissionResult = await canUseTool(
    tool.name,
    parsedInput.data,
    toolUseContext,
    assistantMessage,
  )

  // 4. 根据决策执行
  if (permissionResult.behavior === 'allow') {
    // 执行工具
    const toolResult = await tool.call(parsedInput.data, toolUseContext, canUseTool, assistantMessage)
    // 处理 toolResult(可能是 ToolResult 或 AsyncGenerator)
    ...
  } else if (permissionResult.behavior === 'deny') {
    // 记录 denial tracking(触发多次拒绝后的自动建议)
    recordDenial(tool.name, toolUseContext)
    // 执行 denial hooks
    await executePermissionDeniedHooks(...)
    // yield 拒绝消息
    yield { message: createUserMessage({ content: [{ type: 'tool_result', is_error: true, ... }] }) }
  } else {
    // 'ask' — 暂停,等待用户响应
    // 通过 Stream 桥接实现「暂停-恢复」
    const decision = await resolveHookPermissionDecision(...)
    // 递归调用自身处理用户响应
    return yield* checkPermissionsAndCallTool(...)
  }
}

3.4 MCP 工具特殊处理

function findMcpServerConnection(
  toolName: string,
  mcpClients: MCPServerConnection[],
): MCPServerConnection | undefined {
  if (!toolName.startsWith('mcp__')) return undefined
  const mcpInfo = mcpInfoFromString(toolName)
  // 归一化比较:工具名可能是归一化后的(下划线),client.name 可能是原始名(空格)
  return mcpClients.find(
    client => normalizeNameForMCP(client.name) === mcpInfo.serverName
  )
}

MCP 工具的 call() 实现在 MCPTool 类里,它会:

  1. 通过 MCPServerConnection 找到对应的 MCP 客户端

  2. 序列化工具参数为 JSON-RPC 请求

  3. 通过 StdioClientTransport / StreamableHTTPClientTransport 发送到 MCP 服务器

  4. 将 MCP 服务器的 JSON-RPC 响应反序列化为 ToolResult

3.5 工具执行结果处理 (processToolResultBlock)

function processToolResultBlock(
  toolResult: ToolResult,
  toolUseID: string,
  tool: Tool,
): UserMessage {
  // 1. 提取 data(工具返回的主要结果)
  // 2. 检查是否有 contextModifier(需要修改 ToolUseContext)
  // 3. 检查是否有附加消息(newMessages)
  // 4. 组装标准的 tool_result content block
  return createUserMessage({
    content: [{
      type: 'tool_result',
      tool_use_id: toolUseID,
      content: toolResult.data,  // 可能是 string | object | binary
      is_error: toolResult.isError ?? false,
    }],
    toolUseResult: toolResult.data,
    sourceToolAssistantUUID: assistantMessage.uuid,
  })
}

3.6 错误分类遥测 (classifyToolError)

function classifyToolError(error: unknown): string {
  // 1. TelemetrySafeError → 使用其 telemetryMessage(已脱敏)
  if (error instanceof TelemetrySafeError) {
    return error.telemetryMessage.slice(0, 200)
  }
  // 2. Node.js fs 错误 → 返回错误码(ENOENT, EACCES 等,安全且有用)
  const errnoCode = getErrnoCode(error)
  if (typeof errnoCode === 'string') return `Error:${errnoCode}`
  // 3. 已知错误类型(ShellError, ImageSizeError 等)→ 返回 name(minification 安全)
  if (error.name && error.name !== 'Error' && error.name.length > 3) {
    return error.name.slice(0, 60)
  }
  // 4. 兜底 → 'Error'(比 minified 后的 'nJT' 更有意义)
  return 'Error'
}

3.7 设计亮点

  1. Stream 实现「暂停-等待用户响应」:权限询问时,函数暂停在 await resolveHookPermissionDecision(),用户响应后通过 Stream.enqueue 恢复执行

  2. OTel tracing 集成:每个工具调用都有完整的 span 生命周期(startToolSpanendToolSpan),支持分布式追踪

  3. classifyToolError 遥测安全:minified 生产环境中 error.constructor.name 是乱码,这个函数提供了结构化的、安全的错误分类

  4. Hook 生命周期完整:pre-tool-use → 执行 → post-tool-use → success/failure hooks,完整覆盖


4. API 通信层 (claude.ts) 深度解析

4.1 文件概览

属性

文件路径

src/services/api/claude.ts

代码行数

~3420 行

核心导出

callModel() 函数(通过 deps.callModel 注入)

职责

封装 Anthropic API 调用,处理 beta headers、重试、流式解析

4.2 callModel() 调用链

deps.callModel(options)
│
├── callModel 内部调用 setupCallModel(deps)()
│
└── setupCallModel(deps)(options)
    │
    ├── 1. 构建请求参数(BetaMessageStreamParams)
    │   ├── messages: normalizeMessagesForAPI(messages)
    │   ├── system: fullSystemPrompt(可能包含缓存断点)
    │   ├── tools: toolToAPISchema(tools)(转换为 Anthropic API schema)
    │   ├── model: currentModel
    │   ├── thinking: thinkingConfig
    │   ├── betas: 合并所有 beta headers
    │   └── extra_body: getExtraBodyParams(betaHeaders)
    │
    ├── 2. 注入自定义 headers
    │   ├── anthropic-beta         → beta headers
    │   ├── X-API-Key             → API Key
    │   ├── Claude-Code-*          → 自定义业务 headers
    │   └──(client_request_id)    → 请求追踪 ID
    │
    ├── 3. 调用 Anthropic SDK
    │   └── anthropic.beta.messages.stream(params, options)
    │       └── 返回 AsyncStream<BetaRawMessageStreamEvent>
    │
    ├── 4. 流式事件处理(for await)
    │   ├── message_start      → 初始化 assistantMessage
    │   ├── content_block_start → 开始新的 content block(text / tool_use / thinking)
    │   ├── content_block_delta → 增量更新(text_delta / input_json_delta)
    │   ├── content_block_stop  → 结束当前 block
    │   ├── message_delta      → 更新 usage(input_tokens, output_tokens)
    │   └── message_stop      → 流结束
    │
    ├── 5. 错误处理 + 重试
    │   ├── withRetry() 包裹整个调用
    │   ├── FallbackTriggeredError → 切换模型重试
    │   ├── 529 (Overloaded)      → 指数退避重试
    │   └── 其他 APIError         → 有限次数重试
    │
    └── 6. 返回最终结果
        └── yield assistantMessage(完整组装好的 AssistantMessage)

4.3 Beta Headers 管理

claude.ts 中处理了大量的 beta 功能开关,通过 headers 传递给 API:

// 从 getMergedBetas() 收集所有需要的 beta headers
const betaHeaders = [
  // 1. 基础 betas(从工具、MCP 等收集)
  ...toolBetas,
  ...mcpBetas,
  // 2. 模型特定 betas
  ...getModelBetas(model),
  // 3. 功能特定 betas
  isFastModeEnabled && FAST_MODE_BETA_HEADER,
  isAssistantMode && AFK_MODE_BETA_HEADER,
  isContextManagementEnabled && CONTEXT_MANAGEMENT_BETA_HEADER,
  // ...
].filter(Boolean)

// 合并到 extra_body
const extraBody = getExtraBodyParams(betaHeaders)
extraBody.anthropic_beta = betaHeaders  // 传递给 API

4.4 Prompt Caching 实现

export function getPromptCachingEnabled(model: string): boolean {
  // 1. 全局禁用检查
  if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) return false
  // 2. 按模型禁用检查(Haiku / Sonnet / Opus 可分别禁用)
  if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU)) { ... }
  // 3. 检查模型是否支持 caching
  return modelSupportsCaching(model)
}

// 在系统提示中插入缓存断点:
// Anthropic API 使用 cache_control 字段标记缓存断点
// claude.ts 通过 splitSysPromptPrefix 在系统提示的合适位置插入断点
const { prefix, rest } = splitSysPromptPrefix(fullSystemPrompt)
// prefix 部分:cache_control = { type: 'ephemeral' }
// rest 部分:不缓存

4.5 withRetry() 重试机制

// services/api/withRetry.ts
export async function withRetry<T>(
  fn: () => Promise<T>,
  shouldRetry: (error: unknown, context: RetryContext) => boolean,
  maxRetries: number = 3,
): Promise<T> {
  let attempt = 0
  while (true) {
    try {
      return await fn()
    } catch (error) {
      attempt++
      if (attempt > maxRetries || !shouldRetry(error, { attempt, error })) {
        throw error
      }
      // 指数退避
      const delay = Math.min(1000 * Math.pow(2, attempt - 1), 30000)
      await sleep(delay)
    }
  }
}

可重试错误

  • 529 Overloaded:Anthropic 服务器过载

  • APIConnectionTimeoutError:网络超时

  • FallbackTriggeredError:模型不可用,切换后重试

4.6 dumpPromptsFetch — 提示词调试

// 创建 fetch wrapper 来捕获完整的 API 请求体(用于调试)
const dumpPromptsFetch = config.gates.isAnt
  ? createDumpPromptsFetch(toolUseContext.agentId ?? config.sessionId)
  : undefined

// createDumpPromptsFetch 返回一个自定义的 fetch 函数
// 它拦截所有发往 api.anthropic.com 的请求
// 将 request body(完整 prompt)写入调试文件
// 每个 query() 调用创建一个闭包,只捕获最新请求(避免内存泄漏)

4.7 设计亮点

  1. BetaMessageStreamParams 类型安全:完整映射 Anthropic Beta API 的所有参数,编译时检查

  2. withRetry 策略可插拔shouldRetry 函数作为参数传入,不同调用方可以定制重试策略

  3. Prompt Caching 透明化:上层代码无需关心缓存断点插入逻辑,getPromptCachingEnabled() + splitSysPromptPrefix() 自动处理

  4. dumpPromptsFetch 内存安全:每个 query() 调用创建一个新闭包,只保留最新请求体(~700KB),而不是所有历史(可能 500MB+)


5. 权限系统 (permissions.ts) 深度解析

5.1 文件概览

属性

文件路径

src/utils/permissions/permissions.ts

代码行数

~1487 行

核心类型

PermissionResult, PermissionRule, PermissionRuleSource

职责

实现三层权限决策:规则匹配 → Hook 检查 → 用户交互

5.2 权限决策流程

canUseTool(toolName, input, context, assistantMessage)
│
├── 1. 规则匹配(最快路径,无 I/O)
│   ├── 检查 alwaysDenyRules → 命中 → return { behavior: 'deny', reason: { type: 'rule', rule } }
│   ├── 检查 alwaysAllowRules → 命中 → return { behavior: 'allow', reason: { type: 'rule', rule } }
│   └── 未命中 → 继续
│
├── 2. Hook 检查(可异步,可阻断)
│   ├── runPreToolUseHooks(toolName, input)
│   │   └── 任何 hook 返回 deny → return { behavior: 'deny', reason: { type: 'hook', hookName } }
│   └── Hook 可修改工具输入(通过 contextModifier)
│
├── 3. 分类器检查(BASH_CLASSIFIER 特性)
│   ├── classifierDecisionModule?.classify(input) → 返回分类结果
│   ├── 高风险命令 → return { behavior: 'ask', reason: { type: 'classifier', ... } }
│   └── 低风险 → 继续
│
├── 4. 沙箱检查(Sandbox 模式)
│   ├── shouldUseSandbox() → true → 在沙箱中执行,无需用户批准
│   └── 沙箱外的操作 → 继续
│
├── 5. 模式检查(最后一道门)
│   ├── mode = 'default' → return { behavior: 'ask' }(询问用户)
│   ├── mode = 'acceptEdits' → allow 编辑类工具,ask 其他
│   ├── mode = 'bypass' → return { behavior: 'allow' }(全部允许)
│   └── mode = 'plan' → 只允许查看类工具
│
└── 6. 返回最终决策
    return { behavior: 'allow' | 'deny' | 'ask', reason?: PermissionDecisionReason }

5.3 权限规则系统

type PermissionRuleSource =
  | 'cliArg'          // 命令行参数(最高优先级)
  | 'session'         // 会话内临时规则
  | 'localSettings'   // .claude/settings.json(项目级)
  | 'userSettings'    // ~/.claude/settings.json(用户级)
  | 'policySettings'   // MDM / 策略配置
  | 'command'         // 通过 /allow 命令添加

type PermissionRule = {
  source: PermissionRuleSource
  ruleBehavior: 'allow' | 'deny' | 'ask'
  ruleValue: PermissionRuleValue  // 解析后的规则(工具名 + 可选的内容匹配器)
}

// 规则匹配示例:
// "Bash"               → 匹配整个 Bash 工具
// "Bash(prefix:git)"   → 匹配 Bash 工具中以 git 开头的命令
// "mcp__server1"       → 匹配来自 server1 的所有 MCP 工具
// "mcp__server1__*"    → 通配符,匹配 server1 的所有工具

5.4 toolMatchesRule() — 规则匹配算法

function toolMatchesRule(
  tool: Pick<Tool, 'name' | 'mcpInfo'>,
  rule: PermissionRule,
): boolean {
  // 1. 规则没有 content 匹配器 → 匹配整个工具
  if (rule.ruleValue.ruleContent === undefined) {
    // 直接比较工具名
    return rule.ruleValue.toolName === getToolNameForPermissionCheck(tool)
  }

  // 2. 规则有 content 匹配器 → 需要检查工具输入
  // 例如 "Bash(prefix:rm)" 只匹配 rm 命令,不匹配 ls
  if (tool.name === 'Bash') {
    const bashInput = toolInput as BashToolInput
    return matchesPrefix(bashInput.command, rule.ruleValue.ruleContent)
  }
  // ... 其他工具的 content 匹配逻辑
}

5.5 Denial Tracking — 智能建议系统

// utils/permissions/denialTracking.ts
const DENIAL_LIMITS = [1, 3, 5]  // 拒绝次数阈值

function recordDenial(toolName: string, context: ToolUseContext): void {
  const state = getDenialTrackingState(context)
  state.denials[toolName] = (state.denials[toolName] || 0) + 1

  // 达到阈值?→ 建议用户添加规则
  if (DENIAL_LIMITS.includes(state.denials[toolName])) {
    showSuggestion(`Allow ${toolName} for this session? [y/n]`)
  }
}

用户体验优化:连续拒绝同一工具 3 次后,Claude Code 会主动询问「是否要永久允许这个工具?」,减少重复确认。

5.6 OTel 遥测集成

// decisionReasonToOTelSource() — 将权限决策原因映射到 OTel 标准词汇
function decisionReasonToOTelSource(
  reason: PermissionDecisionReason | undefined,
  behavior: 'allow' | 'deny',
): string {
  switch (reason?.type) {
    case 'rule':
      return reason.rule.source === 'session'
        ? (behavior === 'allow' ? 'user_temporary' : 'user_reject')
        : (behavior === 'allow' ? 'user_permanent' : 'user_reject')
    case 'hook':
      return 'hook'
    case 'mode':
      return 'config'
    // ...
  }
}

5.7 设计亮点

  1. 三层决策(规则 → Hook → 模式):规则最快(纯内存查找),Hook 灵活(可调用外部脚本),模式兜底(用户体验保证)

  2. source 精细化控制:规则来源影响持久化策略(session = 临时,localSettings = 永久)

  3. Denial Tracking 主动建议:减少用户重复操作,提升体验

  4. OTel 标准遥测:权限决策全程可追踪,支持 user_temporary / user_permanent / user_reject / hook / config 等标准词汇


6. 上下文压缩系统 (compact.ts) 深度解析

6.1 文件概览

属性

文件路径

src/services/compact/compact.ts

代码行数

~1706 行

核心导出

autocompact() 函数

职责

当上下文接近 token 限制时,自动压缩历史消息

6.2 压缩触发条件

export function isAutoCompactEnabled(): boolean {
  // 1. 检查特性开关
  if (!feature('AUTO_COMPACT')) return false
  // 2. 检查用户设置
  const settings = getSettings_DEPRECATED()
  if (settings.disableAutoCompact) return false
  return true
}

function shouldCompact(
  messages: Message[],
  model: string,
  autoCompactTracking?: AutoCompactTrackingState,
): boolean {
  const tokenCount = tokenCountWithEstimation(messages)
  const threshold = getAutoCompactThreshold(model)  // 通常是模型限制的 80%
  return tokenCount > threshold
}

6.3 压缩流程

autocompact(messages, toolUseContext, options, querySource, tracking, snipTokensFreed)
│
├── 1. 检查是否应该压缩
│   └── shouldCompact() → false → return null(不压缩)
│
├── 2. 执行 pre-compact hooks
│   └── executePreCompactHooks()
│
├── 3. 构建压缩提示词
│   ├── 选取要压缩的消息(通常是最早的 N 条)
│   ├── 保留最近的消息(最近 10 条或更少)
│   └── 组装成压缩请求(发给 API,让模型生成摘要)
│
├── 4. 调用 API 生成压缩摘要
│   └── API 返回一个简洁的摘要(替代被压缩的消息)
│
├── 5. 构建压缩后的消息列表
│   ├── SystemCompactBoundaryMessage(标记压缩边界)
│   ├── 压缩摘要消息(AssistantMessage)
│   └── 未压缩的近期消息
│
├── 6. 执行 post-compact hooks
│   └── executePostCompactHooks(compactionResult)
│
├── 7. 返回压缩结果
│   └── return {
│       preCompactTokenCount,
│       postCompactTokenCount,
│       summaryMessages,
│       compactionUsage,  // API 调用的 token 使用
│     }
│
└── 8. 如果压缩失败(consecutiveFailures > 0)
    └── 触发熔断:暂时禁用自动压缩,避免无限重试

6.4 三种压缩策略对比

策略

文件

触发时机

原理

Snip Compact

snipCompact.ts

HISTORY_SNIP 特性,在 autoCompact 之前运行

直接删除旧消息(不调用 API),最快但不生成摘要

Micro Compact

microCompact.ts

CACHED_MICROCOMPACT 特性,在 autoCompact 之前运行

通过缓存编辑减少 context 大小,保留原文

Auto Compact

compact.ts

token 超阈值时触发

调用 API 生成历史摘要,质量最高但慢

执行顺序(在 query.ts 主循环中):

snipCompact → microCompact → contextCollapse → autoCompact

6.5 压缩边界消息

// 压缩后在消息列表中插入边界标记
function createCompactBoundaryMessage(
  preCount: number,
  postCount: number,
  toolUseSummary?: ToolUseSummaryMessage,
): SystemCompactBoundaryMessage {
  return {
    type: 'system',
    systemMessageType: 'compact_boundary',
    content: `Context compressed: ${preCount} → ${postCount} messages`,
    ...
  }
}

6.6 设计亮点

  1. 三层压缩策略:Snip(快)→ Micro(中)→ Auto(质量高),逐级尝试

  2. 压缩熔断机制consecutiveFailures 跟踪连续失败次数,超过阈值后暂时禁用,避免无限重试循环

  3. Hook 生命周期:pre-compact → 压缩 → post-compact,完整可扩展

  4. Task Budget 感知:压缩后正确更新 taskBudgetRemaining,避免预算计算错误


7. 会话持久化 (sessionStorage.ts) 深度解析

7.1 文件概览

属性

文件路径

src/utils/sessionStorage.ts

代码行数

~5106 行

核心导出

appendTranscript(), loadTranscript(), resumeSession()

职责

会话的磁盘持久化(JSONL 格式)

7.2 存储格式

每条消息写入 ~/.claude/projects/<cwd-hash>/sessions/<session-id>.jsonl每行一个 JSON 对象(JSONL 格式,便于追加和流式读取):

{"type":"user","uuid":"...","message":{"role":"user","content":"Hello"}}
{"type":"assistant","uuid":"...","message":{"role":"assistant","content":[{"type":"text","text":"Hi!"}]}}
{"type":"user","uuid":"...","message":{"role":"user","content":[{"type":"tool_result",...}]}}

7.3 appendTranscript() — 追加写入

export function appendTranscript(
  sessionId: SessionId,
  messages: TranscriptMessage[],  // 只追加 TranscriptMessage(不含 ProgressMessage)
  options?: LogOption,
): void {
  const filePath = getTranscriptPath(sessionId)
  // 使用 fs.createWriteStream 追加,而不是读入内存再重写
  const stream = fs.createWriteStream(filePath, { flags: 'a' })
  for (const msg of messages) {
    stream.write(JSON.stringify(msg) + '\n')
  }
  stream.end()
}

为什么用 JSONL 而不是 JSON 数组?
会话可能非常大(GB 级),JSON 数组需要全部读入内存才能解析。JSONL 可以流式读取,且追加时只需写文件末尾,不需要重写整个文件。

7.4 loadTranscript() — 流式加载

export async function* loadTranscript(
  sessionId: SessionId,
): AsyncGenerator<TranscriptMessage, void> {
  const filePath = getTranscriptPath(sessionId)
  // 使用 readline 逐行读取(内存友好)
  const fileStream = createReadStream(filePath)
  const rl = createInterface({ input: fileStream })
  
  for await (const line of rl) {
    const entry = JSON.parse(line)
    if (isTranscriptMessage(entry)) {
      yield entry  // 只 yield TranscriptMessage,跳过 ProgressMessage
    }
  }
}

7.5 resumeSession() — 会话恢复

resumeSession(sessionId)
│
├── 1. 加载完整会话文件
│   └── loadTranscript(sessionId) → Message[]
│
├── 2. 截断到最后一个 compact_boundary
│   └── 只保留最近一个压缩边界后的消息(避免 context 过大)
│
├── 3. 重建 ToolUseContext
│   ├── 恢复 messages
│   ├── 恢复 abortController(新的 AbortController)
│   └── 恢复其他上下文状态
│
└── 4. 返回恢复后的状态
    return { messages, toolUseContext, ... }

7.6 Tombstone 机制

// 当 API 调用中途失败(如 fallback 切换模型)时,
// 已产生的部分 AssistantMessage 属于「孤儿消息」,
// 不能参与后续 API 调用(tool_use_id 不匹配)
// 解决:写入 tombstone 标记,加载时跳过这些消息

function createTombstoneMessage(message: AssistantMessage): TombstoneMessage {
  return {
    type: 'tombstone',
    tombstoneOf: message.uuid,
    // 不包含实际内容,只标记「此消息已废弃」
  }
}

7.7 设计亮点

  1. JSONL 格式:追加友好、流式读取、内存占用低

  2. Tombstone 机制:处理「API 调用中途失败」的边缘情况,保证会话文件一致性

  3. isTranscriptMessage() 类型守卫:明确区分 TranscriptMessageProgressMessage,避免进度消息污染对话链

  4. 大文件处理readHeadAndTail() 只读取文件头和尾(用于预览),不需要加载整个文件


8. 应用状态管理 (AppStateStore.ts) 深度解析

8.1 文件概览

属性

文件路径

src/state/AppStateStore.ts

代码行数

~571 行(不含类型定义)

核心导出

AppStateStore

职责

集中管理所有可变应用状态,提供 React 组件的 state 来源

8.2 AppState 类型(精简版)

export type AppState = DeepImmutable<{
  // 基础设置
  settings: SettingsJson
  verbose: boolean
  mainLoopModel: ModelSetting
  statusLineText: string | undefined

  // 权限
  toolPermissionContext: ToolPermissionContext

  // MCP
  mcp: {
    clients: MCPServerConnection[]
    tools: Tool[]
    commands: Command[]
    resources: Record<string, ServerResource[]>
    pluginReconnectKey: number
  }

  // 任务(Agent / Teammate / Workflow)
  tasks: { [taskId: string]: TaskState }

  // Kairos(持久助手)
  kairosEnabled: boolean
  remoteSessionUrl: string | undefined
  remoteBackgroundTaskCount: number

  // Bridge(远程控制)
  replBridgeEnabled: boolean
  replBridgeConnected: boolean
  replBridgeSessionActive: boolean
  // ...

  // 同伴(BUDDY 宠物)
  companionReaction?: string
  companionPetAt?: number
}> & {
  // 非 DeepImmutable 部分(包含函数类型)
  plugins: { ... }
}

8.3 状态更新机制

class AppStateStore {
  private state: AppState
  private listeners: Set<() => void> = new Set()

  // React 组件通过此函数订阅状态更新
  subscribe(listener: () => void): () => void {
    this.listeners.add(listener)
    return () => this.listeners.delete(listener)  // 返回取消订阅函数
  }

  // 更新状态(不变性保证)
  setState(update: (state: AppState) => AppState): void {
    const oldState = this.state
    const newState = update(oldState)
    this.state = newState
    // 通知所有监听器(触发 React 重新渲染)
    for (const listener of this.listeners) {
      listener()
    }
  }

  // 在 ToolUseContext 中使用(工具执行时读取/修改状态)
  getAppState(): AppState {
    return this.state
  }

  setAppState(update: (state: AppState) => AppState): void {
    this.setState(update)
  }
}

8.4 ToolUseContext 中的状态传递

type ToolUseContext = {
  // ...
  getAppState: () => AppState       // 读取当前状态
  setAppState: (update: (state: AppState) => AppState) => void  // 修改状态
  // ...
}

设计意图:工具执行时可能需要修改应用状态(例如 TodoWriteTool 更新 todo 列表,AgentTool 更新 tasks)。通过 ToolUseContext 传递 getAppState / setAppState,工具可以安全地修改状态,而不需要直接依赖 AppStateStore 单例。

8.5 React 集成(推测,基于 Ink 模式)

// REPL.tsx(主循环组件)
function REPL() {
  const [appState, setAppState] = useState(AppStateStore.getAppState())
  
  useEffect(() => {
    // 订阅状态更新
    return AppStateStore.subscribe(() => {
      setAppState(AppStateStore.getAppState())
    })
  }, [])

  return (
    <AppStateContext.Provider value={appState}>
      {/* 渲染 UI */}
    </AppStateContext.Provider>
  )
}

8.6 设计亮点

  1. DeepImmutable<AppState>:编译时强制不可变性,防止意外修改

  2. 爱挑剔更新模式setState(update) 接收函数而非值,避免竞态条件

  3. ToolUseContext 解耦:工具不需要知道 AppStateStore 的存在,通过 context 传递状态操作函数

  4. MCP 状态集中管理state.mcp 统一维护所有 MCP 连接、工具、资源,避免散落各处


9. 关键时序图汇总

9.1 完整工具执行时序图

sequenceDiagram participant REPL as REPL.tsx participant QE as query.ts<br/>(queryLoop) participant TO as toolOrchestration.ts<br/>(runTools) participant TE as toolExecution.ts<br/>(runToolUse) participant Tool as 具体 Tool 实现 participant Hook as Hooks 系统 participant Perm as 权限系统 REPL->>QE: yield* query(params) QE->>QE: 预处理(压缩、构建请求) QE->>QE: deps.callModel() → 流式 API 调用 QE->>QE: 解析 API 响应,收集 tool_use blocks QE->>TO: yield* runTools(toolUseBlocks, ...) loop 每个 Batch TO->>TO: partitionToolCalls() 决定并发/串行 alt 并发安全 Batch par 并发执行 TO->>TE: runToolUse(toolUse, ...) end else 非并发安全 Batch loop 串行执行 TO->>TE: runToolUse(toolUse, ...) end end TE->>Perm: canUseTool(toolName, input, ...) Perm->>Perm: 规则匹配(allow/deny/ask) alt 需要询问用户 TE->>REPL: yield ProgressMessage(显示权限对话框) REPL->>REPL: 用户点击「允许」或「拒绝」 REPL->>TE: Stream.enqueue(decision) end TE->>Hook: runPreToolUseHooks() TE->>Tool: tool.call(input, context, ...) Tool-->>TE: ToolResult TE->>Hook: runPostToolUseHooks() TE->>TO: yield MessageUpdate end TO->>QE: yield MessageUpdate(所有工具完成) QE->>QE: 组装 tool_result 消息 QE->>QE: 继续 API 调用(发送 tool_result) QE-->>REPL: yield AssistantMessage(最终响应)

9.2 权限检查详细时序图

sequenceDiagram participant Caller as 调用方<br/>(canUseTool) participant Rule as 规则匹配 participant Hook as Hook 系统 participant Classifier as Bash 分类器 participant Sandbox as 沙箱检查 participant Mode as 模式检查 Caller->>Rule: 1. 检查 alwaysDenyRules alt 命中拒绝规则 Rule-->>Caller: { behavior: 'deny', reason: { type: 'rule', rule } } else 未命中 Caller->>Rule: 2. 检查 alwaysAllowRules alt 命中允许规则 Rule-->>Caller: { behavior: 'allow', reason: { type: 'rule', rule } } else 未命中 Caller->>Hook: 3. runPreToolUseHooks() alt Hook 返回 deny Hook-->>Caller: { behavior: 'deny', reason: { type: 'hook', ... } } else Hook 通过 Caller->>Classifier: 4. classify(input)(如果启用) alt 高风险命令 Classifier-->>Caller: { behavior: 'ask', reason: { type: 'classifier', ... } } else 安全 Caller->>Sandbox: 5. shouldUseSandbox() alt 在沙箱中 Sandbox-->>Caller: { behavior: 'allow', reason: { type: 'sandboxOverride' } } else 不在沙箱 Caller->>Mode: 6. 检查当前模式 alt mode = 'bypass' Mode-->>Caller: { behavior: 'allow' } else mode = 'default' Mode-->>Caller: { behavior: 'ask' } else mode = 'acceptEdits' Mode-->>Caller: 编辑类 → 'allow',其他 → 'ask' end end end end end end

9.3 会话恢复时序图

sequenceDiagram participant User as 用户 participant REPL as REPL.tsx participant SS as sessionStorage.ts participant Query as query.ts User->>/resume: 恢复会话命令 REPL->>SS: resumeSession(sessionId) SS->>SS: loadTranscript(sessionId) SS->>SS: 读取 JSONL 文件(流式) SS->>SS: 截断到最后一个 compact_boundary SS->>SS: 重建 ToolUseContext SS-->>REPL: 返回 { messages, toolUseContext } REPL->>Query: yield* query(restoredParams) Query->>Query: 从恢复的消息列表继续主循环 Query-->>REPL: 继续对话 REPL-->>User: 显示恢复的会话


10. 设计模式与架构亮点总结

10.1 异步生成器模式(AsyncGenerator

使用场景query()runTools()runToolUse()优势

  • 流式产出中间结果,UI 可以实时渲染

  • 调用方通过 for await 驱动,控制节奏

  • 支持「暂停-恢复」(通过 Stream 桥接)

10.2 依赖注入模式(deps 参数)

// query.ts
type QueryDeps = {
  callModel: (options) => AsyncGenerator<...>
  microcompact: (messages, ...) => Promise<...>
  autocompact: (messages, ...) => Promise<...>
  uuid: () => string
  // ...
}

function query(params: QueryParams & { deps?: QueryDeps }) {
  const deps = params.deps ?? productionDeps()  // 生产环境使用真实实现
  // 测试环境可以传入 mock deps
}

优势:可测试性、可替换性(如 VCR 录制/回放)

10.3 分区并发模式(partitionToolCalls

问题:如何安全地并发执行工具调用?
解决:将工具调用分成「可并发批」和「必须串行批」,read-only 工具并发,write 工具串行。

10.4 规则- Hook-模式三层决策

问题:如何灵活且高效地做权限决策?解决

  1. 规则匹配(快,无 I/O)

  2. Hook 检查(灵活,可调用外部脚本)

  3. 模式检查(兜底,保证用户体验)

10.5 JSONL 持久化模式

问题:会话可能增长到 GB 级,如何高效持久化?解决

  • JSONL 格式:追加友好、流式读取

  • Tombstone 机制:处理中途失败的边缘情况

  • isTranscriptMessage() 类型守卫:明确区分对话消息和进度消息

10.6 上下文压缩三级策略

问题:对话历史太长,超过 API 限制怎么办?解决

  1. Snip Compact(快,直接删除)

  2. Micro Compact(中,缓存编辑)

  3. Auto Compact(慢,调用 API 生成摘要)


附录:文件索引

核心文件

行数

核心职责

src/query.ts

~1730

主查询循环

src/services/tools/toolOrchestration.ts

~190

工具编排(并发/串行)

src/services/tools/toolExecution.ts

~1747

单个工具执行生命周期

src/services/api/claude.ts

~3420

Anthropic API 通信

src/utils/permissions/permissions.ts

~1487

权限决策系统

src/services/compact/compact.ts

~1706

上下文自动压缩

src/utils/sessionStorage.ts

~5106

会话持久化

src/state/AppStateStore.ts

~571

应用状态管理