The Orchestrator Pattern: Coordinating Complex AI Agent Workflows
Single agents hit walls. They run out of context, lack specialized skills, and struggle with complex multi-step tasks. The solution? Don't build one super-agent—build an orchestrator that coordinates many specialized agents.
The Orchestrator Pattern is how you build AI systems that tackle enterprise-grade complexity: routing tasks to the right specialists, managing dependencies, handling failures, and synthesizing results.
This guide shows you how to build orchestrators that turn chaos into coordination.
What Is the Orchestrator Pattern?
An orchestrator is a meta-agent that doesn't do the work itself—it decides who should do the work and when:
```text
┌─────────────────────────────────────────────────────────────┐
│                        ORCHESTRATOR                         │
│                                                             │
│        "Analyze sales data, create visualizations,          │
│         and write an executive summary"                     │
│                                                             │
│                              │                              │
│                              ▼                              │
│                   ┌─────────────────────┐                   │
│                   │   Task Decomposer   │                   │
│                   └──────────┬──────────┘                   │
│                              │                              │
│              ┌───────────────┼───────────────┐              │
│              ▼               ▼               ▼              │
│        ┌───────────┐   ┌───────────┐   ┌───────────┐        │
│        │   Data    │   │    Viz    │   │  Writer   │        │
│        │  Analyst  │   │   Agent   │   │   Agent   │        │
│        └─────┬─────┘   └─────┬─────┘   └─────┬─────┘        │
│              │               │               │              │
│              └───────────────┼───────────────┘              │
│                              ▼                              │
│                   ┌─────────────────────┐                   │
│                   │ Result Synthesizer  │                   │
│                   └─────────────────────┘                   │
│                              │                              │
│                              ▼                              │
│                        Final Output                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```
The orchestrator handles:
- Task decomposition: Breaking complex tasks into subtasks
- Agent selection: Routing each subtask to the right specialist
- Dependency management: Ensuring correct execution order
- Result synthesis: Combining outputs into a coherent whole
- Error handling: Retrying, rerouting, or escalating failures
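Of the responsibilities above, dependency management is the easiest to get subtly wrong, so here is a minimal sketch of one way to approach it. It assumes a plan is a plain mapping from task ID to the IDs it depends on (the task names are made up for illustration) and uses the standard library's `graphlib.TopologicalSorter` to group tasks into waves that can safely run in parallel:

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group task IDs into waves; every task in a wave has all its deps done."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished to unlock the next one
    return waves


# Hypothetical plan: task ID -> IDs that must finish first
plan = {
    "analyze": [],
    "visualize": ["analyze"],
    "summarize": ["analyze", "visualize"],
}
print(execution_waves(plan))
# [['analyze'], ['visualize'], ['summarize']]
```

A plan with two independent tasks feeding a third would produce two waves, with the first wave holding both independent tasks.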
Why Orchestration Matters
1. Specialization Beats Generalization
One agent trying to do everything:
- ❌ Jack of all trades, master of none
- ❌ Context window filled with irrelevant instructions
- ❌ Conflicting objectives in one prompt
Specialized agents with orchestration:
- ✅ Each agent masters its domain
- ✅ Focused context for each task
- ✅ Clear, single-purpose prompts
2. Scalability
```text
Single Agent              Orchestrated System
     │                             │
     ▼                             ▼
┌─────────┐                 ┌─────────────┐
│ One LLM │                 │Orchestrator │
│  Call   │                 └──────┬──────┘
└─────────┘                        │
                         ┌─────────┼─────────┐
                         ▼         ▼         ▼
                     ┌───────┐ ┌───────┐ ┌───────┐
                     │Agent 1│ │Agent 2│ │Agent 3│
                     └───────┘ └───────┘ └───────┘
                         │         │         │
                         └─────────┼─────────┘
                                   ▼
                   Run in parallel: up to 3x faster
                   (when the three tasks are independent)
```
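To make the parallel speedup concrete, here is a small sketch that swaps real LLM calls for a stand-in that just sleeps. `fake_agent` and the timings are assumptions for illustration; real gains depend on how independent the tasks are and on provider rate limits.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_agent(name: str, seconds: float = 0.1) -> str:
    """Stand-in for an I/O-bound LLM call: sleeps, then returns a canned result."""
    time.sleep(seconds)
    return f"{name} done"


def run_sequential(names: list[str]) -> list[str]:
    return [fake_agent(n) for n in names]


def run_parallel(names: list[str]) -> list[str]:
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        return list(pool.map(fake_agent, names))  # map preserves input order


def timed(fn, names):
    start = time.perf_counter()
    out = fn(names)
    return out, time.perf_counter() - start


names = ["agent_1", "agent_2", "agent_3"]
seq_out, seq_time = timed(run_sequential, names)
par_out, par_time = timed(run_parallel, names)
print(f"sequential: {seq_time:.2f}s, parallel: {par_time:.2f}s")
```

Both runs return the same results; only the wall-clock time differs, because the three waits overlap in the parallel version.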
3. Fault Isolation
When one agent fails:
- Without orchestration: Entire task fails
- With orchestration: Retry, use backup agent, or gracefully degrade
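The idea of fault isolation can be sketched in a few lines. This is an illustration only, with agents modeled as plain callables and hypothetical names (`run_isolated`, `flaky_viz`): each agent's exception is captured next to its name, so one failure never discards the work the other agents finished.

```python
from concurrent.futures import ThreadPoolExecutor


def run_isolated(agents: dict, task: str) -> dict:
    """Run every agent on the task; one failure does not abort the others."""
    outcomes = {}
    with ThreadPoolExecutor(max_workers=max(len(agents), 1)) as pool:
        futures = {name: pool.submit(fn, task) for name, fn in agents.items()}
        for name, future in futures.items():
            try:
                outcomes[name] = {"ok": True, "result": future.result()}
            except Exception as exc:
                # Record the failure instead of letting it propagate
                outcomes[name] = {"ok": False, "error": str(exc)}
    return outcomes


def flaky_viz(task: str) -> str:
    raise RuntimeError("chart renderer unavailable")


outcomes = run_isolated(
    {"analyst": lambda t: f"analysis of {t}", "viz": flaky_viz},
    "Q4 sales",
)
print(outcomes["analyst"])  # the analyst result survives the viz failure
print(outcomes["viz"])
```

The caller can then decide per task whether to retry, reroute to a backup agent, or return a degraded result.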
Basic Orchestrator Implementation
Here's a complete, minimal orchestrator:
```python
import concurrent.futures
import json
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

import openai  # openai.OpenAI() reads OPENAI_API_KEY from the environment


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    id: str
    description: str
    agent_type: str
    dependencies: list[str]
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class Agent:
    name: str
    description: str
    execute: Callable[[str, dict], str]


class Orchestrator:
    def __init__(self, agents: dict[str, Agent]):
        self.client = openai.OpenAI()
        self.agents = agents
        self.tasks: dict[str, Task] = {}
        self.results: dict[str, str] = {}

    def run(self, goal: str) -> dict:
        """Orchestrate agents to achieve the goal"""

        # Phase 1: Decompose into tasks
        tasks = self._decompose(goal)
        self.tasks = {t.id: t for t in tasks}
        self.results = {}  # Reset so a reused orchestrator starts clean

        print(f"Created {len(tasks)} tasks")

        # Phase 2: Execute tasks respecting dependencies
        while not self._all_complete():
            # Find tasks ready to run
            ready = self._get_ready_tasks()

            if not ready:
                # Nothing can run: the remaining tasks depend on a failed,
                # unknown, or circular dependency. Mark them failed and stop
                # rather than looping forever.
                for task in self.tasks.values():
                    if task.status == TaskStatus.PENDING:
                        task.status = TaskStatus.FAILED
                        task.error = "Blocked by an unmet dependency"
                break

            # Execute ready tasks in parallel
            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = {
                    executor.submit(self._execute_task, task): task
                    for task in ready
                }

                for future in concurrent.futures.as_completed(futures):
                    task = futures[future]
                    try:
                        result = future.result()
                        task.status = TaskStatus.COMPLETED
                        task.result = result
                        self.results[task.id] = result
                    except Exception as e:
                        task.status = TaskStatus.FAILED
                        task.error = str(e)

        # Phase 3: Synthesize results
        if self._has_failures():
            return {
                "success": False,
                "completed": [t.id for t in self.tasks.values() if t.status == TaskStatus.COMPLETED],
                "failed": [t.id for t in self.tasks.values() if t.status == TaskStatus.FAILED],
                "partial_results": self.results
            }

        final_result = self._synthesize(goal, self.results)

        return {
            "success": True,
            "result": final_result,
            "tasks_completed": len(self.tasks)
        }

    def _decompose(self, goal: str) -> list[Task]:
        """Break goal into tasks with dependencies"""

        agent_descriptions = "\n".join([
            f"- {name}: {agent.description}"
            for name, agent in self.agents.items()
        ])

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": f"""Decompose this goal into tasks for available agents.

Available agents:
{agent_descriptions}

Return JSON:
{{
    "tasks": [
        {{
            "id": "task_1",
            "description": "What to do",
            "agent_type": "agent_name",
            "dependencies": []
        }},
        {{
            "id": "task_2",
            "description": "Next task",
            "agent_type": "agent_name",
            "dependencies": ["task_1"]
        }}
    ]
}}

Rules:
- Break into 2-8 tasks
- Each task should be focused and achievable
- List dependencies (tasks that must complete first)
- Assign to the most appropriate agent"""
            }, {
                "role": "user",
                "content": goal
            }],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)

        return [
            Task(
                id=t["id"],
                description=t["description"],
                agent_type=t["agent_type"],
                dependencies=t.get("dependencies", [])
            )
            for t in data["tasks"]
        ]

    def _get_ready_tasks(self) -> list[Task]:
        """Get tasks whose dependencies are all complete"""
        ready = []
        for task in self.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue

            # An unknown dependency ID counts as "not complete" instead of
            # raising KeyError
            deps_complete = all(
                dep in self.tasks
                and self.tasks[dep].status == TaskStatus.COMPLETED
                for dep in task.dependencies
            )

            if deps_complete:
                ready.append(task)

        return ready

    def _execute_task(self, task: Task) -> str:
        """Execute a single task using the appropriate agent"""
        task.status = TaskStatus.RUNNING

        agent = self.agents.get(task.agent_type)
        if not agent:
            raise ValueError(f"Unknown agent type: {task.agent_type}")

        # Gather context from dependencies
        context = {
            dep: self.results[dep]
            for dep in task.dependencies
        }

        return agent.execute(task.description, context)

    def _all_complete(self) -> bool:
        return all(
            t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]
            for t in self.tasks.values()
        )

    def _has_failures(self) -> bool:
        return any(t.status == TaskStatus.FAILED for t in self.tasks.values())

    def _synthesize(self, goal: str, results: dict) -> str:
        """Combine task results into final output"""

        results_text = "\n\n".join([
            f"=== {task_id} ===\n{result}"
            for task_id, result in results.items()
        ])

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Synthesize these task results into a coherent final response."
            }, {
                "role": "user",
                "content": f"Goal: {goal}\n\nTask Results:\n{results_text}"
            }]
        )

        return response.choices[0].message.content


# Define specialized agents
def create_data_analyst():
    client = openai.OpenAI()

    def execute(task: str, context: dict) -> str:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "You are a data analyst. Analyze data and provide insights."
            }, {
                "role": "user",
                "content": f"Task: {task}\n\nContext: {json.dumps(context)}"
            }]
        )
        return response.choices[0].message.content

    return Agent(
        name="data_analyst",
        description="Analyzes data, finds patterns, calculates statistics",
        execute=execute
    )


def create_writer():
    client = openai.OpenAI()

    def execute(task: str, context: dict) -> str:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "You are a professional writer. Create clear, engaging content."
            }, {
                "role": "user",
                "content": f"Task: {task}\n\nContext: {json.dumps(context)}"
            }]
        )
        return response.choices[0].message.content

    return Agent(
        name="writer",
        description="Writes reports, summaries, and documentation",
        execute=execute
    )


def create_coder():
    from hopx import Sandbox
    client = openai.OpenAI()

    def execute(task: str, context: dict) -> str:
        # Generate code
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Write Python code to accomplish the task. Output only code."
            }, {
                "role": "user",
                "content": f"Task: {task}\n\nContext: {json.dumps(context)}"
            }]
        )

        code = response.choices[0].message.content

        # Execute in sandbox
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            sandbox.files.write("/app/task.py", code)
            result = sandbox.commands.run("python /app/task.py")
            return result.stdout if result.exit_code == 0 else f"Error: {result.stderr}"
        finally:
            sandbox.kill()

    return Agent(
        name="coder",
        description="Writes and executes Python code for data processing and analysis",
        execute=execute
    )


# Usage
orchestrator = Orchestrator({
    "data_analyst": create_data_analyst(),
    "writer": create_writer(),
    "coder": create_coder()
})

result = orchestrator.run(
    "Analyze our Q4 sales data, identify the top 3 trends, "
    "create visualizations, and write an executive summary."
)

print(result)
```
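Because the task list comes back from a language model, it is worth checking before anything runs. The helper below is a sketch, not part of the orchestrator above: `validate_plan` is a hypothetical name, and it assumes tasks are the raw dictionaries parsed from the model's JSON, with the same `id`, `agent_type`, and `dependencies` keys requested in the decomposition prompt.

```python
def validate_plan(tasks: list[dict], known_agents: set[str]) -> list[str]:
    """Return a list of problems found in a decomposed plan (empty if valid)."""
    problems = []
    ids = [t["id"] for t in tasks]
    if len(ids) != len(set(ids)):
        problems.append("duplicate task ids")
    id_set = set(ids)

    for t in tasks:
        if t["agent_type"] not in known_agents:
            problems.append(f"{t['id']}: unknown agent '{t['agent_type']}'")
        for dep in t.get("dependencies", []):
            if dep not in id_set:
                problems.append(f"{t['id']}: unknown dependency '{dep}'")

    # Cycle check: repeatedly remove tasks whose known dependencies are resolved.
    remaining = {t["id"]: set(t.get("dependencies", [])) & id_set for t in tasks}
    resolved: set[str] = set()
    while remaining:
        runnable = [tid for tid, deps in remaining.items() if deps <= resolved]
        if not runnable:
            problems.append(f"circular dependency among: {sorted(remaining)}")
            break
        for tid in runnable:
            resolved.add(tid)
            del remaining[tid]

    return problems


# Hypothetical plan with two mistakes a model might make
plan = [
    {"id": "t1", "agent_type": "coder", "dependencies": []},
    {"id": "t2", "agent_type": "painter", "dependencies": ["t7"]},
]
for problem in validate_plan(plan, {"coder", "writer"}):
    print(problem)
```

One reasonable design is to send the problem list back to the model and ask for a corrected plan, rather than failing the whole run.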
Orchestration Patterns
Pattern 1: Sequential Pipeline
Tasks flow in a fixed order:
```text
Input → Agent A → Agent B → Agent C → Output
```
```python
class PipelineOrchestrator:
    def __init__(self, stages: list[Agent]):
        self.stages = stages

    def run(self, input_data: str) -> str:
        current = input_data

        # Each stage receives the previous stage's output as its task
        for stage in self.stages:
            print(f"Running stage: {stage.name}")
            current = stage.execute(current, {})

        return current


# Usage (the four agents are Agent instances defined elsewhere)
pipeline = PipelineOrchestrator([
    extract_agent,    # Extract key information
    transform_agent,  # Transform data
    analyze_agent,    # Analyze patterns
    report_agent      # Generate report
])

result = pipeline.run(raw_document)
```
Pattern 2: Router/Dispatcher
Route tasks to specialized agents based on content:
```python
class RouterOrchestrator:
    def __init__(self, agents: dict[str, Agent]):
        self.client = openai.OpenAI()
        self.agents = agents

    def run(self, task: str) -> str:
        # Classify the task
        agent_name = self._route(task)

        # The model's reply is free text, so fail clearly on an unknown name
        if agent_name not in self.agents:
            raise ValueError(f"Router returned unknown agent: {agent_name!r}")

        # Execute with selected agent
        agent = self.agents[agent_name]
        return agent.execute(task, {})

    def _route(self, task: str) -> str:
        agent_options = "\n".join([
            f"- {name}: {agent.description}"
            for name, agent in self.agents.items()
        ])

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Which agent should handle this task?

Task: {task}

Agents:
{agent_options}

Reply with just the agent name."""
            }]
        )

        return response.choices[0].message.content.strip()


# Usage (the four agents are Agent instances defined elsewhere)
router = RouterOrchestrator({
    "code": code_agent,
    "writing": writing_agent,
    "research": research_agent,
    "math": math_agent
})

# Automatically routes to appropriate agent
result = router.run("Write a function to calculate compound interest")
```
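A router that asks a model to "reply with just the agent name" will sometimes get back extra punctuation, different casing, or a full sentence. The helper below is one possible way to normalize such replies. `resolve_agent_name` and its matching rules are assumptions for illustration, not part of any library.

```python
from typing import Optional


def resolve_agent_name(reply: str, known: list[str], default: Optional[str] = None) -> str:
    """Map a free-text model reply onto one of the registered agent names."""
    cleaned = reply.strip().strip("\"'`.").lower()
    by_lower = {name.lower(): name for name in known}

    # Exact match after trimming whitespace, quotes, and trailing periods
    if cleaned in by_lower:
        return by_lower[cleaned]

    # Accept replies like "The writing agent." only if exactly one name appears
    mentioned = [name for low, name in by_lower.items() if low in reply.lower()]
    if len(mentioned) == 1:
        return mentioned[0]

    if default is not None:
        return default
    raise ValueError(f"Could not map reply {reply!r} to one of {known}")


known = ["code", "writing", "research", "math"]
print(resolve_agent_name(" Code\n", known))                      # code
print(resolve_agent_name("The writing agent.", known))           # writing
print(resolve_agent_name("unsure", known, default="research"))   # research
```

Ambiguous replies that mention several agents fall through to the default, or raise if none is given, which is usually safer than guessing.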
Pattern 3: Hierarchical Orchestration
Orchestrators managing other orchestrators:
```text
                ┌─────────────────┐
                │     Master      │
                │  Orchestrator   │
                └────────┬────────┘
                         │
       ┌─────────────────┼─────────────────┐
       ▼                 ▼                 ▼
┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│  Research   │   │ Development │   │     QA      │
│ Orchestrator│   │ Orchestrator│   │ Orchestrator│
└──────┬──────┘   └──────┬──────┘   └──────┬──────┘
       │                 │                 │
   ┌───┼───┐         ┌───┼───┐         ┌───┼───┐
   ▼   ▼   ▼         ▼   ▼   ▼         ▼   ▼   ▼
   A1  A2  A3        A4  A5  A6        A7  A8  A9
```
```python
class HierarchicalOrchestrator:
    def __init__(self, sub_orchestrators: dict[str, Orchestrator]):
        self.client = openai.OpenAI()
        self.sub_orchestrators = sub_orchestrators

    def run(self, goal: str) -> dict:
        # Decompose into high-level phases
        phases = self._plan_phases(goal)

        # Phases run in order; each sub-orchestrator handles its own tasks
        results = {}
        for phase in phases:
            sub_orch = self.sub_orchestrators[phase["orchestrator"]]
            result = sub_orch.run(phase["goal"])
            results[phase["name"]] = result

        return self._synthesize(goal, results)

    def _plan_phases(self, goal: str) -> list[dict]:
        orchestrator_list = ", ".join(self.sub_orchestrators.keys())

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Break this goal into phases.

Goal: {goal}

Available orchestrators: {orchestrator_list}

Return JSON:
{{"phases": [{{"name": "phase_1", "orchestrator": "name", "goal": "sub-goal"}}]}}"""
            }],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)["phases"]

    def _synthesize(self, goal: str, results: dict) -> dict:
        # Combine the per-phase result dicts into one final answer
        phase_text = json.dumps(results, indent=2, default=str)

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Synthesize these phase results into a coherent final response."
            }, {
                "role": "user",
                "content": f"Goal: {goal}\n\nPhase Results:\n{phase_text}"
            }]
        )

        return {"result": response.choices[0].message.content, "phases": results}
```
Pattern 4: Dynamic Agent Creation
Create agents on-the-fly based on task requirements:
```python
class DynamicOrchestrator:
    def __init__(self):
        self.client = openai.OpenAI()
        self.agent_cache = {}

    def run(self, goal: str) -> dict:
        # Determine what agents we need
        agent_specs = self._design_agents(goal)

        # Create or retrieve agents
        agents = {}
        for spec in agent_specs:
            agent = self._get_or_create_agent(spec)
            agents[spec["name"]] = agent

        # Create orchestrator with these agents; run() returns its result dict
        orchestrator = Orchestrator(agents)
        return orchestrator.run(goal)

    def _design_agents(self, goal: str) -> list[dict]:
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Design specialized agents for this goal.

Goal: {goal}

Return JSON:
{{
    "agents": [
        {{
            "name": "agent_name",
            "role": "expert role description",
            "capabilities": ["capability1", "capability2"],
            "system_prompt": "You are..."
        }}
    ]
}}"""
            }],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)["agents"]

    def _get_or_create_agent(self, spec: dict) -> Agent:
        cache_key = spec["name"]

        if cache_key in self.agent_cache:
            return self.agent_cache[cache_key]

        # Factory function so each agent closes over its own system prompt
        def create_execute(system_prompt):
            def execute(task: str, context: dict) -> str:
                response = self.client.chat.completions.create(
                    model="gpt-4o",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": f"Task: {task}\nContext: {context}"}
                    ]
                )
                return response.choices[0].message.content
            return execute

        agent = Agent(
            name=spec["name"],
            description=spec["role"],
            execute=create_execute(spec["system_prompt"])
        )

        self.agent_cache[cache_key] = agent
        return agent
```
Error Handling and Recovery
Retry with Backoff
```python
import time


class ResilientOrchestrator(Orchestrator):
    def __init__(self, agents, max_retries=3):
        super().__init__(agents)
        self.max_retries = max_retries

    def _execute_task(self, task: Task) -> str:
        last_error = None

        for attempt in range(self.max_retries):
            try:
                return super()._execute_task(task)
            except Exception as e:
                last_error = e
                # Only wait if another attempt is coming
                if attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                    print(f"Task {task.id} failed, retrying in {wait_time}s...")
                    time.sleep(wait_time)

        raise last_error
```
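The same retry idea can be pulled out into a standalone helper that is easier to test. The sketch below adds two common refinements that the class above does not have: random jitter, so that parallel tasks do not all retry at the same moment, and a `retry_on` filter, so that only transient errors are retried. The function name, parameters, and the injectable `sleep` are assumptions for illustration.

```python
import random
import time


def retry_with_backoff(fn, max_retries=3, base_delay=1.0, max_delay=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call fn() until it succeeds or max_retries attempts have been made."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter: wait between 1x and 1.5x the nominal delay
            sleep(delay + random.uniform(0, delay / 2))


# Demo with a function that fails twice, then succeeds
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

delays = []  # record delays instead of actually sleeping
print(retry_with_backoff(flaky, max_retries=3, sleep=delays.append))  # ok
print(len(delays))  # 2
```

Errors outside `retry_on` propagate immediately, which keeps the orchestrator from burning retries on problems like an unknown agent name.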
Fallback Agents
```python
class FallbackOrchestrator(Orchestrator):
    def __init__(self, agents, fallback_agents):
        super().__init__(agents)
        self.fallback_agents = fallback_agents

    def _execute_task(self, task: Task) -> str:
        try:
            return super()._execute_task(task)
        except Exception as primary_error:
            # Try fallback agent
            fallback = self.fallback_agents.get(task.agent_type)
            if fallback:
                print(f"Primary agent failed, using fallback for {task.id}")
                # Same dependency context the primary agent received
                context = {dep: self.results[dep] for dep in task.dependencies}
                return fallback.execute(task.description, context)
            raise primary_error
```
Partial Results
```python
class GracefulOrchestrator(Orchestrator):
    def run(self, goal: str) -> dict:
        result = super().run(goal)

        if not result["success"]:
            # Return what we could complete
            completed_results = {
                t.id: t.result
                for t in self.tasks.values()
                if t.status == TaskStatus.COMPLETED
            }

            return {
                "success": False,
                "partial_result": self._synthesize_partial(goal, completed_results),
                "completed_tasks": list(completed_results.keys()),
                "failed_tasks": [t.id for t in self.tasks.values() if t.status == TaskStatus.FAILED],
                "note": "Some tasks failed. Partial results provided."
            }

        return result

    def _synthesize_partial(self, goal: str, completed_results: dict) -> str:
        # Reuse the normal synthesis step, telling the model the inputs are incomplete
        if not completed_results:
            return ""
        return self._synthesize(
            f"{goal}\n\nNote: some tasks failed; summarize only the results provided.",
            completed_results
        )
```
Production Orchestrator
A fuller orchestrator with logging, metrics, per-task timeouts, and bounded parallelism:
```python
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import openai

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orchestrator")


@dataclass
class ExecutionMetrics:
    start_time: datetime
    end_time: Optional[datetime] = None
    tasks_total: int = 0
    tasks_completed: int = 0
    tasks_failed: int = 0
    total_tokens: int = 0

    @property
    def duration_seconds(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0


class ProductionOrchestrator:
    def __init__(
        self,
        agents: dict,
        max_parallel: int = 5,
        task_timeout: int = 300,
        enable_monitoring: bool = True
    ):
        self.client = openai.OpenAI()
        self.agents = agents
        self.max_parallel = max_parallel
        self.task_timeout = task_timeout
        self.enable_monitoring = enable_monitoring
        self.metrics = None

    async def run(self, goal: str, metadata: Optional[dict] = None) -> dict:
        """Execute orchestrated workflow"""

        self.metrics = ExecutionMetrics(start_time=datetime.now())

        logger.info(f"Starting orchestration: {goal[:100]}...")

        try:
            # Decompose
            tasks = await self._decompose(goal)
            self.metrics.tasks_total = len(tasks)
            logger.info(f"Decomposed into {len(tasks)} tasks")

            # Execute
            results = await self._execute_all(tasks)

            # Synthesize
            final = await self._synthesize(goal, results)

            self.metrics.end_time = datetime.now()

            return {
                "success": True,
                "result": final,
                "metrics": self._get_metrics_dict(),
                "trace": self._get_execution_trace(tasks)
            }

        except Exception as e:
            logger.error(f"Orchestration failed: {e}")
            self.metrics.end_time = datetime.now()

            return {
                "success": False,
                "error": str(e),
                "metrics": self._get_metrics_dict()
            }

    async def _decompose(self, goal: str) -> list:
        """Ask the model for a task list (dicts with id, description, agent, dependencies)"""

        agent_list = "\n".join(
            f"- {name}: {getattr(agent, 'description', '')}"
            for name, agent in self.agents.items()
        )

        # The OpenAI client used here is synchronous, so run it in a worker thread
        response = await asyncio.to_thread(
            self.client.chat.completions.create,
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": (
                    "Decompose the goal into 2-8 tasks for these agents:\n"
                    f"{agent_list}\n\n"
                    'Return JSON: {"tasks": [{"id": "task_1", "description": "...", '
                    '"agent": "agent_name", "dependencies": []}]}'
                )
            }, {
                "role": "user",
                "content": goal
            }],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)["tasks"]

    async def _execute_all(self, tasks: list) -> dict:
        """Execute all tasks respecting dependencies"""

        results = {}
        completed = set()

        while len(completed) < len(tasks):
            # Find ready tasks
            ready = [
                t for t in tasks
                if t["id"] not in completed
                and all(dep in completed for dep in t.get("dependencies", []))
            ]

            if not ready:
                pending = [t["id"] for t in tasks if t["id"] not in completed]
                raise RuntimeError(f"Deadlock detected. Pending: {pending}")

            # Execute batch in parallel
            batch = ready[:self.max_parallel]
            batch_results = await asyncio.gather(*[
                self._execute_single(t, results)
                for t in batch
            ], return_exceptions=True)

            # Process the whole batch before failing, so metrics stay accurate
            first_error = None
            for task, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    self.metrics.tasks_failed += 1
                    logger.error(f"Task {task['id']} failed: {result}")
                    first_error = first_error or result
                    continue

                results[task["id"]] = result
                completed.add(task["id"])
                self.metrics.tasks_completed += 1
                logger.info(f"Completed: {task['id']}")

            if first_error:
                raise first_error

        return results

    async def _execute_single(self, task: dict, context: dict) -> str:
        """Execute single task with timeout"""

        agent = self.agents.get(task["agent"])
        if not agent:
            raise ValueError(f"Unknown agent: {task['agent']}")

        # Build context from dependencies
        dep_context = {
            dep: context[dep]
            for dep in task.get("dependencies", [])
            if dep in context
        }

        try:
            # Note: on timeout we stop waiting, but the worker thread itself
            # cannot be cancelled and runs to completion in the background
            result = await asyncio.wait_for(
                asyncio.to_thread(agent.execute, task["description"], dep_context),
                timeout=self.task_timeout
            )
            return result
        except asyncio.TimeoutError:
            raise TimeoutError(f"Task {task['id']} timed out after {self.task_timeout}s")

    async def _synthesize(self, goal: str, results: dict) -> str:
        """Combine task results into the final output"""

        results_text = "\n\n".join(
            f"=== {task_id} ===\n{result}"
            for task_id, result in results.items()
        )

        response = await asyncio.to_thread(
            self.client.chat.completions.create,
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Synthesize these task results into a coherent final response."
            }, {
                "role": "user",
                "content": f"Goal: {goal}\n\nTask Results:\n{results_text}"
            }]
        )

        return response.choices[0].message.content

    def _get_metrics_dict(self) -> dict:
        return {
            "duration_seconds": self.metrics.duration_seconds,
            "tasks_total": self.metrics.tasks_total,
            "tasks_completed": self.metrics.tasks_completed,
            "tasks_failed": self.metrics.tasks_failed,
            "success_rate": self.metrics.tasks_completed / max(self.metrics.tasks_total, 1)
        }

    def _get_execution_trace(self, tasks: list) -> list:
        return [
            {
                "id": t["id"],
                "agent": t["agent"],
                "description": t["description"][:100],
                "dependencies": t.get("dependencies", [])
            }
            for t in tasks
        ]


# Usage (the four agents are objects with description and execute, defined elsewhere)
async def main():
    orchestrator = ProductionOrchestrator(
        agents={
            "researcher": research_agent,
            "analyst": analyst_agent,
            "writer": writer_agent,
            "coder": coder_agent
        },
        max_parallel=3,
        task_timeout=120
    )

    result = await orchestrator.run(
        "Research the latest AI agent frameworks, analyze their features, "
        "create a comparison table, and write a recommendation report."
    )

    print(f"Success: {result['success']}")
    print(f"Duration: {result['metrics']['duration_seconds']:.1f}s")
    print(f"Tasks: {result['metrics']['tasks_completed']}/{result['metrics']['tasks_total']}")

    if result['success']:
        print(f"\nResult:\n{result['result']}")

# asyncio.run(main())
```
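The orchestrator above limits parallelism by slicing the ready list into batches of `max_parallel`, which means a slow task holds up its entire batch. A common alternative is an `asyncio.Semaphore`, which lets the next task start as soon as any slot frees up. The sketch below shows the idea on its own; `run_bounded` and the fake task are hypothetical names, and dependencies are left out to keep it short.

```python
import asyncio


async def run_bounded(jobs: dict, max_parallel: int = 3) -> dict:
    """Run async callables concurrently, never more than max_parallel at once."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(name, job):
        async with semaphore:  # waits here while all slots are taken
            return name, await job()

    pairs = await asyncio.gather(*(guarded(n, j) for n, j in jobs.items()))
    return dict(pairs)


async def demo():
    active = peak = 0

    async def fake_task():
        # Track how many tasks are inside the semaphore at the same time
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.05)
        active -= 1
        return "done"

    jobs = {f"task_{i}": fake_task for i in range(6)}
    results = await run_bounded(jobs, max_parallel=2)
    return results, peak


results, peak = asyncio.run(demo())
print(len(results), "tasks finished; peak concurrency:", peak)
```

With six tasks and `max_parallel=2`, the recorded peak stays at two even though all six coroutines are scheduled at once.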
Best Practices
1. Keep Orchestrator Logic Simple
```python
# ❌ Orchestrator doing too much
class BadOrchestrator:
    def run(self, goal):
        # Orchestrator shouldn't contain domain logic
        if "sales" in goal:
            return self._analyze_sales()
        elif "marketing" in goal:
            return self._analyze_marketing()


# ✅ Orchestrator focuses on coordination
class GoodOrchestrator:
    def run(self, goal):
        tasks = self._decompose(goal)            # What to do
        agents = self._select_agents(tasks)      # Who does it
        results = self._execute(tasks, agents)   # Coordination
        return self._synthesize(results)         # Combine results
```
2. Design Clear Agent Interfaces
```python
# All agents should follow the same interface
class AgentInterface:
    def execute(self, task: str, context: dict) -> str:
        """
        Args:
            task: What to do
            context: Results from dependency tasks

        Returns:
            Result as string (or structured data as JSON string)
        """
        raise NotImplementedError
```
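If you would rather not force every agent to inherit from a base class, the same contract can be expressed structurally with `typing.Protocol`. This is a sketch of that alternative; `SupportsExecute`, `EchoAgent`, and `register` are made-up names for illustration.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsExecute(Protocol):
    def execute(self, task: str, context: dict) -> str: ...


class EchoAgent:
    """Toy agent: no inheritance needed, only a matching execute method."""

    def execute(self, task: str, context: dict) -> str:
        return f"{task} (given {len(context)} upstream results)"


def register(agents: dict, name: str, agent: object) -> None:
    # isinstance on a runtime_checkable Protocol only checks that the
    # method exists, not that its signature matches
    if not isinstance(agent, SupportsExecute):
        raise TypeError(f"{name} does not provide execute(task, context)")
    agents[name] = agent


registry: dict = {}
register(registry, "echo", EchoAgent())
print(registry["echo"].execute("summarize", {"task_1": "raw numbers"}))
# summarize (given 1 upstream results)
```

Static type checkers will additionally verify the signature, so the runtime check is mainly a guard for agents registered dynamically.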
3. Monitor Everything
```python
import time


# Method of an orchestrator class; assumes self.metrics exposes record(dict)
def _execute_task(self, task):
    start = time.time()
    # Results of the tasks this one depends on
    context = {dep: self.results[dep] for dep in task.dependencies}

    try:
        result = self.agents[task.agent_type].execute(task.description, context)

        self.metrics.record({
            "task_id": task.id,
            "agent": task.agent_type,
            "duration": time.time() - start,
            "success": True,
            "result_size": len(result)
        })

        return result
    except Exception as e:
        self.metrics.record({
            "task_id": task.id,
            "agent": task.agent_type,
            "duration": time.time() - start,
            "success": False,
            "error": str(e)
        })
        raise
```
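The snippet above calls `self.metrics.record(...)` without showing what sits behind it. One minimal possibility is an in-memory recorder like the sketch below. `MetricsRecorder` and its `summary` method are assumptions for illustration; a real deployment would more likely forward these events to a metrics or tracing backend.

```python
from collections import defaultdict
from statistics import mean


class MetricsRecorder:
    """In-memory recorder for per-task events, grouped by agent on demand."""

    def __init__(self):
        self.events: list[dict] = []

    def record(self, event: dict) -> None:
        self.events.append(event)

    def summary(self) -> dict:
        by_agent = defaultdict(list)
        for event in self.events:
            by_agent[event["agent"]].append(event)
        return {
            agent: {
                "calls": len(events),
                "failures": sum(1 for e in events if not e["success"]),
                "avg_duration": mean(e["duration"] for e in events),
            }
            for agent, events in by_agent.items()
        }


recorder = MetricsRecorder()
recorder.record({"task_id": "t1", "agent": "writer", "duration": 1.0, "success": True})
recorder.record({"task_id": "t2", "agent": "writer", "duration": 3.0, "success": False})
recorder.record({"task_id": "t3", "agent": "coder", "duration": 0.5, "success": True})
print(recorder.summary()["writer"])
# {'calls': 2, 'failures': 1, 'avg_duration': 2.0}
```

A per-agent summary like this makes it easy to spot which specialist is slow or failing most often.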
4. Enable Graceful Degradation
```python
def run(self, goal: str) -> dict:
    try:
        return self._full_execution(goal)
    except Exception as e:
        logger.warning(f"Full execution failed: {e}")

    # Try simpler approach
    try:
        return self._simplified_execution(goal)
    except Exception as e:
        # Catch Exception rather than using a bare except, so Ctrl+C still works
        logger.warning(f"Simplified execution failed: {e}")
        # Last resort: single agent
        return self._single_agent_fallback(goal)
```
Conclusion
The Orchestrator Pattern is how you scale AI agents to enterprise complexity:
- Task decomposition breaks big problems into manageable pieces
- Agent specialization ensures each task is handled by an expert
- Parallel execution maximizes throughput
- Dependency management ensures correct ordering
- Fault tolerance keeps systems running despite failures
Start with a simple pipeline orchestrator. Add routing when you have diverse task types. Move to hierarchical orchestration for truly complex workflows.
For complex, multi-step work, a system that orchestrates specialists usually outperforms a single generalist agent.
Ready to orchestrate agents with secure code execution? Get started with HopX — sandboxes that give each agent isolated environments.