Learn Claude Code
🔗 原文链接 Learn Claude Code
从 0 到 1 构建 nano Claude Code-like agent,每次只加一个机制
Agent 的核心机制:所有 AI 编程 Agent 共享同一个循环:调用模型、执行工具、回传结果。生产级系统会在其上叠加策略、权限和生命周期层。
接下来,渐进式的完成 Agent 设计。
工具与执行 Tools & Execution
s01 Agent 循环
Bash is All You Need
The minimal agent kernel is a while loop + one tool. 最小的 Agent 核心是一个循环加上一个工具
每个 Agent 都是一个循环,它会持续调用模型,直到模型发出 “停止” 指令。
"One loop & Bash is all you need" -- 一个工具 + 一个循环 = 一个智能体。
- 问题
语言模型能推理代码, 但碰不到真实世界 -- 不能读文件、跑测试、看报错。没有循环, 每次工具调用你都得手动把结果粘回去。你自己就是那个循环。
- 解决方案
+--------+ +-------+ +---------+
| User | ---> | LLM | ---> | Tool |
| prompt | | | | execute |
+--------+ +---+---+ +----+----+
^ |
| tool_result |
+----------------+
(loop until stop_reason != "tool_use")
def agent_loop(query):
messages = [{"role": "user", "content": query}]
while True:
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
output = run_bash(block.input["command"])
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})
整个 AI Agent 的灵魂压缩到几十行代码:所有复杂的 AI Agent,底层都是这个循环,只是套了更多策略、日志、重试、安全限制。
- 循环
- 工具调用
- 结果回传,把执行工具的结果作为输出给到 LLM
- 自动迭代解决问题
- 变更内容
| 组件 | 之前 | 之后 |
|---|---|---|
| Agent loop | (无) | while True + stop_reason |
| Tools | (无) | bash (单一工具) |
| Messages | (无) | 累积式消息列表 |
| Control flow | (无) | stop_reason != "tool_use" |
s02 工具
One Handler Per Tool
The loop stays the same; new tools register into the dispatch map. 循环保持不变;新工具注册到调度映射中
字典将工具名称映射到处理函数。循环代码从未改变。
"加一个工具, 只加一个 handler" -- 循环不用动, 新工具注册进 dispatch map 就行。
- 问题
只有 bash 时, 所有操作都走 shell。cat 截断不可预测, sed 遇到特殊字符就崩, 每次 bash 调用都是不受约束的安全面。专用工具 (read_file, write_file) 可以在工具层面做路径沙箱。
关键洞察: 加工具不需要改循环。
- 解决方案
+--------+ +-------+ +------------------+
| User | ---> | LLM | ---> | Tool Dispatch |
| prompt | | | | { |
+--------+ +---+---+ | bash: run_bash |
^ | read: run_read |
| | write: run_wr |
+-----------+ edit: run_edit |
tool_result | } |
+------------------+
The dispatch map is a dict: {tool_name: handler_function}.
One lookup replaces any if/elif chain.
# 1. 每个工具有一个处理函数。路径沙箱防止逃逸工作区。
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path
def run_read(path: str, limit: int = None) -> str:
text = safe_path(path).read_text()
lines = text.splitlines()
if limit and limit < len(lines):
lines = lines[:limit]
return "\n".join(lines)[:50000]
# 2. dispatch map 将工具名映射到处理函数。
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"],
kw["new_text"]),
}
# 3. 循环中按名称查找处理函数。循环体本身与 s01 完全一致。
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler \
else f"Unknown tool: {block.name}"
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
- 变更内容
| 组件 | 之前 | 之后 |
|---|---|---|
| Tools | 1 (仅 bash) | 4 (bash, read, write, edit) |
| Dispatch | 硬编码 bash 调用 | TOOL_HANDLERS 字典 |
| 路径安全 | 无 | safe_path() 沙箱 |
| Agent loop | 不变 | 不变 |
规划与协调 Planning & Coordination
s03 TodoWrite
Plan Before You Act
An agent without a plan drifts; list the steps first, then execute. 没有计划的智能体就会漫无目的地漂泊;先列出步骤,然后再执行

"没有计划的 agent 走哪算哪" -- 先列步骤再动手, 完成率翻倍。
- 问题
多步任务中, 模型会丢失进度 -- 重复做过的事、跳步、跑偏。对话越长越严重: 工具结果不断填满上下文, 系统提示的影响力逐渐被稀释。一个 10 步重构可能做完 1-3 步就开始即兴发挥, 因为 4-10 步已经被挤出注意力了。
- 解决方案
制定详细的计划, 并设置提醒,模型会更有方向。
+--------+ +-------+ +---------+
| User | ---> | LLM | ---> | Tools |
| prompt | | | | + todo |
+--------+ +---+---+ +----+----+
^ |
| tool_result |
+----------------+
|
+-----------+-----------+
| TodoManager state |
| [ ] task A |
| [>] task B <- doing |
| [x] task C |
+-----------------------+
|
if rounds_since_todo >= 3:
inject <reminder> into tool_result
# 1. TodoManager 存储带状态的项目。同一时间只允许一个 in_progress。
class TodoManager:
def update(self, items: list) -> str:
validated, in_progress_count = [], 0
for item in items:
status = item.get("status", "pending")
if status == "in_progress":
in_progress_count += 1
validated.append({"id": item["id"], "text": item["text"],
"status": status})
if in_progress_count > 1:
raise ValueError("Only one task can be in_progress")
self.items = validated
return self.render()
# 2. todo 工具和其他工具一样加入 dispatch map。
TOOL_HANDLERS = {
# ...base tools...
"todo": lambda **kw: TODO.update(kw["items"]),
}
# 3. nag reminder: 模型连续 3 轮以上不调用 todo 时注入提醒。
if rounds_since_todo >= 3 and messages:
last = messages[-1]
if last["role"] == "user" and isinstance(last.get("content"), list):
last["content"].insert(0, {
"type": "text",
"text": "<reminder>Update your todos.</reminder>",
})
"同时只能有一个 in_progress" 强制顺序聚焦。nag reminder 制造问责压力 -- 你不更新计划, 系统就追着你问。
- 变更内容
| 组件 | 之前 (s02) | 之后 (s03) |
|---|---|---|
| Tools 工具 | 4 | 5 (+todo) |
| 规划 | 无 | 带状态的 TodoManager |
| Nag 注入 | 无 | 3 轮后注入 <reminder> |
| Agent loop 智能体循环 | 简单分发 | + rounds_since_todo 计数器 |
s04 子 Agent
Clean Context Per Subtask
Subagents use independent messages[], keeping the main conversation clean 子智能体使用独立的消息数组,保持主对话的整洁

"大任务拆小, 每个小任务干净的上下文" -- 子智能体用独立 messages[], 不污染主对话。
- 问题
智能体工作越久, messages 数组越胖。每次读文件、跑命令的输出都永久留在上下文里。"这个项目用什么测试框架?" 可能要读 5 个文件, 但父智能体只需要一个词: "pytest。"
- 解决方案 智能体记了一大堆过程,上下文也会越来越长,但其实只需要最后一个结论。
Parent agent Subagent
+------------------+ +------------------+
| messages=[...] | | messages=[] | <-- fresh
| | dispatch | |
| tool: task | ----------> | while tool_use: |
| prompt="..." | | call tools |
| | summary | append results |
| result = "..." | <---------- | return last text |
+------------------+ +------------------+
Parent context stays clean. Subagent context is discarded.
# 1. 父智能体有一个 task 工具。子智能体拥有除 task 外的所有基础工具 (禁止递归生成)。
# Child gets all base tools except task (no recursive spawning)
CHILD_TOOLS = [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
{"name": "write_file", "description": "Write content to file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
{"name": "edit_file", "description": "Replace exact text in file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
]
PARENT_TOOLS = CHILD_TOOLS + [
{"name": "task",
"description": "Spawn a subagent with fresh context.",
"input_schema": {
"type": "object",
"properties": {"prompt": {"type": "string"}},
"required": ["prompt"],
}},
]
# 2. 子智能体以 messages=[] 启动, 运行自己的循环。只有最终文本返回给父智能体。
def run_subagent(prompt: str) -> str:
sub_messages = [{"role": "user", "content": prompt}]
for _ in range(30): # safety limit
response = client.messages.create(
model=MODEL, system=SUBAGENT_SYSTEM,
messages=sub_messages,
tools=CHILD_TOOLS, max_tokens=8000,
)
sub_messages.append({"role": "assistant",
"content": response.content})
if response.stop_reason != "tool_use":
break
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input)
results.append({"type": "tool_result",
"tool_use_id": block.id,
"content": str(output)[:50000]})
sub_messages.append({"role": "user", "content": results})
return "".join(
b.text for b in response.content if hasattr(b, "text")
) or "(no summary)"
子智能体可能跑了 30+ 次工具调用, 但整个消息历史直接丢弃。父智能体收到的只是一段摘要文本, 作为普通 tool_result 返回。
- 变更内容
| 组件 | 之前 (s03) | 之后 (s04) |
|---|---|---|
| Tools | 5 | 5 (基础) + task (仅父端) |
| 上下文 | 单一共享 | 父 + 子隔离 |
| Subagent | 无 | run_subagent() 函数 |
| 返回值 | 不适用 | 仅摘要文本 |
s05 按需技能加载
Load on Demand
Inject knowledge via tool_result when needed, not upfront in the system prompt 需要时通过 tool_result 注入知识,而不是预先在系统提示中注入。

"用到什么知识, 临时加载什么知识" -- 通过 tool_result 注入, 不塞 system prompt。
-
问题 你希望智能体遵循特定领域的工作流: git 约定、测试模式、代码审查清单。全塞进系统提示太浪费 -- 10 个技能, 每个 2000 token, 就是 20,000 token, 大部分跟当前任务毫无关系。
-
解决方案
System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent. |
| Skills available: |
| - git: Git workflow helpers | ~100 tokens/skill
| - test: Testing best practices |
+--------------------------------------+
When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand): |
| <skill name="git"> |
| Full git workflow instructions... | ~2000 tokens
| Step 1: ... |
| </skill> |
# 1. 每个技能是一个目录, 包含 SKILL.md 文件和 YAML frontmatter。
skills/
pdf/
SKILL.md # ---\n name: pdf\n description: Process PDF files\n ---\n ...
code-review/
SKILL.md # ---\n name: code-review\n description: Review code\n ---\n ...
# 2. SkillLoader 递归扫描 SKILL.md 文件, 用目录名作为技能标识。
class SkillLoader:
def __init__(self, skills_dir: Path):
self.skills = {}
for f in sorted(skills_dir.rglob("SKILL.md")):
text = f.read_text()
meta, body = self._parse_frontmatter(text)
name = meta.get("name", f.parent.name)
self.skills[name] = {"meta": meta, "body": body}
def get_descriptions(self) -> str:
lines = []
for name, skill in self.skills.items():
desc = skill["meta"].get("description", "")
lines.append(f" - {name}: {desc}")
return "\n".join(lines)
def get_content(self, name: str) -> str:
skill = self.skills.get(name)
if not skill:
return f"Error: Unknown skill '{name}'."
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
# 3. 第一层写入系统提示。第二层不过是 dispatch map 中的又一个工具。
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""
TOOL_HANDLERS = {
# ...base tools...
"load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}
模型知道有哪些技能 (便宜), 需要时再加载完整内容 (贵)。
- 变更内容
| 组件 | 之前 (s04) | 之后 (s05) |
|---|---|---|
| Tools | 5 (基础 + task) | 5 (基础 + load_skill) |
| 系统提示 | 静态字符串 | + 技能描述列表 |
| 知识库 | 无 | skills/*/SKILL.md 文件 |
| 注入方式 | 无 | 两层 (系统提示 + result) |
s07 任务系统
Task Graph + Dependencies
A file-based task graph with ordering, parallelism, and dependencies -- the coordination backbone for multi-agent work 一个基于文件的任务图,具有排序、并行性和依赖性——多智能体工作的协调核心

"大目标要拆成小任务, 排好序, 记在磁盘上" -- 文件持久化的任务图, 为多 agent 协作打基础。
- 问题
s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖、状态只有做完没做完。真实目标是有结构的 -- 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。
没有显式的关系, 智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。
- 解决方案
把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。任务图随时回答三个问题:
- 什么可以做? -- 状态为 pending 且 blockedBy 为空的任务。
- 什么被卡住? -- 等待前置任务完成的任务。
- 什么做完了? -- 状态为 completed 的任务, 完成时自动解锁后续任务。
.tasks/
task_1.json {"id":1, "status":"completed"}
task_2.json {"id":2, "blockedBy":[1], "status":"pending"}
task_3.json {"id":3, "blockedBy":[1], "status":"pending"}
task_4.json {"id":4, "blockedBy":[2,3], "status":"pending"}
任务图 (DAG):
+----------+
+--> | task 2 | --+
| | pending | |
+----------+ +----------+ +--> +----------+
| task 1 | | task 4 |
| completed| --> +----------+ +--> | blocked |
+----------+ | task 3 | --+ +----------+
| pending |
+----------+
顺序: task 1 必须先完成, 才能开始 2 和 3
并行: task 2 和 3 可以同时执行
依赖: task 4 要等 2 和 3 都完成
状态: pending -> in_progress -> completed
这个任务图是 s07 之后所有机制的协调骨架: 后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。
# 1. TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1
def create(self, subject, description=""):
task = {"id": self._next_id, "subject": subject,
"status": "pending", "blockedBy": [],
"blocks": [], "owner": ""}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
# 2. 依赖解除: 完成任务时, 自动将其 ID 从其他任务的 blockedBy 中移除, 解锁后续任务。
def _clear_dependency(self, completed_id):
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
# 3. 状态变更 + 依赖关联: update 处理状态转换和依赖边。
def update(self, task_id, status=None,
add_blocked_by=None, add_blocks=None):
task = self._load(task_id)
if status:
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
self._save(task)
# 4. 四个任务工具加入 dispatch map。
TOOL_HANDLERS = {
# ...base tools...
"task_create": lambda **kw: TASKS.create(kw["subject"]),
"task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),
}
- 相比 s06 变更内容
| 组件 | 之前 (s06) | 之后 (s07) |
|---|---|---|
| Tools 工具 | 5 | 8 (task_create/update/list/get) |
| 规划模型 | 扁平清单 (仅内存) | 带依赖关系的任务图 (磁盘) |
| 关系 | 无 | blockedBy + blocks 边 |
| 状态追踪 | 做完没做完 | pending -> in_progress -> completed |
| 持久化 | 压缩后丢失 | 压缩和重启后存活 |
内存管理 Memory Management
s06 下文压缩
Three-Layer Compression
Context will fill up; three-layer compression strategy enables infinite sessions 上下文会被填满;三层压缩策略支持无限会话

"上下文总会满, 要有办法腾地方" -- 三层压缩策略, 换来无限会话。
Stage 1: Micro-Compact
用简短摘要替换旧的 tool_results。这一过程是自动的,对模型而言是透明的。
Stage 2: Auto-Compact
整个对话被总结成一个简洁的区块。在达到令牌阈值时触发。
Stage 3: /compact
用户触发,极具侵略性。三层策略性遗忘实现无限会话。
- 问题
上下文窗口是有限的。读一个 1000 行的文件就吃掉 ~4000 token; 读 30 个文件、跑 20 条命令, 轻松突破 100k token。不压缩, 智能体根本没法在大项目里干活。
- 解决方案
三层压缩, 激进程度递增:
Every turn:
+------------------+
| Tool call result |
+------------------+
|
v
[Layer 1: micro_compact] (silent, every turn)
Replace tool_result > 3 turns old
with "[Previous: used {tool_name}]"
|
v
[Check: tokens > 50000?]
| |
no yes
| |
v v
continue [Layer 2: auto_compact]
Save transcript to .transcripts/
LLM summarizes conversation.
Replace all messages with [summary].
|
v
[Layer 3: compact tool]
Model calls compact explicitly.
Same summarization as auto_compact.
# 1. 第一层 -- micro_compact: 每次 LLM 调用前, 将旧的 tool result 替换为占位符。
def micro_compact(messages: list) -> list:
tool_results = []
for i, msg in enumerate(messages):
if msg["role"] == "user" and isinstance(msg.get("content"), list):
for j, part in enumerate(msg["content"]):
if isinstance(part, dict) and part.get("type") == "tool_result":
tool_results.append((i, j, part))
if len(tool_results) <= KEEP_RECENT:
return messages
for _, _, part in tool_results[:-KEEP_RECENT]:
if len(part.get("content", "")) > 100:
part["content"] = f"[Previous: used {tool_name}]"
return messages
# 2. 第二层 -- auto_compact: token 超过阈值时, 保存完整对话到磁盘, 让 LLM 做摘要。
def auto_compact(messages: list) -> list:
# Save transcript for recovery
transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
with open(transcript_path, "w") as f:
for msg in messages:
f.write(json.dumps(msg, default=str) + "\n")
# LLM summarizes
response = client.messages.create(
model=MODEL,
messages=[{"role": "user", "content":
"Summarize this conversation for continuity..."
+ json.dumps(messages, default=str)[:80000]}],
max_tokens=2000,
)
return [
{"role": "user", "content": f"[Compressed]\n\n{response.content[0].text}"},
{"role": "assistant", "content": "Understood. Continuing."},
]
# 3. 第三层 -- manual compact: compact 工具按需触发同样的摘要机制。
# 4. 循环整合三层:
def agent_loop(messages: list):
while True:
micro_compact(messages) # Layer 1
if estimate_tokens(messages) > THRESHOLD:
messages[:] = auto_compact(messages) # Layer 2
response = client.messages.create(...)
# ... tool execution ...
if manual_compact:
messages[:] = auto_compact(messages) # Layer 3
- 相对 s05 的变更内容
| 组件 | 之前 (s05) | 之后 (s06) |
|---|---|---|
| Tools | 5 | 5 (基础 + compact) |
| 上下文管理 | 无 | 三层压缩 |
| Micro-compact | 无 | 旧结果 -> 占位符 |
| Auto-compact | 无 | token 阈值触发 |
| Transcripts | 无 | 保存到 .transcripts/ |
并发 Concurrency
s08 后台任务
Background Threads + Notifications
Run slow operations in the background; the agent keeps thinking ahead 在后台运行缓慢的操作;智能体会持续提前思考

"慢操作丢后台, agent 继续想下一步" -- 后台线程跑命令, 完成后注入通知。
- 问题
有些命令要跑好几分钟: npm install、pytest、docker build。阻塞式循环下模型只能干等。用户说 "装依赖, 顺便建个配置文件", 智能体却只能一个一个来。
- 解决方案
Main thread Background thread
+-----------------+ +-----------------+
| agent loop | | subprocess runs |
| ... | | ... |
| [LLM call] <---+------- | enqueue(result) |
| ^drain queue | +-----------------+
+-----------------+
Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
| |
v v
[A runs] [B runs] (parallel)
| |
+-- results injected before next LLM call --+
- 相对 s07 的变更
| 组件 | 之前 (s07) | 之后 (s08) |
|---|---|---|
| Tools | 8 | 6 (基础 + background_run + check) |
| 执行方式 | 仅阻塞 | 阻塞 + 后台线程 |
| 通知机制 | 无 | 每轮排空的队列 |
| 并发 | 无 | 守护线程 |
协作 Collaboration
s09 Agent 团队
Teammates + Mailboxes
When one agent can't finish, delegate to persistent teammates via async mailboxes 当一个智能体无法完成任务时,通过异步邮箱将任务委托给持续在线的队友

"任务太大一个人干不完, 要能分给队友" -- 持久化队友 + JSONL 邮箱。
- 问题
子智能体 (s04) 是一次性的: 生成、干活、返回摘要、消亡。没有身份, 没有跨调用的记忆。后台任务 (s08) 能跑 shell 命令, 但做不了 LLM 引导的决策。
真正的团队协作需要三样东西: (1) 能跨多轮对话存活的持久智能体, (2) 身份和生命周期管理, (3) 智能体之间的通信通道。
- 解决方案
解决方案
Teammate lifecycle:
spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN
Communication:
.team/
config.json <- team roster + statuses
inbox/
alice.jsonl <- append-only, drain-on-read
bob.jsonl
lead.jsonl
+--------+ send("alice","bob","...") +--------+
| alice | -----------------------------> | bob |
| loop | bob.jsonl << {json_line} | loop |
+--------+ +--------+
^ |
| BUS.read_inbox("alice") |
+---- alice.jsonl -> read + drain ---------+
- 相对 s08 的变更
| 组件 | 之前 (s08) | 之后 (s09) |
|---|---|---|
| Tools | 6 | 9 (+spawn/send/read_inbox) |
| 智能体数量 | 单一 | 领导 + N 个队友 |
| 持久化 | 无 | config.json + JSONL 收件箱 |
| 线程 | 后台命令 | 每线程完整 agent loop |
| 生命周期 | 一次性 | idle -> working -> idle |
| 通信 | 无 | message + broadcast |
s10 团队协议
Shared Communication Rules
One request-response pattern drives all team negotiation 一种请求-响应模式主导着所有团队协商。

"队友之间要有统一的沟通规矩" -- 一个 request-response 模式驱动所有协商。
- 问题
s09 中队友能干活能通信, 但缺少结构化协调:
关机: 直接杀线程会留下写了一半的文件和过期的 config.json。需要握手 -- 领导请求, 队友批准 (收尾退出) 或拒绝 (继续干)。
计划审批: 领导说 "重构认证模块", 队友立刻开干。高风险变更应该先过审。
两者结构一样: 一方发带唯一 ID 的请求, 另一方引用同一 ID 响应。
- 解决方案
Shutdown Protocol Plan Approval Protocol
================== ======================
Lead Teammate Teammate Lead
| | | |
|--shutdown_req-->| |--plan_req------>|
| {req_id:"abc"} | | {req_id:"xyz"} |
| | | |
|<--shutdown_resp-| |<--plan_resp-----|
| {req_id:"abc", | | {req_id:"xyz", |
| approve:true} | | approve:true} |
Shared FSM:
[pending] --approve--> [approved]
[pending] --reject---> [rejected]
Trackers:
shutdown_requests = {req_id: {target, status}}
plan_requests = {req_id: {from, plan, status}}
s11 自主 Agent
Scan Board, Claim Tasks 扫描看板,认领任务
Teammates scan the board and claim tasks themselves; no need for the lead to assign each one 队友们查看看板并自行认领任务;无需负责人逐一分配。

"队友自己看看板, 有活就认领" -- 不需要领导逐个分配, 自组织。
- 问题
s09-s10 中, 队友只在被明确指派时才动。领导得给每个队友写 prompt, 任务看板上 10 个未认领的任务得手动分配。这扩展不了。
真正的自治: 队友自己扫描任务看板, 认领没人做的任务, 做完再找下一个。
一个细节: 上下文压缩 (s06) 后智能体可能忘了自己是谁。身份重注入解决这个问题。
- 解决方案
Teammate lifecycle with idle cycle:
+-------+
| spawn |
+---+---+
|
v
+-------+ tool_use +-------+
| WORK | <------------- | LLM |
+---+---+ +-------+
|
| stop_reason != tool_use (or idle tool called)
v
+--------+
| IDLE | poll every 5s for up to 60s
+---+----+
|
+---> check inbox --> message? ----------> WORK
|
+---> scan .tasks/ --> unclaimed? -------> claim -> WORK
|
+---> 60s timeout ----------------------> SHUTDOWN
Identity re-injection after compression:
if len(messages) <= 3:
messages.insert(0, identity_block)
- 相对 s10 的变更
| 组件 | 之前 (s10) | 之后 (s11) |
|---|---|---|
| Tools | 12 | 14 (+idle, +claim_task) |
| 自治性 | 领导指派 | 自组织 |
| 空闲阶段 | 无 | 轮询收件箱 + 任务看板 |
| 任务认领 | 仅手动 | 自动认领未分配任务 |
| 身份 | 系统提示 | + 压缩后重注入 |
| 超时 | 无 | 60 秒空闲 -> 自动关机 |
s12 Worktree + 任务隔离
Isolate by Directory
Each works in its own directory; tasks manage goals, worktrees manage directories, bound by ID 每个都在自己的目录中运行;任务管理目标,工作树管理目录,它们通过ID绑定。
"各干各的目录, 互不干扰" -- 任务管目标, worktree 管目录, 按 ID 绑定。

- 问题
到 s11, 智能体已经能自主认领和完成任务。但所有任务共享一个目录。两个智能体同时重构不同模块 -- A 改 config.py, B 也改 config.py, 未提交的改动互相污染, 谁也没法干净回滚。
任务板管 "做什么" 但不管 "在哪做"。解法: 给每个任务一个独立的 git worktree 目录, 用任务 ID 把两边关联起来。
- 解决方案
Control plane (.tasks/) Execution plane (.worktrees/)
+------------------+ +------------------------+
| task_1.json | | auth-refactor/ |
| status: in_progress <------> branch: wt/auth-refactor
| worktree: "auth-refactor" | task_id: 1 |
+------------------+ +------------------------+
| task_2.json | | ui-login/ |
| status: pending <------> branch: wt/ui-login
| worktree: "ui-login" | task_id: 2 |
+------------------+ +------------------------+
|
index.json (worktree registry)
events.jsonl (lifecycle log)
State machines:
Task: pending -> in_progress -> completed
Worktree: absent -> active -> removed | kept
- 相对 s11 的变更
| 组件 | 之前 (s11) | 之后 (s12) |
|---|---|---|
| 协调 | 任务板 (owner/status) | 任务板 + worktree 显式绑定 |
| 执行范围 | 共享目录 | 每个任务独立目录 |
| 可恢复性 | 仅任务状态 | 任务状态 + worktree 索引 |
| 收尾 | 任务完成 | 任务完成 + 显式 keep/remove |
| 生命周期可见性 | 隐式日志 | .worktrees/events.jsonl 显式事件流 |