
The Orchestrator Pattern: Coordinating Complex AI Agent Workflows

AI Agents · Alin Dobra · 14 min read


Single agents hit walls. They run out of context, lack specialized skills, and struggle with complex multi-step tasks. The solution? Don't build one super-agent—build an orchestrator that coordinates many specialized agents.

The Orchestrator Pattern is how you build AI systems that tackle enterprise-grade complexity: routing tasks to the right specialists, managing dependencies, handling failures, and synthesizing results.

This guide shows you how to build orchestrators that turn chaos into coordination.

What Is the Orchestrator Pattern?

An orchestrator is a meta-agent that doesn't do the work itself—it decides who should do the work and when:

```text
                       ORCHESTRATOR
    Goal: "Analyze sales data, create visualizations,
           and write an executive summary"
                             │
                             ▼
                      Task Decomposer
                             │
            ┌────────────────┼────────────────┐
            ▼                ▼                ▼
      Data Analyst       Viz Agent      Writer Agent
            │                │                │
            └────────────────┼────────────────┘
                             ▼
                    Result Synthesizer
                             │
                             ▼
                       Final Output
```

The orchestrator handles:

  • Task decomposition: Breaking complex tasks into subtasks
  • Agent selection: Routing each subtask to the right specialist
  • Dependency management: Ensuring correct execution order
  • Result synthesis: Combining outputs into a coherent whole
  • Error handling: Retrying, rerouting, or escalating failures
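Here is a toy sketch that makes those five responsibilities concrete without any LLM calls. The agents are plain Python functions and the plan is hard-coded, whereas in the implementation below a model produces the plan. `AGENTS`, `PLAN`, and `orchestrate` are illustrative names, not part of any library.

```python
from typing import Callable

AgentFn = Callable[[str, dict], str]  # (task description, dependency results) -> result


# Hypothetical stub agents standing in for LLM-backed specialists
def analyst(task: str, context: dict) -> str:
    return "revenue up 12%"


def viz(task: str, context: dict) -> str:
    return f"chart({context['analyze']})"


def writer(task: str, context: dict) -> str:
    return f"summary citing {context['analyze']} and {context['chart']}"


AGENTS: dict[str, AgentFn] = {"analyst": analyst, "viz": viz, "writer": writer}

# Task decomposition, hard-coded here: task id -> (agent name, description, dependencies)
PLAN = {
    "analyze": ("analyst", "Analyze sales data", []),
    "chart": ("viz", "Create visualizations", ["analyze"]),
    "summary": ("writer", "Write an executive summary", ["analyze", "chart"]),
}


def orchestrate(plan: dict) -> dict:
    results: dict[str, str] = {}
    errors: dict[str, str] = {}
    pending = dict(plan)
    while pending:
        # Dependency management: a task is ready once all of its dependencies succeeded
        ready = [tid for tid, (_, _, deps) in pending.items() if all(d in results for d in deps)]
        if not ready:
            break  # everything left is blocked by a failure (or a cycle)
        for tid in ready:
            agent_name, description, deps = pending.pop(tid)
            try:
                agent = AGENTS[agent_name]  # agent selection
                results[tid] = agent(description, {d: results[d] for d in deps})
            except Exception as exc:  # error handling: record the failure, keep going
                errors[tid] = repr(exc)
    # Result synthesis is trivial here: the last planned task's output is the answer
    final = results.get(list(plan)[-1]) if plan else None
    return {"final": final, "results": results, "errors": errors, "blocked": sorted(pending)}


outcome = orchestrate(PLAN)
print(outcome["final"])
```

A failed task is recorded in `errors`. Anything that depended on it ends up in `blocked` instead of crashing the run.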

Why Orchestration Matters

1. Specialization Beats Generalization

One agent trying to do everything:

  • Jack of all trades, master of none
  • Context window filled with irrelevant instructions
  • Conflicting objectives in one prompt

Specialized agents with orchestration:

  • Each agent masters its domain
  • Focused context for each task
  • Clear, single-purpose prompts

2. Scalability

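```text
Single Agent             Orchestrated System

One LLM call                 Orchestrator
(every step in sequence)          │
                       ┌──────────┼──────────┐
                       ▼          ▼          ▼
                    Agent 1    Agent 2    Agent 3
```

When the three subtasks are independent, they can run in parallel. Wall-clock time then approaches that of the slowest agent rather than the sum of all three, which is close to 3x faster when the agents take similar time. Dependent tasks still run in order, and the decomposition and synthesis calls add their own latency.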
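The parallel speedup relies on agent calls being I/O-bound. Most of their time is spent waiting on an API, so threads can overlap the waits. This small sketch simulates that with `time.sleep`; the 0.2 s delay and the `slow_agent` name are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_agent(name: str, seconds: float = 0.2) -> str:
    """Stand-in for an agent whose runtime is dominated by waiting on an API."""
    time.sleep(seconds)
    return f"{name} done"


names = ["agent_1", "agent_2", "agent_3"]

# One after another: total time is roughly the sum of the delays
start = time.perf_counter()
sequential = [slow_agent(n) for n in names]
sequential_s = time.perf_counter() - start

# In parallel: total time is roughly the longest single delay
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    parallel = list(pool.map(slow_agent, names))
parallel_s = time.perf_counter() - start

print(f"sequential: {sequential_s:.2f}s, parallel: {parallel_s:.2f}s")
```

Expect the parallel run to take about one delay instead of three. With real APIs, rate limits and uneven task lengths shrink the gain.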

3. Fault Isolation

When one agent fails:

  • Without orchestration: Entire task fails
  • With orchestration: Retry, use backup agent, or gracefully degrade
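In code, isolation mostly means catching exceptions per task instead of letting one failure abort the whole run. Here is a minimal sketch with hypothetical stub agents. The failing agent is recorded, and the other agent's result is kept so the orchestrator can retry, reroute, or return a degraded answer.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_isolated(agents: dict[str, Callable[[str], str]], task: str) -> tuple[dict, dict]:
    """Run every agent on the task; one agent's exception does not discard the others' results."""
    successes: dict[str, str] = {}
    failures: dict[str, str] = {}
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn, task) for name, fn in agents.items()}
        for name, future in futures.items():
            try:
                successes[name] = future.result()
            except Exception as exc:  # isolate the failure to this agent
                failures[name] = repr(exc)
    return successes, failures


def flaky(task: str) -> str:
    # Hypothetical agent whose upstream API is down
    raise TimeoutError("upstream API timed out")


ok, failed = run_isolated({"analyst": lambda t: f"analysis of {t}", "viz": flaky}, "Q4 sales")
print(ok, failed)
```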

Basic Orchestrator Implementation

Here's a complete, minimal orchestrator:

```python
import json
import concurrent.futures
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional

import openai


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    id: str
    description: str
    agent_type: str
    dependencies: list[str]
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[str] = None
    error: Optional[str] = None


@dataclass
class Agent:
    name: str
    description: str
    # (task description, {dependency_id: result}) -> result text
    execute: Callable[[str, dict], str]


class Orchestrator:
    def __init__(self, agents: dict[str, Agent]):
        self.client = openai.OpenAI()
        self.agents = agents
        self.tasks: dict[str, Task] = {}
        self.results: dict[str, str] = {}

    def run(self, goal: str) -> dict:
        """Orchestrate agents to achieve the goal."""

        # Phase 1: Decompose into tasks
        tasks = self._decompose(goal)
        self.tasks = {t.id: t for t in tasks}
        self.results = {}

        print(f"Created {len(tasks)} tasks")

        # Phase 2: Execute tasks respecting dependencies
        while not self._all_complete():
            # Find tasks whose dependencies have all completed
            ready = self._get_ready_tasks()

            if not ready:
                if self._has_failures():
                    break  # the remaining tasks depend on a failed task
                # Nothing can start and nothing is running: a cycle or an unknown dependency
                pending = [t.id for t in self.tasks.values() if t.status == TaskStatus.PENDING]
                raise RuntimeError(f"Unsatisfiable dependencies for tasks: {pending}")

            # Execute ready tasks in parallel (one "wave" at a time)
            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = {
                    executor.submit(self._execute_task, task): task
                    for task in ready
                }

                for future in concurrent.futures.as_completed(futures):
                    task = futures[future]
                    try:
                        result = future.result()
                        task.status = TaskStatus.COMPLETED
                        task.result = result
                        self.results[task.id] = result
                    except Exception as e:
                        task.status = TaskStatus.FAILED
                        task.error = str(e)

        # Phase 3: Synthesize results (or report partial progress)
        if self._has_failures():
            return {
                "success": False,
                "completed": [t.id for t in self.tasks.values() if t.status == TaskStatus.COMPLETED],
                "failed": [t.id for t in self.tasks.values() if t.status == TaskStatus.FAILED],
                "partial_results": self.results
            }

        final_result = self._synthesize(goal, self.results)

        return {
            "success": True,
            "result": final_result,
            "tasks_completed": len(self.tasks)
        }

    def _decompose(self, goal: str) -> list[Task]:
        """Break goal into tasks with dependencies."""

        agent_descriptions = "\n".join([
            f"- {name}: {agent.description}"
            for name, agent in self.agents.items()
        ])

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": f"""Decompose this goal into tasks for available agents.

Available agents:
{agent_descriptions}

Return JSON:
{{
    "tasks": [
        {{
            "id": "task_1",
            "description": "What to do",
            "agent_type": "agent_name",
            "dependencies": []
        }},
        {{
            "id": "task_2",
            "description": "Next task",
            "agent_type": "agent_name",
            "dependencies": ["task_1"]
        }}
    ]
}}

Rules:
- Break into 2-8 tasks
- Each task should be focused and achievable
- List dependencies (tasks that must complete first)
- Assign to the most appropriate agent, using only the agent names listed above"""
            }, {
                "role": "user",
                "content": goal
            }],
            response_format={"type": "json_object"}
        )

        data = json.loads(response.choices[0].message.content)

        return [
            Task(
                id=t["id"],
                description=t["description"],
                agent_type=t["agent_type"],
                dependencies=t.get("dependencies", [])
            )
            for t in data["tasks"]
        ]

    def _get_ready_tasks(self) -> list[Task]:
        """Get pending tasks whose dependencies have all completed."""
        ready = []
        for task in self.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue

            # An unknown dependency id never counts as complete (caught as a deadlock in run())
            deps_complete = all(
                dep in self.tasks and self.tasks[dep].status == TaskStatus.COMPLETED
                for dep in task.dependencies
            )

            if deps_complete:
                ready.append(task)

        return ready

    def _execute_task(self, task: Task) -> str:
        """Execute a single task using the appropriate agent."""
        task.status = TaskStatus.RUNNING

        agent = self.agents.get(task.agent_type)
        if not agent:
            raise ValueError(f"Unknown agent type: {task.agent_type}")

        # Gather context from dependencies
        context = {
            dep: self.results[dep]
            for dep in task.dependencies
        }

        return agent.execute(task.description, context)

    def _all_complete(self) -> bool:
        return all(
            t.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]
            for t in self.tasks.values()
        )

    def _has_failures(self) -> bool:
        return any(t.status == TaskStatus.FAILED for t in self.tasks.values())

    def _synthesize(self, goal: str, results: dict) -> str:
        """Combine task results into final output."""

        results_text = "\n\n".join([
            f"=== {task_id} ===\n{result}"
            for task_id, result in results.items()
        ])

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Synthesize these task results into a coherent final response."
            }, {
                "role": "user",
                "content": f"Goal: {goal}\n\nTask Results:\n{results_text}"
            }]
        )

        return response.choices[0].message.content


# Define specialized agents
def create_data_analyst():
    client = openai.OpenAI()

    def execute(task: str, context: dict) -> str:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "You are a data analyst. Analyze data and provide insights."
            }, {
                "role": "user",
                "content": f"Task: {task}\n\nContext: {json.dumps(context)}"
            }]
        )
        return response.choices[0].message.content

    return Agent(
        name="data_analyst",
        description="Analyzes data, finds patterns, calculates statistics",
        execute=execute
    )


def create_writer():
    client = openai.OpenAI()

    def execute(task: str, context: dict) -> str:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "You are a professional writer. Create clear, engaging content."
            }, {
                "role": "user",
                "content": f"Task: {task}\n\nContext: {json.dumps(context)}"
            }]
        )
        return response.choices[0].message.content

    return Agent(
        name="writer",
        description="Writes reports, summaries, and documentation",
        execute=execute
    )


def _strip_code_fences(text: str) -> str:
    """Models often wrap code in markdown fences even when told to output only code."""
    fence = "`" * 3
    lines = text.strip().splitlines()
    if lines and lines[0].startswith(fence):
        lines = lines[1:]
    if lines and lines[-1].strip().startswith(fence):
        lines = lines[:-1]
    return "\n".join(lines)


def create_coder():
    from hopx import Sandbox
    client = openai.OpenAI()

    def execute(task: str, context: dict) -> str:
        # Generate code
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Write Python code to accomplish the task. Output only code."
            }, {
                "role": "user",
                "content": f"Task: {task}\n\nContext: {json.dumps(context)}"
            }]
        )

        code = _strip_code_fences(response.choices[0].message.content)

        # Execute the generated code in an isolated sandbox, then always clean it up
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            sandbox.files.write("/app/task.py", code)
            result = sandbox.commands.run("python /app/task.py")
            return result.stdout if result.exit_code == 0 else f"Error: {result.stderr}"
        finally:
            sandbox.kill()

    return Agent(
        name="coder",
        description="Writes and executes Python code for data processing and analysis",
        execute=execute
    )


# Usage
orchestrator = Orchestrator({
    "data_analyst": create_data_analyst(),
    "writer": create_writer(),
    "coder": create_coder()
})

result = orchestrator.run(
    "Analyze our Q4 sales data, identify the top 3 trends, "
    "create visualizations, and write an executive summary."
)

print(result)
```
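The task list comes from a model, so it can contain unknown agents, dangling dependency ids, or cycles. Below is a hedged sketch of a standalone check; the name `validate_plan` is ours. It uses the same `id`, `agent_type`, and `dependencies` keys and could run on the parsed JSON in `_decompose` before any task executes. It assumes every task dict has `id` and `agent_type`. The cycle check is Kahn's algorithm: repeatedly remove tasks with no unmet dependencies, and whatever remains is stuck on a cycle.

```python
def validate_plan(tasks: list[dict], agent_names: set[str]) -> list[str]:
    """Return the problems found in an LLM-produced plan (an empty list means it looks runnable)."""
    problems: list[str] = []
    ids = [t["id"] for t in tasks]
    if len(ids) != len(set(ids)):
        problems.append("duplicate task ids")
    known = set(ids)

    for t in tasks:
        if t["agent_type"] not in agent_names:
            problems.append(f"{t['id']}: unknown agent {t['agent_type']!r}")
        for dep in t.get("dependencies", []):
            if dep not in known:
                problems.append(f"{t['id']}: unknown dependency {dep!r}")

    # Kahn's algorithm: peel off tasks whose (known) dependencies are all satisfied
    remaining = {t["id"]: {d for d in t.get("dependencies", []) if d in known} for t in tasks}
    while True:
        free = [tid for tid, deps in remaining.items() if not deps]
        if not free:
            break
        for tid in free:
            del remaining[tid]
        for deps in remaining.values():
            deps.difference_update(free)
    if remaining:
        problems.append(f"dependency cycle among {sorted(remaining)}")

    return problems


plan = [
    {"id": "task_1", "agent_type": "data_analyst", "dependencies": []},
    {"id": "task_2", "agent_type": "writer", "dependencies": ["task_1"]},
]
print(validate_plan(plan, {"data_analyst", "writer", "coder"}))
```

If the list is non-empty, you could re-prompt the model with the problems or fail early. Either way, you avoid discovering the issue mid-run.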
 

Orchestration Patterns

Pattern 1: Sequential Pipeline

Tasks flow in a fixed order:

```text
Input → Agent A → Agent B → Agent C → Output
```
```python
class PipelineOrchestrator:
    def __init__(self, stages: list[Agent]):
        self.stages = stages

    def run(self, input_data: str) -> str:
        current = input_data

        for stage in self.stages:
            print(f"Running stage: {stage.name}")
            # Each stage receives the previous stage's output as its input
            current = stage.execute(current, {})

        return current


# Usage (extract_agent, transform_agent, analyze_agent, report_agent and
# raw_document are placeholders, e.g. Agents built like the factories above)
pipeline = PipelineOrchestrator([
    extract_agent,    # Extract key information
    transform_agent,  # Transform data
    analyze_agent,    # Analyze patterns
    report_agent      # Generate report
])

result = pipeline.run(raw_document)
```
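Here is a sketch that shows the data flow without any API calls. Plain functions stand in for the four stage agents, and the stages are hypothetical. Recording each intermediate output makes it easy to see which stage went wrong.

```python
from typing import Callable

Stage = tuple[str, Callable[[str], str]]  # (stage name, transform)


def run_pipeline(stages: list[Stage], data: str) -> tuple[str, list[tuple[str, str]]]:
    """Feed each stage's output into the next, keeping every intermediate result."""
    trace: list[tuple[str, str]] = []
    for name, fn in stages:
        data = fn(data)
        trace.append((name, data))
    return data, trace


stages: list[Stage] = [
    ("extract", lambda doc: doc.split(":", 1)[1].strip()),  # drop the "memo:" header
    ("transform", str.upper),
    ("report", lambda text: f"REPORT: {text}"),
]

final, trace = run_pipeline(stages, "memo: sales grew in q4")
print(final)
for name, output in trace:
    print(f"{name:>9}: {output}")
```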

Pattern 2: Router/Dispatcher

Route tasks to specialized agents based on content:

```python
from typing import Optional

import openai


class RouterOrchestrator:
    def __init__(self, agents: dict[str, Agent], default_agent: Optional[str] = None):
        self.client = openai.OpenAI()
        self.agents = agents
        # Used when the model's reply doesn't match any agent name
        self.default_agent = default_agent

    def run(self, task: str) -> str:
        # Classify the task
        agent_name = self._route(task)

        # Execute with selected agent
        agent = self.agents[agent_name]
        return agent.execute(task, {})

    def _route(self, task: str) -> str:
        agent_options = "\n".join([
            f"- {name}: {agent.description}"
            for name, agent in self.agents.items()
        ])

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Which agent should handle this task?

Task: {task}

Agents:
{agent_options}

Reply with just the agent name."""
            }]
        )

        # Models sometimes add quotes, a trailing period, or different casing
        reply = response.choices[0].message.content.strip().strip("\"'`.").lower()
        lookup = {name.lower(): name for name in self.agents}
        if reply in lookup:
            return lookup[reply]
        if self.default_agent is not None:
            return self.default_agent
        raise ValueError(f"Router returned unknown agent: {reply!r}")


# Usage (code_agent, writing_agent, research_agent and math_agent are
# placeholder Agent instances)
router = RouterOrchestrator({
    "code": code_agent,
    "writing": writing_agent,
    "research": research_agent,
    "math": math_agent
})

# Automatically routes to appropriate agent
result = router.run("Write a function to calculate compound interest")
```
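One common refinement, offered here as a suggestion rather than part of the pattern above, is to try cheap keyword rules first. The LLM router is called only when no rule matches. `make_hybrid_router` and the rule lists are illustrative, and a stub stands in for `RouterOrchestrator._route`.

```python
import re
from typing import Callable


def make_hybrid_router(
    rules: dict[str, list[str]], llm_route: Callable[[str], str]
) -> Callable[[str], str]:
    """Route by whole-word keyword rules first; fall back to the LLM router if none match."""
    patterns = {
        name: re.compile(r"\b(?:" + "|".join(map(re.escape, words)) + r")\b", re.IGNORECASE)
        for name, words in rules.items()
    }

    def route(task: str) -> str:
        for name, pattern in patterns.items():  # checked in insertion order
            if pattern.search(task):
                return name
        return llm_route(task)

    return route


# Stub standing in for the LLM-based router; records how often it is called
llm_calls: list[str] = []


def fake_llm_route(task: str) -> str:
    llm_calls.append(task)
    return "research"


route = make_hybrid_router(
    {"code": ["function", "python", "bug"], "math": ["integral", "derivative"]},
    fake_llm_route,
)
print(route("Write a function to calculate compound interest"))  # a rule matches, no LLM call
print(route("Summarize recent work on agent frameworks"))        # no rule matches, LLM decides
```

Rules are checked in insertion order, so put the most specific ones first. Ambiguous tasks still go to the model.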
 

Pattern 3: Hierarchical Orchestration

Orchestrators managing other orchestrators:

```text
                 Master Orchestrator
                          │
        ┌─────────────────┼─────────────────┐
        │                 │                 │
    Research         Development           QA
  Orchestrator      Orchestrator      Orchestrator
        │                 │                 │
   A1  A2  A3        A4  A5  A6        A7  A8  A9
```
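```python
import json

import openai


class HierarchicalOrchestrator:
    def __init__(self, sub_orchestrators: dict[str, Orchestrator]):
        self.client = openai.OpenAI()
        self.sub_orchestrators = sub_orchestrators

    def run(self, goal: str) -> str:
        # Decompose into high-level phases, each owned by one sub-orchestrator
        phases = self._plan_phases(goal)

        results = {}
        for phase in phases:
            sub_orch = self.sub_orchestrators.get(phase["orchestrator"])
            if sub_orch is None:
                raise ValueError(f"Unknown orchestrator: {phase['orchestrator']}")
            # Phases run sequentially; each sub-orchestrator returns its own result dict
            results[phase["name"]] = sub_orch.run(phase["goal"])

        return self._synthesize(goal, results)

    def _plan_phases(self, goal: str) -> list[dict]:
        orchestrator_list = ", ".join(self.sub_orchestrators.keys())

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Break this goal into phases.

Goal: {goal}

Available orchestrators: {orchestrator_list}

Return JSON:
{{"phases": [{{"name": "phase_1", "orchestrator": "name", "goal": "sub-goal"}}]}}"""
            }],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)["phases"]

    def _synthesize(self, goal: str, results: dict) -> str:
        """Merge the per-phase result dicts into one final response."""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Synthesize these phase results into a coherent final response."
            }, {
                "role": "user",
                "content": f"Goal: {goal}\n\nPhase results:\n{json.dumps(results, indent=2, default=str)}"
            }]
        )
        return response.choices[0].message.content
```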
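As written, `HierarchicalOrchestrator` gives each phase only its own sub-goal, so the QA phase never sees what development produced. The sketch below shows one way to thread results forward, with plain callables standing in for sub-orchestrators; `run_hierarchy` and the stubs are hypothetical. With the real class, you would fold the prior results into each phase's goal text.

```python
from typing import Callable

# (phase goal, results of earlier phases) -> result
SubOrchestrator = Callable[[str, dict], str]


def run_hierarchy(phases: list[dict], subs: dict[str, SubOrchestrator]) -> dict[str, str]:
    """Run phases in order, handing each sub-orchestrator a copy of the earlier results."""
    results: dict[str, str] = {}
    for phase in phases:
        sub = subs[phase["orchestrator"]]
        results[phase["name"]] = sub(phase["goal"], dict(results))
    return results


subs: dict[str, SubOrchestrator] = {
    "research": lambda goal, prior: f"notes on {goal}",
    "development": lambda goal, prior: f"code using {prior['research']}",
    "qa": lambda goal, prior: f"tests for {prior['build']}",
}
phases = [
    {"name": "research", "orchestrator": "research", "goal": "rate limiting"},
    {"name": "build", "orchestrator": "development", "goal": "implement limiter"},
    {"name": "verify", "orchestrator": "qa", "goal": "test limiter"},
]

print(run_hierarchy(phases, subs)["verify"])
```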

Pattern 4: Dynamic Agent Creation

Create agents on-the-fly based on task requirements:

```python
import json

import openai


class DynamicOrchestrator:
    def __init__(self):
        self.client = openai.OpenAI()
        # Keyed by agent name: a later spec with the same name reuses the cached agent
        self.agent_cache: dict[str, Agent] = {}

    def run(self, goal: str) -> dict:
        # Determine what agents we need
        agent_specs = self._design_agents(goal)

        # Create or retrieve agents
        agents = {}
        for spec in agent_specs:
            agents[spec["name"]] = self._get_or_create_agent(spec)

        # Hand the generated agents to the basic Orchestrator (returns its result dict)
        orchestrator = Orchestrator(agents)
        return orchestrator.run(goal)

    def _design_agents(self, goal: str) -> list[dict]:
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Design specialized agents for this goal.

Goal: {goal}

Return JSON:
{{
    "agents": [
        {{
            "name": "agent_name",
            "role": "expert role description",
            "capabilities": ["capability1", "capability2"],
            "system_prompt": "You are..."
        }}
    ]
}}"""
            }],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)["agents"]

    def _get_or_create_agent(self, spec: dict) -> Agent:
        cache_key = spec["name"]

        if cache_key in self.agent_cache:
            return self.agent_cache[cache_key]

        def create_execute(system_prompt: str):
            # The closure binds this agent's own system prompt
            def execute(task: str, context: dict) -> str:
                response = self.client.chat.completions.create(
                    model="gpt-4o",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": f"Task: {task}\n\nContext: {json.dumps(context)}"}
                    ]
                )
                return response.choices[0].message.content
            return execute

        agent = Agent(
            name=spec["name"],
            description=spec["role"],
            execute=create_execute(spec["system_prompt"])
        )

        self.agent_cache[cache_key] = agent
        return agent
```
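The generated specs are untrusted input too. A model may omit fields, repeat names with different spellings, or propose more agents than you want to pay for. Here is a small sketch of a cleanup step that could run between `_design_agents` and `_get_or_create_agent`. The `clean_agent_specs` name and the limit of five are assumptions.

```python
REQUIRED_KEYS = {"name", "role", "system_prompt"}  # the fields _get_or_create_agent reads


def clean_agent_specs(specs: list[dict], max_agents: int = 5) -> list[dict]:
    """Drop malformed or duplicate agent specs and cap how many agents get created."""
    cleaned: list[dict] = []
    seen: set[str] = set()
    for spec in specs:
        if not REQUIRED_KEYS.issubset(spec):
            continue  # missing a required field
        # Normalize names so "Market Researcher" and "market_researcher" collide
        name = str(spec["name"]).strip().lower().replace(" ", "_")
        if not name or name in seen:
            continue
        seen.add(name)
        cleaned.append({**spec, "name": name})
        if len(cleaned) == max_agents:
            break
    return cleaned


specs = [
    {"name": "Market Researcher", "role": "finds market data", "system_prompt": "You are..."},
    {"name": "market_researcher", "role": "duplicate", "system_prompt": "You are..."},
    {"name": "writer", "role": "writes reports"},  # no system_prompt: dropped
    {"name": "analyst", "role": "crunches numbers", "system_prompt": "You are..."},
]
print([s["name"] for s in clean_agent_specs(specs)])
```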

Error Handling and Recovery

Retry with Backoff

```python
import time


class ResilientOrchestrator(Orchestrator):
    def __init__(self, agents, max_retries=3):
        super().__init__(agents)
        self.max_retries = max_retries

    def _execute_task(self, task: Task) -> str:
        last_error = None

        for attempt in range(self.max_retries):
            try:
                return super()._execute_task(task)
            except Exception as e:
                last_error = e
                if attempt == self.max_retries - 1:
                    break  # no point sleeping after the final attempt
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                print(f"Task {task.id} failed ({e}), retrying in {wait_time}s...")
                time.sleep(wait_time)

        raise last_error
```
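A variant worth considering, sketched here rather than taken from the code above, makes two changes. It caps the delay and adds random jitter, so tasks that failed together (for example, on a shared rate limit) don't retry in lockstep. It also makes `sleep` injectable, so the logic can be tested without waiting. The scheme below is often called "full jitter"; the function and parameter names are ours.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with capped exponential backoff and full jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Full jitter: wait a random time in [0, min(max_delay, base_delay * 2^attempt)]
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))
    raise ValueError("max_retries must be at least 1")


# Usage with a stub that fails twice, then succeeds; delays are recorded instead of slept
calls = {"n": 0}


def flaky_agent_call() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient error")
    return "ok"


delays: list[float] = []
print(retry_with_backoff(flaky_agent_call, sleep=delays.append), delays)
```

Inside an orchestrator you would wrap the agent call, e.g. `retry_with_backoff(lambda: agent.execute(task.description, context))`.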

Fallback Agents

```python
class FallbackOrchestrator(Orchestrator):
    def __init__(self, agents, fallback_agents: dict[str, Agent]):
        super().__init__(agents)
        # Maps a primary agent name to the agent to try if the primary fails
        self.fallback_agents = fallback_agents

    def _get_context(self, task: Task) -> dict:
        # Same dependency context the base class passes to the primary agent
        return {dep: self.results[dep] for dep in task.dependencies}

    def _execute_task(self, task: Task) -> str:
        try:
            return super()._execute_task(task)
        except Exception as primary_error:
            # Try fallback agent
            fallback = self.fallback_agents.get(task.agent_type)
            if fallback:
                print(f"Primary agent failed ({primary_error}), using fallback for {task.id}")
                return fallback.execute(task.description, self._get_context(task))
            raise
```
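`FallbackOrchestrator` supports one backup per agent type. A longer chain might try a large model, then a cheaper model, then a rule-based stub. For that, a helper like the hypothetical `run_with_fallbacks` below tries each agent in order and reports which one produced the answer, which is worth logging.

```python
from typing import Callable

AgentFn = Callable[[str, dict], str]  # (task description, dependency results) -> result


def run_with_fallbacks(chain: list[tuple[str, AgentFn]], task: str, context: dict) -> tuple[str, str]:
    """Try each (name, agent) in order; return (name of the agent that succeeded, its result)."""
    errors: list[str] = []
    for name, agent in chain:
        try:
            return name, agent(task, context)
        except Exception as exc:
            errors.append(f"{name}: {exc}")
    raise RuntimeError("all agents failed: " + "; ".join(errors))


# Hypothetical agents: the primary times out, the backup answers
def primary_agent(task: str, context: dict) -> str:
    raise TimeoutError("model timed out")


def backup_agent(task: str, context: dict) -> str:
    return f"short answer for: {task}"


used, answer = run_with_fallbacks(
    [("primary", primary_agent), ("backup", backup_agent)], "Summarize Q4", {}
)
print(used, answer)
```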

Partial Results

```python
class GracefulOrchestrator(Orchestrator):
    def run(self, goal: str) -> dict:
        result = super().run(goal)

        if not result["success"]:
            # Return what we could complete
            completed_results = {
                t.id: t.result
                for t in self.tasks.values()
                if t.status == TaskStatus.COMPLETED
            }

            return {
                "success": False,
                "partial_result": self._synthesize_partial(goal, completed_results),
                "completed_tasks": list(completed_results.keys()),
                "failed_tasks": [t.id for t in self.tasks.values() if t.status == TaskStatus.FAILED],
                "note": "Some tasks failed. Partial results provided."
            }

        return result

    def _synthesize_partial(self, goal: str, completed_results: dict) -> str:
        if not completed_results:
            return "No tasks completed."
        # Reuse the normal synthesis step, but tell the model the picture is incomplete
        return self._synthesize(
            f"{goal}\n\n(Some subtasks failed; summarize only what these results support.)",
            completed_results,
        )
```
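When reporting partial results, it helps to separate tasks that failed from tasks that never ran because something upstream failed. In `Orchestrator`, the latter simply stay `PENDING`. Here is a sketch of a helper that finds them from the dependency map; the name `blocked_tasks` is hypothetical.

```python
def blocked_tasks(dependencies: dict[str, list[str]], failed: set[str]) -> set[str]:
    """Return tasks that can never run because a dependency failed, directly or transitively."""
    blocked: set[str] = set()
    changed = True
    while changed:  # keep propagating until no new task gets blocked
        changed = False
        for task, deps in dependencies.items():
            if task in failed or task in blocked:
                continue
            if any(d in failed or d in blocked for d in deps):
                blocked.add(task)
                changed = True
    return blocked


deps = {"fetch": [], "clean": ["fetch"], "chart": ["clean"], "intro": []}
print(sorted(blocked_tasks(deps, failed={"fetch"})))
```

With the orchestrator's state, the call would be `blocked_tasks({t.id: t.dependencies for t in self.tasks.values()}, {t.id for t in self.tasks.values() if t.status == TaskStatus.FAILED})`.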

Production Orchestrator

A fuller orchestrator with async execution, per-task timeouts, deadlock detection, and basic metrics:

```python
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import openai

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orchestrator")


@dataclass
class ExecutionMetrics:
    start_time: datetime
    end_time: Optional[datetime] = None
    tasks_total: int = 0
    tasks_completed: int = 0
    tasks_failed: int = 0

    @property
    def duration_seconds(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0


class ProductionOrchestrator:
    def __init__(
        self,
        agents: dict,
        max_parallel: int = 5,
        task_timeout: int = 300,
    ):
        # Async client so planning and synthesis calls don't block the event loop
        self.client = openai.AsyncOpenAI()
        self.agents = agents  # name -> Agent (same dataclass as above)
        self.max_parallel = max_parallel
        self.task_timeout = task_timeout
        self.metrics: Optional[ExecutionMetrics] = None

    async def run(self, goal: str) -> dict:
        """Execute an orchestrated workflow and return the result with metrics."""

        self.metrics = ExecutionMetrics(start_time=datetime.now())

        logger.info(f"Starting orchestration: {goal[:100]}...")

        try:
            # Decompose
            tasks = await self._decompose(goal)
            self.metrics.tasks_total = len(tasks)
            logger.info(f"Decomposed into {len(tasks)} tasks")

            # Execute
            results = await self._execute_all(tasks)

            # Synthesize
            final = await self._synthesize(goal, results)

            self.metrics.end_time = datetime.now()

            return {
                "success": True,
                "result": final,
                "metrics": self._get_metrics_dict(),
                "trace": self._get_execution_trace(tasks)
            }

        except Exception as e:
            logger.error(f"Orchestration failed: {e}")
            self.metrics.end_time = datetime.now()

            return {
                "success": False,
                "error": str(e),
                "metrics": self._get_metrics_dict()
            }

    async def _decompose(self, goal: str) -> list[dict]:
        """Ask the model for tasks shaped like {id, description, agent, dependencies}."""
        agent_descriptions = "\n".join(
            f"- {name}: {agent.description}" for name, agent in self.agents.items()
        )
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": (
                    "Decompose the goal into 2-8 focused tasks for these agents:\n"
                    f"{agent_descriptions}\n\n"
                    'Return JSON: {"tasks": [{"id": "task_1", "description": "...", '
                    '"agent": "agent_name", "dependencies": []}]}'
                )
            }, {
                "role": "user",
                "content": goal
            }],
            response_format={"type": "json_object"}
        )
        return json.loads(response.choices[0].message.content)["tasks"]

    async def _execute_all(self, tasks: list) -> dict:
        """Execute all tasks respecting dependencies, in batches of up to max_parallel."""

        results = {}
        completed = set()

        while len(completed) < len(tasks):
            # Find tasks whose dependencies have all completed
            ready = [
                t for t in tasks
                if t["id"] not in completed
                and all(dep in completed for dep in t.get("dependencies", []))
            ]

            if not ready:
                pending = [t["id"] for t in tasks if t["id"] not in completed]
                raise RuntimeError(f"Deadlock detected. Pending: {pending}")

            # Execute one batch in parallel
            batch = ready[:self.max_parallel]
            batch_results = await asyncio.gather(*[
                self._execute_single(t, results)
                for t in batch
            ], return_exceptions=True)

            # Record every outcome in the batch before failing
            failures = []
            for task, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    self.metrics.tasks_failed += 1
                    logger.error(f"Task {task['id']} failed: {result}")
                    failures.append(result)
                    continue

                results[task["id"]] = result
                completed.add(task["id"])
                self.metrics.tasks_completed += 1
                logger.info(f"Completed: {task['id']}")

            if failures:
                raise failures[0]  # fail fast: dependents of a failed task can never run

        return results

    async def _execute_single(self, task: dict, context: dict) -> str:
        """Execute a single task with a timeout."""

        agent = self.agents.get(task["agent"])
        if not agent:
            raise ValueError(f"Unknown agent: {task['agent']}")

        # Build context from dependencies
        dep_context = {
            dep: context[dep]
            for dep in task.get("dependencies", [])
            if dep in context
        }

        try:
            # agent.execute is synchronous, so run it in a worker thread. On timeout we
            # stop waiting, but the thread itself keeps running until the call returns.
            return await asyncio.wait_for(
                asyncio.to_thread(agent.execute, task["description"], dep_context),
                timeout=self.task_timeout
            )
        except asyncio.TimeoutError:
            raise TimeoutError(f"Task {task['id']} timed out after {self.task_timeout}s")

    async def _synthesize(self, goal: str, results: dict) -> str:
        """Combine task results into the final output."""
        results_text = "\n\n".join(
            f"=== {task_id} ===\n{result}" for task_id, result in results.items()
        )
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Synthesize these task results into a coherent final response."
            }, {
                "role": "user",
                "content": f"Goal: {goal}\n\nTask Results:\n{results_text}"
            }]
        )
        return response.choices[0].message.content

    def _get_metrics_dict(self) -> dict:
        return {
            "duration_seconds": self.metrics.duration_seconds,
            "tasks_total": self.metrics.tasks_total,
            "tasks_completed": self.metrics.tasks_completed,
            "tasks_failed": self.metrics.tasks_failed,
            "success_rate": self.metrics.tasks_completed / max(self.metrics.tasks_total, 1)
        }

    def _get_execution_trace(self, tasks: list) -> list:
        return [
            {
                "id": t["id"],
                "agent": t["agent"],
                "description": t["description"][:100],
                "dependencies": t.get("dependencies", [])
            }
            for t in tasks
        ]


# Usage (research_agent, analyst_agent, writer_agent and coder_agent are
# placeholder Agent instances, e.g. built with the factories shown earlier)
async def main():
    orchestrator = ProductionOrchestrator(
        agents={
            "researcher": research_agent,
            "analyst": analyst_agent,
            "writer": writer_agent,
            "coder": coder_agent
        },
        max_parallel=3,
        task_timeout=120
    )

    result = await orchestrator.run(
        "Research the latest AI agent frameworks, analyze their features, "
        "create a comparison table, and write a recommendation report."
    )

    print(f"Success: {result['success']}")
    print(f"Duration: {result['metrics']['duration_seconds']:.1f}s")
    print(f"Tasks: {result['metrics']['tasks_completed']}/{result['metrics']['tasks_total']}")

    if result['success']:
        print(f"\nResult:\n{result['result']}")

# asyncio.run(main())
```
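`_execute_all` works in batches, so one slow task delays every task in the next round, even ones whose dependencies finished long ago. An alternative is to start each task as soon as its own dependencies finish and cap concurrency with an `asyncio.Semaphore`. This sketch assumes the plan has already been validated, with no cycles and no unknown dependency ids. `run_dag` is a hypothetical helper; in `ProductionOrchestrator`, `execute` would wrap `_execute_single`.

```python
import asyncio
from typing import Awaitable, Callable


async def run_dag(
    tasks: dict[str, list[str]],
    execute: Callable[[str, dict], Awaitable[str]],
    max_parallel: int = 3,
) -> dict[str, str]:
    """Run an acyclic task graph, starting each task as soon as its dependencies finish."""
    semaphore = asyncio.Semaphore(max_parallel)
    futures: dict[str, asyncio.Task] = {}

    async def run_one(task_id: str) -> str:
        # Wait only for this task's own dependencies, not for a whole batch
        dep_results = {dep: await futures[dep] for dep in tasks[task_id]}
        async with semaphore:  # cap how many agents run at once
            return await execute(task_id, dep_results)

    # No await in this loop, so every asyncio.Task exists before any of them starts
    for task_id in tasks:
        futures[task_id] = asyncio.create_task(run_one(task_id))

    try:
        values = await asyncio.gather(*futures.values())
    except Exception:
        for future in futures.values():
            future.cancel()  # fail fast: stop work whose result no longer matters
        raise
    return dict(zip(futures, values))


async def fake_execute(task_id: str, deps: dict) -> str:
    await asyncio.sleep(0.01)  # stands in for an agent call
    return f"{task_id}<-" + ",".join(sorted(deps))


results = asyncio.run(
    run_dag({"a": [], "b": [], "c": ["a"], "d": ["b", "c"]}, fake_execute, max_parallel=2)
)
print(results)
```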

Best Practices

1. Keep Orchestrator Logic Simple

```python
# ❌ Orchestrator doing too much
class BadOrchestrator:
    def run(self, goal):
        # Orchestrator shouldn't contain domain logic
        if "sales" in goal:
            return self._analyze_sales()
        elif "marketing" in goal:
            return self._analyze_marketing()


# ✅ Orchestrator focuses on coordination
class GoodOrchestrator:
    def run(self, goal):
        tasks = self._decompose(goal)            # What to do
        agents = self._select_agents(tasks)      # Who does it
        results = self._execute(tasks, agents)   # Coordination
        return self._synthesize(results)         # Combine results
```

2. Design Clear Agent Interfaces

```python
# All agents should follow the same interface
class AgentInterface:
    def execute(self, task: str, context: dict) -> str:
        """
        Args:
            task: What to do
            context: Results from dependency tasks

        Returns:
            Result as string (or structured data as JSON string)
        """
        raise NotImplementedError
```
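If you'd rather not require agents to inherit from a base class, `typing.Protocol` expresses the same contract structurally: any object with a matching `execute` method qualifies. This sketch uses illustrative names (`AgentProtocol`, `EchoAgent`). Note that an `isinstance` check against a `runtime_checkable` protocol only verifies that the method exists, not its signature.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class AgentProtocol(Protocol):
    def execute(self, task: str, context: dict) -> str: ...


class EchoAgent:
    """A trivial agent that satisfies the protocol without inheriting from it."""

    def execute(self, task: str, context: dict) -> str:
        deps = ", ".join(sorted(context)) or "none"
        return f"{task} (deps: {deps})"


def run_agent(agent: AgentProtocol, task: str, context: dict) -> str:
    # Type checkers accept any object with a compatible execute() method here
    return agent.execute(task, context)


print(run_agent(EchoAgent(), "summarize", {"task_1": "raw numbers"}))
```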

3. Monitor Everything

```python
import time


# A method on the Orchestrator. self.metrics stands for whatever metrics sink you
# use; its record() method is an assumed interface, not a specific library.
def _execute_task(self, task: Task) -> str:
    start = time.perf_counter()
    context = {dep: self.results[dep] for dep in task.dependencies}

    try:
        result = self.agents[task.agent_type].execute(task.description, context)

        self.metrics.record({
            "task_id": task.id,
            "agent": task.agent_type,
            "duration": time.perf_counter() - start,
            "success": True,
            "result_size": len(result)
        })

        return result
    except Exception as e:
        self.metrics.record({
            "task_id": task.id,
            "agent": task.agent_type,
            "duration": time.perf_counter() - start,
            "success": False,
            "error": str(e)
        })
        raise
```
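The snippet assumes a `self.metrics.record(...)` sink. Here is a minimal in-memory version that also rolls events up per agent; the `InMemoryMetrics` class is hypothetical. In production you would forward the same events to your logging or metrics system.

```python
from collections import defaultdict
from statistics import mean


class InMemoryMetrics:
    """Collects per-task events and summarizes them per agent."""

    def __init__(self) -> None:
        self.events: list[dict] = []

    def record(self, event: dict) -> None:
        self.events.append(event)

    def summary(self) -> dict[str, dict]:
        by_agent: dict[str, list[dict]] = defaultdict(list)
        for event in self.events:
            by_agent[event["agent"]].append(event)
        return {
            agent: {
                "calls": len(events),
                "success_rate": sum(e["success"] for e in events) / len(events),
                "mean_duration": mean(e["duration"] for e in events),
            }
            for agent, events in by_agent.items()
        }


metrics = InMemoryMetrics()
metrics.record({"task_id": "t1", "agent": "writer", "duration": 1.0, "success": True})
metrics.record({"task_id": "t2", "agent": "writer", "duration": 3.0, "success": False, "error": "timeout"})
metrics.record({"task_id": "t3", "agent": "coder", "duration": 2.0, "success": True})
print(metrics.summary())
```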

4. Enable Graceful Degradation

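```python
def run(self, goal: str) -> dict:
    # _full_execution, _simplified_execution and _single_agent_fallback are
    # placeholders for progressively cheaper, less ambitious strategies
    try:
        return self._full_execution(goal)
    except Exception as e:
        logger.warning(f"Full execution failed: {e}")

        # Try simpler approach
        try:
            return self._simplified_execution(goal)
        except Exception as e2:
            logger.warning(f"Simplified execution failed: {e2}")
            # Last resort: single agent
            return self._single_agent_fallback(goal)
```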

Conclusion

The Orchestrator Pattern is how you scale AI agents to enterprise complexity:

  • Task decomposition breaks big problems into manageable pieces
  • Agent specialization ensures each task is handled by an expert
  • Parallel execution maximizes throughput
  • Dependency management ensures correct ordering
  • Fault tolerance keeps systems running despite failures

Start with a simple pipeline orchestrator. Add routing when you have diverse task types. Move to hierarchical orchestration for truly complex workflows.

For complex, multi-step work, a system that orchestrates specialists consistently outperforms a single generalist agent.


Ready to orchestrate agents with secure code execution? Get started with HopX — sandboxes that give each agent isolated environments.
