Multi-Agent Architectures with HopX
Single agents have limits. Complex tasks benefit from multiple specialized agents working together—a researcher, a coder, a reviewer. But running multiple agents with code execution capabilities creates security and coordination challenges.
HopX solves this by giving each agent its own isolated sandbox while enabling seamless coordination between them.
Why Multi-Agent?
Single agent limitations:
- Context window limits
- Jack of all trades, master of none
- Single point of failure
- Sequential processing only
Multi-agent advantages:
- Specialized expertise per agent
- Parallel task execution
- Redundancy and fault tolerance
- Modular, maintainable systems
Multi-Agent Patterns
Pattern 1: Supervisor-Worker
A supervisor agent delegates tasks to specialized worker agents:
            ┌─────────────────┐
            │   Supervisor    │
            │      Agent      │
            └────────┬────────┘
                     │
         ┌───────────┼───────────┐
         │           │           │
     ┌───▼───┐   ┌───▼───┐   ┌───▼───┐
     │Worker │   │Worker │   │Worker │
     │Coder  │   │Analyst│   │Writer │
     └───────┘   └───────┘   └───────┘
Implementation:
| 1 | from hopx import Sandbox |
| 2 | import openai |
| 3 | import json |
| 4 | |
class SupervisorAgent:
    """Supervisor that plans a task with an LLM and delegates each step to a worker.

    Workers are looked up by the name the planner emits ("coder", "analyst",
    "writer").
    """

    def __init__(self):
        self.client = openai.OpenAI()
        self.workers = {
            "coder": CoderWorker(),
            "analyst": AnalystWorker(),
            "writer": WriterWorker(),
        }

    def process(self, task: str) -> str:
        """Plan `task`, run every step in plan order, and return the final answer.

        Each worker receives the results of all earlier steps, keyed by step id.

        Raises:
            KeyError: if the plan has no "steps" entry or names an unregistered
                worker.
        """
        # Supervisor decides which workers to use
        plan = self._create_plan(task)

        results = {}
        for step in plan["steps"]:
            worker = self.workers[step["worker"]]
            results[step["id"]] = worker.execute(step["task"], results)

        return self._synthesize(results)

    def _create_plan(self, task: str) -> dict:
        """Ask the model to break `task` into worker steps; return the parsed JSON."""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a task planner. Break down tasks into steps.\n"
                        "Available workers: coder, analyst, writer.\n"
                        'Return JSON: {"steps": [{"id": "step1", '
                        '"worker": "coder", "task": "..."}]}'
                    ),
                },
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)

    def _synthesize(self, results: dict) -> str:
        """Merge the per-step worker results into one final answer via the model.

        `process` has always called this method, but it was missing from the class.
        """
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Combine the step results below into one coherent "
                        "final answer."
                    ),
                },
                {"role": "user", "content": json.dumps(results, indent=2)},
            ],
        )
        return response.choices[0].message.content
| 41 | |
| 42 | |
class CoderWorker:
    """Worker that has the model write a Python script and runs it in a sandbox."""

    def execute(self, task: str, context: dict) -> dict:
        """Generate code for `task`, run it in an isolated sandbox, and report back.

        Returns:
            {"code": <script>, "output": <stdout>, "error": <stderr or None>};
            "error" is only populated when the script exits non-zero.
        """
        # Generate first so no sandbox is held while waiting on the model.
        code = self._generate_code(task, context)

        sandbox = Sandbox.create(template="code-interpreter")
        try:
            # Execute in isolated sandbox
            sandbox.files.write("/app/task.py", code)
            result = sandbox.commands.run("cd /app && python task.py")

            return {
                "code": code,
                "output": result.stdout,
                "error": result.stderr if result.exit_code != 0 else None,
            }
        finally:
            sandbox.kill()

    def _generate_code(self, task: str, context: dict) -> str:
        """Ask the model for a script solving `task`, given earlier step results."""
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a Python developer. Reply with one runnable "
                        "Python script."
                    ),
                },
                {
                    "role": "user",
                    "content": f"Task: {task}\n\nContext: {json.dumps(context)}",
                },
            ],
        )
        return self._extract_code(response.choices[0].message.content)

    @staticmethod
    def _extract_code(reply: str) -> str:
        """Return the first fenced code block in `reply`, or `reply` if unfenced."""
        import re

        fenced = re.search(r"```[\w+-]*[ \t]*\n(.*?)```", reply, re.DOTALL)
        return fenced.group(1) if fenced else reply
| 62 | |
| 63 | |
class AnalystWorker:
    """Worker that has the model write an analysis script and runs it in a sandbox."""

    def execute(self, task: str, context: dict) -> dict:
        """Generate and run an analysis script for `task`.

        If `context` carries a "data" entry it is uploaded as /app/data.json
        before the script runs.

        Returns:
            {"analysis": <stdout>, "error": <stderr or None>}; "error" is only
            populated when the script exits non-zero.
        """
        # Generate first so no sandbox is held while waiting on the model.
        analysis_code = self._generate_analysis(task, context)

        sandbox = Sandbox.create(template="code-interpreter")
        try:
            sandbox.files.write("/app/analyze.py", analysis_code)

            # If previous step produced data, upload it
            if "data" in context:
                sandbox.files.write("/app/data.json", json.dumps(context["data"]))

            result = sandbox.commands.run("cd /app && python analyze.py")
            return {
                "analysis": result.stdout,
                "error": result.stderr if result.exit_code != 0 else None,
            }
        finally:
            sandbox.kill()

    def _generate_analysis(self, task: str, context: dict) -> str:
        """Ask the model for an analysis script, given earlier step results."""
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a data analyst. Reply with one runnable Python "
                        "script that prints its findings. Input data, if any, "
                        "is at /app/data.json."
                    ),
                },
                {
                    "role": "user",
                    "content": f"Task: {task}\n\nContext: {json.dumps(context)}",
                },
            ],
        )
        return self._extract_code(response.choices[0].message.content)

    @staticmethod
    def _extract_code(reply: str) -> str:
        """Return the first fenced code block in `reply`, or `reply` if unfenced."""
        import re

        fenced = re.search(r"```[\w+-]*[ \t]*\n(.*?)```", reply, re.DOTALL)
        return fenced.group(1) if fenced else reply
| 81 | |
| 82 | |
class WriterWorker:
    """Worker that turns earlier step results into documentation."""

    def execute(self, task: str, context: dict) -> dict:
        """Write about `task` using `context` (results of earlier steps).

        Returns:
            {"document": <generated text>}. The annotation used to say `str`,
            but a dict is what has always been returned.
        """
        # Writer doesn't need sandbox - just text generation
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "You are a technical writer. Create clear documentation.",
                },
                {
                    "role": "user",
                    "content": f"Write about: {task}\n\nContext: {json.dumps(context)}",
                },
            ],
        )
        return {"document": response.choices[0].message.content}
| 97 | |
| 98 | |
# Usage: one request that needs the coder, the analyst, and the writer.
supervisor = SupervisorAgent()
result = supervisor.process(
    "Analyze the Titanic dataset, create a survival prediction model, "
    "and write a report explaining the findings."
)
| 105 | |
Pattern 2: Debate/Adversarial
Multiple agents debate or challenge each other's work:
┌─────────────────┐
│    Proposer     │◄─────┐
│      Agent      │      │
└────────┬────────┘      │
         │               │
         ▼               │
┌─────────────────┐      │
│     Critic      │──────┘
│      Agent      │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   Synthesizer   │
│      Agent      │
└─────────────────┘
Implementation:
| 1 | from hopx import Sandbox |
| 2 | import openai |
| 3 | |
class CodeDebateSystem:
    """Proposer/critic loop: one agent writes code, another runs and reviews it."""

    def __init__(self):
        self.client = openai.OpenAI()
        self.max_rounds = 3

    def solve(self, problem: str) -> str:
        """Solve a coding problem through debate.

        Returns the code of the latest proposal. If the critic never approves
        within `max_rounds`, the most recent revision is returned without a
        final review.
        """
        # Initial proposal
        proposal = self._propose(problem)

        for _ in range(self.max_rounds):
            # Critic reviews and tests the code
            critique = self._critique(problem, proposal)

            # A reply without "approved" counts as a rejection, not a KeyError.
            if critique.get("approved"):
                break

            # Proposer improves based on feedback
            proposal = self._improve(problem, proposal, critique)

        return proposal["code"]

    def _propose(self, problem: str) -> dict:
        """Proposer agent generates the initial solution."""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "You are a Python expert. Solve coding problems.",
                },
                {"role": "user", "content": problem},
            ],
        )
        reply = response.choices[0].message.content
        return {"code": self._extract_code(reply), "reasoning": reply}

    def _critique(self, problem: str, proposal: dict) -> dict:
        """Critic agent runs the proposed code in a sandbox and reviews it.

        Returns the reviewer's parsed JSON verdict.
        """
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            # Test the proposed code
            sandbox.files.write("/app/solution.py", proposal["code"])
            result = sandbox.commands.run("cd /app && python solution.py")
        finally:
            # The sandbox is only needed for the run, not for the review call.
            sandbox.kill()

        user_prompt = (
            f"Problem: {problem}\n\n"
            f"Code:\n{proposal['code']}\n\n"
            "Execution result:\n"
            f"stdout: {result.stdout}\n"
            f"stderr: {result.stderr}\n"
            f"exit_code: {result.exit_code}"
        )
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a code reviewer. Analyze code for:\n"
                        "1. Correctness\n"
                        "2. Edge cases\n"
                        "3. Performance\n"
                        "4. Readability\n"
                        'Return JSON: {"approved": bool, "issues": [...], '
                        '"suggestions": [...]}'
                    ),
                },
                {"role": "user", "content": user_prompt},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)

    def _improve(self, problem: str, proposal: dict, critique: dict) -> dict:
        """Proposer revises its code based on the critique."""
        user_prompt = (
            f"Problem: {problem}\n\n"
            f"Current code:\n{proposal['code']}\n\n"
            f"Critique:\n{json.dumps(critique)}\n\n"
            "Please provide improved code addressing all issues."
        )
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Improve the code based on the feedback.",
                },
                {"role": "user", "content": user_prompt},
            ],
        )
        reply = response.choices[0].message.content
        return {"code": self._extract_code(reply), "reasoning": reply}

    @staticmethod
    def _extract_code(reply: str) -> str:
        """Return the first fenced code block in `reply`, or `reply` if unfenced.

        `_propose` and `_improve` have always called this helper, but it was
        missing from the class.
        """
        import re

        fenced = re.search(r"```[\w+-]*[ \t]*\n(.*?)```", reply, re.DOTALL)
        return fenced.group(1) if fenced else reply
| 105 | |
| 106 | |
# Usage: let the proposer and critic iterate on one coding problem.
debate_system = CodeDebateSystem()
solution = debate_system.solve(
    "Write a function to find the longest palindromic substring in a string. "
    "It should handle edge cases and be efficient."
)
| 113 | |
Pattern 3: Assembly Line (Pipeline)
Agents process data sequentially, each adding value:
Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
         Extract    Transform    Validate
Implementation:
| 1 | from hopx import Sandbox |
| 2 | from dataclasses import dataclass |
| 3 | from typing import Any |
| 4 | |
@dataclass
class PipelineContext:
    """State handed from one pipeline stage to the next."""

    # Current payload; each stage may replace it with its own output.
    data: Any
    # Flags recorded by stages (e.g. "extraction_complete", "valid").
    metadata: dict
    # Problems collected so far; the pipeline checks this after every stage.
    errors: list
| 10 | |
class PipelineAgent:
    """Common interface implemented by every pipeline stage."""

    def process(self, context: PipelineContext) -> PipelineContext:
        """Transform `context` and return it; concrete stages must override this."""
        raise NotImplementedError
| 16 | |
| 17 | |
class DataExtractor(PipelineAgent):
    """Extracts line, word, and character statistics from raw text input."""

    # Script run inside the sandbox; prints the extracted structure as JSON.
    _SCRIPT = """
import json

with open('/app/raw_data.txt') as f:
    raw = f.read()

# Parse and extract structured data
extracted = {
    'lines': raw.strip().split('\\n'),
    'word_count': len(raw.split()),
    'char_count': len(raw)
}

with open('/app/extracted.json', 'w') as f:
    json.dump(extracted, f)

print(json.dumps(extracted))
"""

    def process(self, context: PipelineContext) -> PipelineContext:
        """Replace `context.data` (raw text) with the extracted dict.

        If the sandboxed script exits non-zero, the failure is appended to
        `context.errors` and `context.data` is left untouched. Previously the
        exit code was ignored and the failure surfaced as a JSON parse error.
        """
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            # Upload raw data
            sandbox.files.write("/app/raw_data.txt", context.data)

            # Run extraction
            sandbox.files.write("/app/extract.py", self._SCRIPT)
            result = sandbox.commands.run("cd /app && python extract.py")
        finally:
            sandbox.kill()

        if result.exit_code != 0:
            context.errors.append(f"Extraction failed: {result.stderr}")
            return context

        context.data = json.loads(result.stdout)
        context.metadata["extraction_complete"] = True
        return context
| 55 | |
| 56 | |
class DataTransformer(PipelineAgent):
    """Adds computed fields to the extracted data."""

    # Script run inside the sandbox; prints the transformed structure as JSON.
    _SCRIPT = """
import json

with open('/app/data.json') as f:
    data = json.load(f)

# Transform: add computed fields
transformed = data.copy()
transformed['avg_line_length'] = data['char_count'] / max(len(data['lines']), 1)
transformed['lines_processed'] = [line.strip().upper() for line in data['lines']]

with open('/app/transformed.json', 'w') as f:
    json.dump(transformed, f)

print(json.dumps(transformed))
"""

    def process(self, context: PipelineContext) -> PipelineContext:
        """Replace `context.data` with its transformed version.

        If the sandboxed script exits non-zero, the failure is appended to
        `context.errors` and `context.data` is left untouched. Previously the
        exit code was ignored and the failure surfaced as a JSON parse error.
        """
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            sandbox.files.write("/app/data.json", json.dumps(context.data))
            sandbox.files.write("/app/transform.py", self._SCRIPT)
            result = sandbox.commands.run("cd /app && python transform.py")
        finally:
            sandbox.kill()

        if result.exit_code != 0:
            context.errors.append(f"Transformation failed: {result.stderr}")
            return context

        context.data = json.loads(result.stdout)
        context.metadata["transformation_complete"] = True
        return context
| 90 | |
| 91 | |
class DataValidator(PipelineAgent):
    """Validates transformed data against a fixed set of rules."""

    # Script run inside the sandbox; prints {"valid", "errors", "data"} as JSON.
    _SCRIPT = """
import json
import sys

with open('/app/data.json') as f:
    data = json.load(f)

errors = []

# Validation rules
if data.get('word_count', 0) < 1:
    errors.append("No words found")

if data.get('avg_line_length', 0) > 1000:
    errors.append("Lines too long")

if not data.get('lines_processed'):
    errors.append("No processed lines")

result = {
    'valid': len(errors) == 0,
    'errors': errors,
    'data': data
}

print(json.dumps(result))
"""

    def process(self, context: PipelineContext) -> PipelineContext:
        """Record the validation verdict on `context`; `context.data` is unchanged.

        Rule violations are appended to `context.errors` and the verdict is
        stored in `context.metadata["valid"]`. If the script itself exits
        non-zero, that failure is recorded as an error and "valid" is set to
        False. Previously the exit code was ignored and the failure surfaced as
        a JSON parse error.
        """
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            sandbox.files.write("/app/data.json", json.dumps(context.data))
            sandbox.files.write("/app/validate.py", self._SCRIPT)
            result = sandbox.commands.run("cd /app && python validate.py")
        finally:
            sandbox.kill()

        if result.exit_code != 0:
            context.errors.append(f"Validation failed to run: {result.stderr}")
            context.metadata["valid"] = False
            return context

        validation = json.loads(result.stdout)
        context.errors.extend(validation["errors"])
        context.metadata["validation_complete"] = True
        context.metadata["valid"] = validation["valid"]
        return context
| 139 | |
| 140 | |
class Pipeline:
    """Runs a fixed sequence of pipeline agents over one shared context."""

    def __init__(self, agents: list[PipelineAgent]):
        self.agents = agents

    def run(self, input_data: Any) -> PipelineContext:
        """Feed `input_data` through every agent in order and return the context.

        Processing stops after the first stage that leaves errors on the
        context, unless the "continue_on_error" metadata flag is set.
        """
        context = PipelineContext(data=input_data, metadata={}, errors=[])

        for stage in self.agents:
            context = stage.process(context)

            keep_going = context.metadata.get("continue_on_error")
            if context.errors and not keep_going:
                # Stop on critical errors
                break

        return context
| 160 | |
| 161 | |
# Usage: extract -> transform -> validate a small multi-line text.
pipeline = Pipeline([DataExtractor(), DataTransformer(), DataValidator()])

result = pipeline.run("Hello World\nThis is a test\nMultiple lines here")
print(f"Valid: {result.metadata['valid']}")
print(f"Final data: {result.data}")
| 172 | |
Pattern 4: Parallel Specialists
Multiple specialized agents work simultaneously:
            ┌───────────┐
            │   Task    │
            │  Router   │
            └─────┬─────┘
                  │
     ┌────────────┼────────────┐
     │            │            │
     ▼            ▼            ▼
┌─────────┐  ┌─────────┐  ┌─────────┐
│  Math   │  │  Code   │  │Research │
│ Expert  │  │ Expert  │  │ Expert  │
└────┬────┘  └────┬────┘  └────┬────┘
     │            │            │
     └────────────┼────────────┘
                  │
            ┌─────▼─────┐
            │  Result   │
            │  Merger   │
            └───────────┘
Implementation:
| 1 | from hopx import Sandbox |
| 2 | import concurrent.futures |
| 3 | import openai |
| 4 | |
class ParallelAgentSystem:
    """Routes a task to the relevant specialists, runs them in parallel, and merges."""

    def __init__(self):
        self.client = openai.OpenAI()
        self.specialists = {
            "math": MathExpert(),
            "code": CodeExpert(),
            "research": ResearchExpert(),
        }

    def solve(self, task: str) -> str:
        """Run every relevant specialist on `task` concurrently and merge the output.

        Specialist names returned by the router are de-duplicated, and names
        that are not registered are ignored. An exception raised by a
        specialist propagates to the caller.
        """
        # Route task to relevant specialists. dict.fromkeys de-duplicates while
        # preserving order; unknown names are dropped so a stray router answer
        # cannot raise KeyError.
        relevant = [
            name
            for name in dict.fromkeys(self._identify_specialists(task))
            if name in self.specialists
        ]

        results = {}
        # ThreadPoolExecutor rejects max_workers=0, so skip the pool when the
        # router selected nobody.
        if relevant:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=len(relevant)
            ) as executor:
                futures = {
                    executor.submit(self.specialists[name].solve, task): name
                    for name in relevant
                }
                for future in concurrent.futures.as_completed(futures):
                    results[futures[future]] = future.result()

        # Merge results
        return self._merge_results(task, results)

    def _identify_specialists(self, task: str) -> list[str]:
        """Ask the model which specialists are needed; returns their names."""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Identify which specialists are needed.\n"
                        "Available: math, code, research\n"
                        'Return JSON: {"specialists": ["math", "code"]}'
                    ),
                },
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)["specialists"]

    def _merge_results(self, task: str, results: dict) -> str:
        """Combine the results from all specialists into one answer."""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Synthesize the results from multiple specialists "
                        "into a coherent answer."
                    ),
                },
                {
                    "role": "user",
                    "content": (
                        f"Task: {task}\n\nResults:\n"
                        f"{json.dumps(results, indent=2)}"
                    ),
                },
            ],
        )
        return response.choices[0].message.content
| 63 | |
| 64 | |
class MathExpert:
    """Specialist that runs a SymPy script inside a sandbox."""

    # Placeholder script: a real implementation would generate this from the task.
    _SCRIPT = '''
import sympy as sp
from sympy import symbols, solve, simplify, diff, integrate

# Math computation based on task
x, y, z = symbols('x y z')

# Example: solve equations, compute derivatives, etc.
# This would be generated based on the specific math task

result = {"computed": True, "method": "symbolic"}
print(result)
'''

    def solve(self, task: str) -> dict:
        """Run the symbolic-math script and return its stdout.

        NOTE(review): `task` is not used yet; the script is a fixed placeholder.

        Returns:
            {"output": <stdout>, "type": "mathematical"}
        """
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            # Use symbolic math
            sandbox.commands.run("pip install sympy -q")

            # Deliberately not "math.py": a script with that name, run from its
            # own directory, can shadow the stdlib `math` module that SymPy
            # imports.
            sandbox.files.write("/app/math_task.py", self._SCRIPT)
            result = sandbox.commands.run("cd /app && python math_task.py")

            return {"output": result.stdout, "type": "mathematical"}
        finally:
            sandbox.kill()
| 92 | |
| 93 | |
class CodeExpert:
    """Specialist that has the model write a solution and runs it in a sandbox."""

    def solve(self, task: str) -> dict:
        """Generate Python code for `task`, execute it, and return code plus output.

        Returns:
            {"code": <script>, "output": <stdout>, "type": "code_solution"}
        """
        # Generate first so no sandbox is held while waiting on the model.
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Write Python code to solve this problem.",
                },
                {"role": "user", "content": task},
            ],
        )
        code = self._extract_code(response.choices[0].message.content)

        sandbox = Sandbox.create(template="code-interpreter")
        try:
            sandbox.files.write("/app/solution.py", code)
            result = sandbox.commands.run("cd /app && python solution.py")

            return {
                "code": code,
                "output": result.stdout,
                "type": "code_solution",
            }
        finally:
            sandbox.kill()

    @staticmethod
    def _extract_code(reply: str) -> str:
        """Return the first fenced code block in `reply`, or `reply` if unfenced.

        Replaces a call to a global `extract_code` that was never defined.
        """
        import re

        fenced = re.search(r"```[\w+-]*[ \t]*\n(.*?)```", reply, re.DOTALL)
        return fenced.group(1) if fenced else reply
| 122 | |
| 123 | |
class ResearchExpert:
    """Specialist that answers with background research; it needs no sandbox."""

    def solve(self, task: str) -> dict:
        """Ask the model for research insights on `task`.

        Returns:
            {"research": <generated text>, "type": "research"}
        """
        system_message = {
            "role": "system",
            "content": "Provide research insights and background information.",
        }
        user_message = {"role": "user", "content": task}

        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[system_message, user_message],
        )
        insights = response.choices[0].message.content

        return {"research": insights, "type": "research"}
| 142 | |
| 143 | |
# Usage: one question that touches all three specialists.
system = ParallelAgentSystem()
answer = system.solve(
    "Calculate the area under the curve y=x^2 from 0 to 5, "
    "implement a numerical integration in Python, "
    "and explain the mathematical theory behind integration."
)
| 151 | |
Agent Communication
Shared Memory via Files
Agents can communicate through files in a shared sandbox:
| 1 | from hopx import Sandbox |
| 2 | |
# Create shared sandbox for communication
shared_sandbox = Sandbox.create(template="code-interpreter")


class Agent:
    """Agent with a file-based mailbox (`/comm/<name>`) inside a shared sandbox."""

    def __init__(self, name: str, shared: Sandbox):
        import shlex

        self.name = name
        self.shared = shared
        self.mailbox = f"/comm/{name}"
        # The path is interpolated into a shell command line, so quote it.
        shared.commands.run(f"mkdir -p {shlex.quote(self.mailbox)}")

    def send(self, to: str, message: dict):
        """Send `message` to the agent named `to` by dropping a JSON file in its mailbox."""
        import time
        import uuid

        timestamp = f"{time.time()}"
        # The random suffix keeps two messages written within the same clock
        # reading from overwriting each other. The timestamp prefix keeps
        # `ls` ordering chronological.
        path = f"/comm/{to}/{timestamp}-{uuid.uuid4().hex}.json"
        self.shared.files.write(
            path,
            json.dumps(
                {
                    "from": self.name,
                    "message": message,
                    "timestamp": timestamp,
                }
            ),
        )

    def receive(self) -> list[dict]:
        """Receive all pending messages and delete them from the mailbox."""
        import shlex

        # The glob stays outside the quotes so the shell still expands it.
        # `|| true` keeps an empty mailbox from counting as a failure.
        listing = self.shared.commands.run(
            f"ls {shlex.quote(self.mailbox)}/*.json 2>/dev/null || true"
        )

        messages = []
        for path in listing.stdout.strip().split("\n"):
            if not path:
                continue
            messages.append(json.loads(self.shared.files.read(path)))
            self.shared.commands.run(f"rm {shlex.quote(path)}")

        return messages
| 36 | |
Event-Based Communication
| 1 | import asyncio |
| 2 | from hopx import Sandbox |
| 3 | |
class EventBus:
    """Minimal in-process publish/subscribe hub for async callbacks."""

    def __init__(self):
        # Maps event type -> callbacks, in subscription order.
        self.subscribers = {}

    def subscribe(self, event_type: str, callback):
        """Register `callback` to be awaited whenever `event_type` is published."""
        self.subscribers.setdefault(event_type, []).append(callback)

    async def publish(self, event_type: str, data: dict):
        """Await each subscriber of `event_type` in turn, passing it `data`.

        Publishing an event type nobody subscribed to does nothing.
        """
        for handler in self.subscribers.get(event_type, ()):
            await handler(data)
| 17 | |
| 18 | |
class ReactiveAgent:
    """Agent that reacts to bus events and runs commands in its own sandbox."""

    def __init__(self, name: str, bus: EventBus):
        self.name = name
        self.bus = bus
        self.sandbox = None

    async def start(self):
        """Create the agent's sandbox and subscribe it to the bus."""
        self.sandbox = Sandbox.create(template="code-interpreter")

        # Subscribe to relevant events
        self.bus.subscribe("task_available", self.on_task)
        self.bus.subscribe("data_ready", self.on_data)

    async def stop(self):
        """Release the sandbox created by `start`; safe to call more than once."""
        if self.sandbox is not None:
            self.sandbox.kill()
            self.sandbox = None

    async def on_task(self, data: dict):
        """Handle "task_available": run tasks addressed to this agent and report back."""
        if data["type"] == self.name:
            result = await self.process(data)
            await self.bus.publish(
                "task_complete",
                {"agent": self.name, "result": result},
            )

    async def on_data(self, data: dict):
        """Handle "data_ready" events.

        `start` subscribes this handler, but it was missing from the class, so
        `start` failed with AttributeError. The default does nothing; override
        it to react to data.
        """

    async def process(self, task: dict) -> dict:
        """Run `task["command"]` in the sandbox and return its stdout."""
        # Execute in sandbox
        result = self.sandbox.commands.run(task["command"])
        return {"output": result.stdout}
| 44 | |
| 45 | |
# Usage
bus = EventBus()
agents = [
    ReactiveAgent("processor", bus),
    ReactiveAgent("validator", bus),
]


async def main():
    """Start every agent, publish one task, then release the sandboxes."""
    try:
        for agent in agents:
            await agent.start()

        # Publish a task
        await bus.publish(
            "task_available",
            {"type": "processor", "command": "echo 'Processing...'"},
        )
    finally:
        # Each started agent owns a sandbox; release them even if a step failed.
        for agent in agents:
            if agent.sandbox is not None:
                agent.sandbox.kill()


# The coroutine was previously defined but never run.
if __name__ == "__main__":
    asyncio.run(main())
| 62 | |
Best Practices
1. Isolate Each Agent
Each agent should have its own sandbox:
class IsolatedAgent:
    """Context manager that gives an agent its own sandbox for the `with` block."""

    def __init__(self):
        self.sandbox = None

    def __enter__(self):
        """Create the sandbox and return the agent itself."""
        self.sandbox = Sandbox.create(template="code-interpreter")
        return self

    def __exit__(self, *args):
        """Kill the sandbox, if one was created, when the block exits."""
        if not self.sandbox:
            return
        self.sandbox.kill()
| 12 | |
2. Implement Timeouts
Prevent runaway agents:
| 1 | import signal |
| 2 | |
class TimeoutAgent:
    """Runs one command in a throwaway sandbox with an execution timeout."""

    def execute(self, task: str, timeout: int = 30):
        """Run `task` in a fresh sandbox and return the command result.

        Args:
            task: Command line to run in the sandbox.
            timeout: Passed through to `commands.run` as its `timeout` argument.

        Any exception from the run, including a timeout, propagates to the
        caller after the sandbox has been released.
        """
        sandbox = Sandbox.create(template="code-interpreter")
        try:
            return sandbox.commands.run(task, timeout=timeout)
        finally:
            # The sandbox is local to this call, so it must be released on
            # every path. It used to be killed only on TimeoutError and leaked
            # on success and on any other exception.
            sandbox.kill()
| 16 | |
3. Log Agent Activities
Track what agents do:
| 1 | import logging |
| 2 | |
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("multi_agent")


class LoggedAgent:
    """Agent wrapper that logs the start and the exit code of every task.

    Expects the concrete agent to provide `self.name` and `self._do_execute`.
    """

    def execute(self, task: str):
        """Run `task` via `_do_execute`, logging before and after; returns its result."""
        agent_name = self.name
        logger.info(f"Agent {agent_name} starting task: {task[:50]}...")

        outcome = self._do_execute(task)

        logger.info(f"Agent {self.name} completed. Exit code: {outcome.exit_code}")
        return outcome
| 14 | |
4. Handle Failures Gracefully
class ResilientAgentSystem:
    """Retries agent executions with exponential backoff."""

    def execute_with_retry(self, agent, task, max_retries=3, base_delay=1.0):
        """Call `agent.execute(task)`, retrying on any exception.

        Args:
            agent: Object exposing `execute(task)`.
            task: Passed through to the agent unchanged.
            max_retries: Total number of attempts; must be at least 1.
            base_delay: Seconds to wait after the first failure; the wait
                doubles after each further failure.

        Returns:
            Whatever the first successful `agent.execute(task)` returns.

        Raises:
            ValueError: if `max_retries` is below 1 (this used to return None
                silently without ever calling the agent).
            Exception: the last error, once every attempt has failed.
        """
        # Local imports: `time` was never imported at file scope, so the first
        # retry used to fail with NameError.
        import logging
        import time

        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        log = logging.getLogger("multi_agent")
        for attempt in range(max_retries):
            try:
                return agent.execute(task)
            except Exception as exc:
                log.warning(f"Attempt {attempt + 1} failed: {exc}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(base_delay * 2 ** attempt)  # Exponential backoff
| 11 | |
Conclusion
Multi-agent architectures unlock powerful capabilities:
- Specialization - Each agent masters its domain
- Parallelism - Execute tasks simultaneously
- Resilience - Failure in one agent doesn't break the system
- Scalability - Add more agents as needed
HopX sandboxes make this secure by isolating each agent's code execution, preventing one agent from affecting another.
Start building your multi-agent system today with the patterns in this guide.