
CrewAI Multi-Agent Pipelines with Secure Code Execution

Tutorials · Alin Dobra · 13 min read


CrewAI makes building multi-agent systems surprisingly simple. Define agents with roles, give them tools, and let them collaborate. But when those agents need to execute code, you face a choice: run it unsafely on your host, or set up complex containerization.

HopX sandboxes give you a third option: instant, isolated execution that works seamlessly with CrewAI's tool system.

What We're Building

A research crew that:

  1. Researcher Agent - Gathers information and identifies analysis needs
  2. Data Analyst Agent - Writes and executes Python code for analysis
  3. Report Writer Agent - Synthesizes findings into a report
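
```text
+------------------------- CrewAI Crew --------------------------+
|                                                                |
|  +---------------+    +---------------+    +---------------+   |
|  |  Researcher   |    | Data Analyst  |    | Report Writer |   |
|  +---------------+    +---------------+    +---------------+   |
|          |                    |                    |           |
|          v                    v                    v           |
|  +---------------+    +---------------+    +---------------+   |
|  |  Search Tool  |    | HopX Sandbox  |    |   File Tool   |   |
|  +---------------+    +---------------+    +---------------+   |
|                                                                |
|                    Isolated Code Execution                     |
+----------------------------------------------------------------+
```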

Prerequisites

```bash
pip install crewai crewai-tools hopx-ai
```

Set environment variables:

```bash
export OPENAI_API_KEY="sk-..."
export HOPX_API_KEY="..."
```
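
A missing key otherwise shows up only when the first agent or sandbox call fails, so a small preflight check can save a wasted run. The following is a minimal sketch; the helper name `missing_env_vars` is hypothetical and not part of CrewAI or HopX.

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_env_vars(
    names: Iterable[str],
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or blank in the environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name, "").strip()]


# Check the two keys this tutorial relies on before building any agents
missing = missing_env_vars(["OPENAI_API_KEY", "HOPX_API_KEY"])
if missing:
    print(f"Set these environment variables first: {', '.join(missing)}")
```

Passing `env` explicitly keeps the function easy to test without touching the real environment.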
 

Step 1: Create Secure Code Execution Tools

First, build CrewAI-compatible tools that execute code in HopX:

```python
from crewai.tools import BaseTool
from hopx import Sandbox
from pydantic import BaseModel, Field
from typing import Type


class PythonCodeInput(BaseModel):
    """Input schema for Python execution."""
    code: str = Field(description="Python code to execute")


class PythonExecutorTool(BaseTool):
    """Execute Python code in a secure HopX sandbox."""

    name: str = "python_executor"
    description: str = """Execute Python code in an isolated sandbox.

Use this tool to:
- Perform calculations and data analysis
- Process and transform data with pandas
- Create visualizations with matplotlib
- Run any Python computation

The sandbox has pandas, numpy, matplotlib, seaborn, scikit-learn installed.
Always print() results you want to see.
For charts, use plt.savefig('/app/chart.png') then mention the file.
"""
    args_schema: Type[BaseModel] = PythonCodeInput

    def _run(self, code: str) -> str:
        """Execute code in sandbox."""
        sandbox = None
        try:
            # A fresh sandbox per call: no state survives between calls
            sandbox = Sandbox.create(template="code-interpreter")
            result = sandbox.runCode(code, language="python", timeout=60)

            if result.exitCode == 0:
                output = result.stdout or "Code executed successfully (no output)"
                return f"✅ Success:\n{output}"
            else:
                return f"❌ Error:\n{result.stderr}"

        except Exception as e:
            return f"❌ Sandbox error: {str(e)}"
        finally:
            # Always release the sandbox, even when execution failed
            if sandbox:
                sandbox.kill()


class BashCommandInput(BaseModel):
    """Input schema for bash commands."""
    command: str = Field(description="Bash command to execute")


class BashExecutorTool(BaseTool):
    """Execute bash commands in a secure sandbox."""

    name: str = "bash_executor"
    description: str = """Execute bash/shell commands in an isolated sandbox.

Use for:
- File operations (ls, cat, head, tail)
- Installing packages (pip install)
- System commands
"""
    args_schema: Type[BaseModel] = BashCommandInput

    def _run(self, command: str) -> str:
        sandbox = None
        try:
            sandbox = Sandbox.create(template="code-interpreter")
            result = sandbox.runCode(command, language="bash", timeout=60)

            if result.exitCode == 0:
                return f"✅ Success:\n{result.stdout}"
            else:
                return f"❌ Error (exit {result.exitCode}):\n{result.stderr}"
        except Exception as e:
            return f"❌ Error: {str(e)}"
        finally:
            if sandbox:
                sandbox.kill()
```
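
Whatever a tool returns goes straight back into the agent's prompt, so very long stdout can crowd out the rest of the context. The sketch below shows one way to format and truncate a result before returning it. `ExecResult` is a hypothetical stand-in that mirrors only the `stdout`, `stderr`, and `exitCode` fields used above, and the 4000-character limit is an arbitrary assumption.

```python
from dataclasses import dataclass


@dataclass
class ExecResult:
    """Hypothetical stand-in for the sandbox result used in the tools above."""
    stdout: str = ""
    stderr: str = ""
    exitCode: int = 0


def truncate(text: str, limit: int = 4000) -> str:
    """Keep the head and tail of long output and note how much was dropped."""
    if limit < 2:
        raise ValueError("limit must be at least 2")
    if len(text) <= limit:
        return text
    half = limit // 2
    dropped = len(text) - 2 * half
    return f"{text[:half]}\n... [{dropped} characters omitted] ...\n{text[-half:]}"


def format_result(result: ExecResult, limit: int = 4000) -> str:
    """Build the string a tool hands back to the agent."""
    if result.exitCode == 0:
        output = result.stdout or "Code executed successfully (no output)"
        return f"Success:\n{truncate(output, limit)}"
    return f"Error (exit {result.exitCode}):\n{truncate(result.stderr, limit)}"
```

Keeping both the head and the tail is a deliberate choice: tracebacks put the most useful line last, while data previews usually matter most at the top.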
 

Step 2: Create a Persistent Sandbox Tool

For multi-step analysis, agents need to share state: variables and files created in one call must still exist in the next. Create a tool backed by a single persistent sandbox:

```python
from crewai.tools import BaseTool
from hopx import Sandbox
from pydantic import BaseModel, Field
from typing import Type, Optional, ClassVar
import threading


class SharedSandboxManager:
    """Singleton manager for shared sandbox across agents."""

    _instance: ClassVar[Optional['SharedSandboxManager']] = None
    _lock: ClassVar[threading.Lock] = threading.Lock()

    def __init__(self):
        self.sandbox: Optional[Sandbox] = None
        self.ttl = 600  # 10 minutes

    @classmethod
    def get_instance(cls) -> 'SharedSandboxManager':
        # Double-checked locking: take the lock only on first use
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def get_sandbox(self) -> Sandbox:
        # Create the sandbox lazily on the first tool call
        if self.sandbox is None:
            self.sandbox = Sandbox.create(
                template="code-interpreter",
                ttl=self.ttl
            )
        return self.sandbox

    def cleanup(self):
        if self.sandbox:
            try:
                self.sandbox.kill()
            except Exception:
                pass  # Best effort: the sandbox may already be gone
            self.sandbox = None


class SharedPythonInput(BaseModel):
    code: str = Field(description="Python code to execute")


class SharedPythonTool(BaseTool):
    """Execute Python with persistent state across agents."""

    name: str = "shared_python"
    description: str = """Execute Python code with PERSISTENT STATE.

Variables, imports, and data persist between calls.
Use this when you need to build on previous computations.
All agents share the same execution environment.

Example workflow:
1. First call: import pandas as pd; df = pd.read_csv('data.csv')
2. Second call: df['new_col'] = df['a'] * 2  # df still exists!
3. Third call: print(df.describe())  # works!
"""
    args_schema: Type[BaseModel] = SharedPythonInput

    def _run(self, code: str) -> str:
        manager = SharedSandboxManager.get_instance()

        try:
            sandbox = manager.get_sandbox()
            result = sandbox.runCode(code, language="python", timeout=60)

            if result.exitCode == 0:
                return result.stdout or "Executed (no output)"
            else:
                return f"Error: {result.stderr}"
        except Exception as e:
            # Drop the broken sandbox so the next call starts a fresh one
            manager.cleanup()
            return f"Sandbox error: {str(e)}"
```
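
The double-checked locking in `get_instance` protects the manager itself. The same idea can guard the creation of the sandbox if tools are ever called from several threads at once. The sketch below shows the pattern in isolation: `LazyResource` is a hypothetical helper, and `fake_sandbox_factory` stands in for an expensive call such as creating a sandbox.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class LazyResource(Generic[T]):
    """Create a resource once, on first use, even under concurrent access."""

    def __init__(self, factory: Callable[[], T]):
        self._factory = factory
        self._value: Optional[T] = None
        self._lock = threading.Lock()

    def get(self) -> T:
        if self._value is None:          # fast path: no lock once created
            with self._lock:
                if self._value is None:  # re-check while holding the lock
                    self._value = self._factory()
        return self._value

    def reset(self) -> None:
        """Forget the resource so the next get() creates a fresh one."""
        with self._lock:
            self._value = None


created: List[object] = []


def fake_sandbox_factory() -> object:
    """Stand-in for an expensive call such as creating a sandbox."""
    sandbox = object()
    created.append(sandbox)
    return sandbox


shared = LazyResource(fake_sandbox_factory)
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda _: shared.get(), range(32)))

print(len(created), "resource created for", len(results), "calls")
```

Without the second check inside the lock, two threads that both see `None` could each create a sandbox, and one of them would leak until its TTL expires.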
 

Step 3: Define Your Agents

Create specialized agents with their tools:

```python
from crewai import Agent, Task, Crew, Process

# Initialize tools
python_tool = PythonExecutorTool()
bash_tool = BashExecutorTool()
shared_python = SharedPythonTool()

# Research Agent - Gathers information
researcher = Agent(
    role="Senior Research Analyst",
    goal="Identify key data points and analysis requirements",
    backstory="""You are an experienced research analyst who excels at 
    breaking down complex problems into specific, measurable questions.
    You identify what data is needed and what analyses will provide insights.""",
    tools=[],  # Researcher doesn't need code execution
    verbose=True,
    allow_delegation=False
)

# Data Analyst Agent - Runs analysis
data_analyst = Agent(
    role="Senior Data Analyst",
    goal="Analyze data using Python to extract meaningful insights",
    backstory="""You are a skilled data analyst proficient in Python, pandas, 
    and statistical analysis. You write clean, efficient code to analyze data 
    and create visualizations. You always verify your results.""",
    tools=[shared_python, bash_tool],
    verbose=True,
    allow_delegation=False
)

# Report Writer Agent - Creates final report
report_writer = Agent(
    role="Technical Report Writer",
    goal="Create clear, actionable reports from analysis results",
    backstory="""You excel at translating complex technical findings into 
    clear, well-structured reports. You highlight key insights and provide 
    actionable recommendations.""",
    tools=[shared_python],  # Can run code to format/verify data
    verbose=True,
    allow_delegation=False
)
```

Step 4: Define Tasks

Create tasks that flow between agents:

```python
# Task 1: Research
research_task = Task(
    description="""Analyze the requirements for understanding sales performance.
    
    We have sales data with columns: date, product, region, quantity, revenue.
    
    Identify:
    1. Key metrics to calculate (totals, averages, trends)
    2. Important comparisons (by product, by region, over time)
    3. Specific questions the analysis should answer
    
    Output a structured analysis plan.""",
    expected_output="A detailed analysis plan with specific metrics and questions",
    agent=researcher
)

# Task 2: Data Analysis
analysis_task = Task(
    description="""Execute the analysis plan using Python.
    
    First, create sample sales data that matches the description:
    - 1000 rows of sales data
    - Columns: date, product, region, quantity, revenue
    - Date range: 2024-01-01 to 2024-12-31
    - Products: ["Laptop", "Phone", "Tablet", "Watch", "Headphones"]
    - Regions: ["North", "South", "East", "West"]
    
    Then perform the analysis:
    1. Calculate total revenue and quantity by product
    2. Calculate average order value by region
    3. Identify monthly trends
    4. Find top performing product-region combinations
    5. Calculate month-over-month growth rates
    
    Use the shared_python tool to maintain state between code executions.
    Print all results clearly with labels.""",
    expected_output="Complete analysis results with all calculated metrics",
    agent=data_analyst,
    context=[research_task]  # Depends on research
)

# Task 3: Report Writing
report_task = Task(
    description="""Create a comprehensive report based on the analysis.
    
    Structure:
    1. Executive Summary (key findings in 3-4 bullet points)
    2. Methodology (how data was analyzed)
    3. Key Findings (detailed results with numbers)
    4. Recommendations (3-5 actionable items)
    5. Appendix (any additional details)
    
    Use the shared_python tool if you need to access or verify any data.
    Format the report in clean markdown.""",
    expected_output="A well-structured markdown report with all sections",
    agent=report_writer,
    context=[analysis_task]  # Depends on analysis
)
```
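
In a sequential process, a task's `context` only makes sense if the tasks it depends on appear earlier in the task list. The sketch below checks that ordering. It works on plain task names rather than `Task` objects so it runs without CrewAI, and the helper name `check_task_order` is hypothetical.

```python
from typing import Dict, List


def check_task_order(order: List[str], context: Dict[str, List[str]]) -> List[str]:
    """Return a list of problems; an empty list means the order is valid.

    order:   task names in the order they are passed to the crew
    context: task name -> names of the tasks whose output it needs
    """
    problems = []
    position = {name: index for index, name in enumerate(order)}
    for task, needs in context.items():
        if task not in position:
            problems.append(f"{task} is not in the task list")
            continue
        for dependency in needs:
            if dependency not in position:
                problems.append(f"{task} depends on unknown task {dependency}")
            elif position[dependency] >= position[task]:
                problems.append(f"{task} runs before its dependency {dependency}")
    return problems


# The three tasks defined above, by name
order = ["research_task", "analysis_task", "report_task"]
context = {
    "analysis_task": ["research_task"],
    "report_task": ["analysis_task"],
}
print(check_task_order(order, context))  # -> []
```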
 

Step 5: Run the Crew

Assemble and execute:

```python
# Create the crew
sales_analysis_crew = Crew(
    agents=[researcher, data_analyst, report_writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,  # Tasks run in order
    verbose=True
)

# Run it!
if __name__ == "__main__":
    try:
        result = sales_analysis_crew.kickoff()
        print("\n" + "="*60)
        print("FINAL REPORT")
        print("="*60)
        print(result)
    finally:
        # Clean up sandbox
        SharedSandboxManager.get_instance().cleanup()
```
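
The run above only prints the final report. If you want to keep it, one option is to write it to a timestamped Markdown file. This sketch assumes `str(result)` yields the report text, which is what the `print(result)` call above relies on. The helper name `save_report` and the `reports` directory are hypothetical.

```python
from datetime import datetime
from pathlib import Path


def save_report(result: object, directory: str = "reports") -> Path:
    """Write str(result) to a timestamped Markdown file and return its path."""
    out_dir = Path(directory)
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    path = out_dir / f"sales-report-{stamp}.md"
    path.write_text(str(result), encoding="utf-8")
    return path
```

Calling `save_report(result)` right after `kickoff()` returns keeps one file per run, which makes it easier to compare reports across prompt changes.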
 

Advanced: Hierarchical Crew with Manager

For complex workflows, use a manager agent to coordinate:

```python
from crewai import Agent, Task, Crew, Process

# Manager Agent
manager = Agent(
    role="Project Manager",
    goal="Coordinate the team to deliver high-quality analysis",
    backstory="""You are an experienced project manager who ensures 
    deliverables are on time and meet quality standards. You delegate 
    effectively and provide clear guidance.""",
    allow_delegation=True,
    verbose=True
)

# Specialist Agents
python_expert = Agent(
    role="Python Developer",
    goal="Write efficient, well-documented Python code",
    backstory="Expert Python developer with 10 years of experience.",
    tools=[shared_python, bash_tool],
    verbose=True
)

data_scientist = Agent(
    role="Data Scientist",
    goal="Apply statistical methods and ML to extract insights",
    backstory="PhD in Statistics with expertise in ML and data analysis.",
    tools=[shared_python],
    verbose=True
)

visualizer = Agent(
    role="Data Visualization Expert",
    goal="Create clear, impactful visualizations",
    backstory="Expert in matplotlib, seaborn, and data storytelling.",
    tools=[shared_python],
    verbose=True
)

# Create hierarchical crew
hierarchical_crew = Crew(
    agents=[python_expert, data_scientist, visualizer],
    tasks=[...],  # Define tasks
    manager_agent=manager,
    process=Process.hierarchical,
    verbose=True
)
```

Parallel Agent Execution

For independent tasks, set `async_execution=True` so CrewAI can run them concurrently. A later task that lists them in its `context` waits for all of them to finish:

```python
from crewai import Crew, Process, Task

# Independent regional analyses. async_execution=True lets them overlap.
# Note: they all go through data_analyst, so they also share its sandbox.
north_analysis = Task(
    description="Analyze North region sales data",
    agent=data_analyst,
    expected_output="North region analysis",
    async_execution=True
)

south_analysis = Task(
    description="Analyze South region sales data",
    agent=data_analyst,
    expected_output="South region analysis",
    async_execution=True
)

east_analysis = Task(
    description="Analyze East region sales data",
    agent=data_analyst,
    expected_output="East region analysis",
    async_execution=True
)

west_analysis = Task(
    description="Analyze West region sales data",
    agent=data_analyst,
    expected_output="West region analysis",
    async_execution=True
)

# Consolidation task (runs after all parallel tasks)
consolidate_task = Task(
    description="Consolidate all regional analyses into final report",
    agent=report_writer,
    expected_output="Consolidated report",
    context=[north_analysis, south_analysis, east_analysis, west_analysis]
)

# Parallel crew
parallel_crew = Crew(
    agents=[data_analyst, report_writer],
    tasks=[north_analysis, south_analysis, east_analysis, west_analysis, consolidate_task],
    process=Process.sequential,  # Async tasks overlap; the final task waits on its context
    verbose=True
)
```
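
The shape of this crew is fan-out followed by fan-in: four independent analyses, then one consolidation step that needs all of their results. The sketch below illustrates that shape with the standard library only. It is not how CrewAI schedules tasks internally, and `analyze_region`, `consolidate`, and the sample figures are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical per-region revenue figures, used only to make the sketch runnable
SALES: Dict[str, List[float]] = {
    "North": [120.0, 80.0, 100.0],
    "South": [60.0, 90.0],
    "East": [200.0],
    "West": [50.0, 50.0, 50.0, 50.0],
}


def analyze_region(region: str) -> Dict[str, float]:
    """Independent unit of work: needs nothing from the other regions."""
    revenue = SALES[region]
    return {"total": sum(revenue), "average": sum(revenue) / len(revenue)}


def consolidate(results: Dict[str, Dict[str, float]]) -> str:
    """Fan-in step: runs only after every regional result is available."""
    best = max(results, key=lambda region: results[region]["total"])
    grand_total = sum(r["total"] for r in results.values())
    return f"Total revenue {grand_total:.0f}; top region: {best}"


regions = list(SALES)
with ThreadPoolExecutor(max_workers=len(regions)) as pool:
    # pool.map yields results in input order, whatever order the work finishes in
    regional = dict(zip(regions, pool.map(analyze_region, regions)))

print(consolidate(regional))
```

The regional functions never read each other's output, which is exactly the property that makes the corresponding tasks safe to mark as asynchronous.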
 

Tool with File Upload Support

Enable agents to work with uploaded files:

```python
from crewai.tools import BaseTool
from hopx import Sandbox
from pydantic import BaseModel, Field
from typing import Type
import base64


class FileAnalysisInput(BaseModel):
    file_content: str = Field(description="Base64 encoded file content")
    filename: str = Field(description="Name of the file")
    analysis_code: str = Field(description="Python code to analyze the file")


class FileAnalysisTool(BaseTool):
    """Upload and analyze files in sandbox."""

    name: str = "file_analyzer"
    description: str = """Upload a file and analyze it with Python code.

Provide:
1. file_content: Base64 encoded file data
2. filename: Name to save as (e.g., 'data.csv')
3. analysis_code: Python code to analyze (file is at /app/{filename})
"""
    args_schema: Type[BaseModel] = FileAnalysisInput

    def _run(self, file_content: str, filename: str, analysis_code: str) -> str:
        sandbox = None
        try:
            sandbox = Sandbox.create(template="code-interpreter")

            # Decode and upload file
            file_bytes = base64.b64decode(file_content)
            sandbox.files.write(f"/app/{filename}", file_bytes)

            # Run analysis
            result = sandbox.runCode(analysis_code, language="python", timeout=120)

            if result.exitCode == 0:
                return f"✅ Analysis complete:\n{result.stdout}"
            else:
                return f"❌ Error:\n{result.stderr}"

        except Exception as e:
            return f"❌ Error: {str(e)}"
        finally:
            if sandbox:
                sandbox.kill()
```
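
The tool expects `file_content` as base64 text, and `filename` ends up in a path inside the sandbox. The sketch below prepares both on the caller's side. The helper names `safe_filename` and `prepare_upload` are hypothetical, and stripping directory components is a design choice made here rather than something the tool above requires.

```python
import base64
from pathlib import Path, PurePosixPath
from typing import Dict


def safe_filename(name: str) -> str:
    """Drop directory components so the file always lands directly in /app."""
    cleaned = PurePosixPath(name.replace("\\", "/")).name
    if cleaned in ("", ".", ".."):
        raise ValueError(f"Not a usable filename: {name!r}")
    return cleaned


def prepare_upload(path: str) -> Dict[str, str]:
    """Build the file_content and filename arguments from a local file."""
    local = Path(path)
    encoded = base64.b64encode(local.read_bytes()).decode("ascii")
    return {"file_content": encoded, "filename": safe_filename(local.name)}
```

The returned dictionary has the same keys as two of the three fields in `FileAnalysisInput`; the caller still supplies `analysis_code`.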
 

Error Handling in Multi-Agent Systems

In production, retry transient sandbox failures, but return code errors to the agent immediately so it can fix its own code:

```python
from crewai.tools import BaseTool
from hopx import Sandbox
import time


class RobustPythonTool(BaseTool):
    """Python execution with retry and error recovery."""

    name: str = "robust_python"
    description: str = "Execute Python with automatic error recovery"

    max_retries: int = 3
    retry_delay: float = 2.0

    def _run(self, code: str) -> str:
        last_error = None

        for attempt in range(self.max_retries):
            sandbox = None
            try:
                sandbox = Sandbox.create(template="code-interpreter")
                result = sandbox.runCode(code, language="python", timeout=60)

                if result.exitCode == 0:
                    return result.stdout or "Success"
                else:
                    # Syntax/runtime error - don't retry, let agent fix
                    return f"Code error:\n{result.stderr}"

            except Exception as e:
                # Infrastructure failure - wait, then try a fresh sandbox
                last_error = str(e)
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
            finally:
                if sandbox:
                    try:
                        sandbox.kill()
                    except Exception:
                        pass  # Best effort: the sandbox may already be gone

        return f"Sandbox failed after {self.max_retries} attempts: {last_error}"
```
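
A fixed delay between attempts works, but a delay that grows with each failure is a common alternative for transient errors. The sketch below is a generic version of that idea and is independent of CrewAI and HopX. The names `retry_with_backoff` and `flaky` are hypothetical, and `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds, doubling the delay after each failure.

    Re-raises the last exception once max_retries attempts have failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")


calls = {"count": 0}
delays: List[float] = []


def flaky() -> str:
    """Fails twice, then succeeds, like a sandbox that is slow to start."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("sandbox not ready")
    return "ok"


# Record the delays instead of actually sleeping
print(retry_with_backoff(flaky, sleep=delays.append), delays)
```

As in `RobustPythonTool`, only exceptions trigger a retry. A non-zero exit code is a normal return value and should go back to the agent unchanged.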
 

Complete Example: Market Research Crew

Here's a production-ready example:

```python
"""
Market Research Crew with Secure Code Execution
"""

from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from hopx import Sandbox
from pydantic import BaseModel, Field
from typing import Type, Optional
import os

# Ensure API keys
assert os.environ.get("OPENAI_API_KEY"), "Set OPENAI_API_KEY"
assert os.environ.get("HOPX_API_KEY"), "Set HOPX_API_KEY"


class SandboxManager:
    """Manage shared sandbox for crew."""
    _sandbox: Optional[Sandbox] = None

    @classmethod
    def get(cls) -> Sandbox:
        if cls._sandbox is None:
            cls._sandbox = Sandbox.create(template="code-interpreter", ttl=600)
        return cls._sandbox

    @classmethod
    def cleanup(cls):
        if cls._sandbox:
            cls._sandbox.kill()
            cls._sandbox = None


class CodeInput(BaseModel):
    code: str = Field(description="Python code to execute")


class AnalysisTool(BaseTool):
    name: str = "analyze"
    description: str = "Execute Python for data analysis. State persists."
    args_schema: Type[BaseModel] = CodeInput

    def _run(self, code: str) -> str:
        try:
            result = SandboxManager.get().runCode(code, language="python", timeout=60)
            return result.stdout if result.exitCode == 0 else f"Error: {result.stderr}"
        except Exception as e:
            return f"Error: {e}"


def create_market_research_crew():
    """Create a crew for market research analysis."""

    tool = AnalysisTool()

    # Agents
    market_analyst = Agent(
        role="Market Research Analyst",
        goal="Analyze market data to identify trends and opportunities",
        backstory="15 years of experience in market research and competitive analysis.",
        tools=[tool],
        verbose=True
    )

    quant_analyst = Agent(
        role="Quantitative Analyst",
        goal="Apply statistical methods to validate market insights",
        backstory="Former hedge fund quant with expertise in statistical modeling.",
        tools=[tool],
        verbose=True
    )

    strategist = Agent(
        role="Business Strategist",
        goal="Translate analysis into actionable business strategies",
        backstory="MBA from Wharton, 10 years in strategy consulting.",
        tools=[tool],
        verbose=True
    )

    # Tasks
    data_collection = Task(
        description="""Create a synthetic market dataset for analysis:
        
        Generate data with:
        - 500 companies
        - Columns: company, sector, revenue, growth_rate, market_share, employees, founded_year
        - Sectors: Tech, Healthcare, Finance, Retail, Manufacturing
        - Realistic distributions
        
        Save to /app/market_data.csv and show summary stats.""",
        expected_output="Dataset created with summary statistics",
        agent=market_analyst
    )

    statistical_analysis = Task(
        description="""Perform statistical analysis on market_data.csv:
        
        1. Correlation analysis between variables
        2. Sector comparison (ANOVA for revenue differences)
        3. Growth rate distribution analysis
        4. Identify statistical outliers
        5. Regression: what predicts growth_rate?
        
        Print all statistical results with interpretations.""",
        expected_output="Complete statistical analysis with interpretations",
        agent=quant_analyst,
        context=[data_collection]
    )

    strategic_report = Task(
        description="""Create strategic recommendations based on analysis:
        
        1. Executive Summary
        2. Key Market Insights (backed by data)
        3. Sector Opportunities (ranked)
        4. Risk Assessment
        5. Strategic Recommendations (5 specific actions)
        
        Reference specific numbers from the analysis.""",
        expected_output="Strategic report with data-backed recommendations",
        agent=strategist,
        context=[statistical_analysis]
    )

    return Crew(
        agents=[market_analyst, quant_analyst, strategist],
        tasks=[data_collection, statistical_analysis, strategic_report],
        process=Process.sequential,
        verbose=True
    )


if __name__ == "__main__":
    crew = create_market_research_crew()

    try:
        result = crew.kickoff()

        print("\n" + "="*70)
        print("MARKET RESEARCH REPORT")
        print("="*70)
        print(result)

    finally:
        SandboxManager.cleanup()
```

Best Practices

1. One Sandbox Per Crew Session

```python
# DON'T: create a new sandbox for every tool call
# DO: share one sandbox across the crew run
manager = SharedSandboxManager.get_instance()
```

2. Clear Tool Descriptions

```python
# Bad: Vague
description = "Run Python code"

# Good: Specific with examples
description = """Execute Python for data analysis.
Use pandas for data manipulation: df = pd.read_csv(...)
Use matplotlib for charts: plt.savefig('/app/chart.png')
Always print() results you want to see."""
```

3. Task Context for Data Flow

```python
# Ensure data flows between tasks
analysis_task = Task(
    ...,
    context=[data_prep_task]  # Access previous task output
)
```

4. Cleanup on Completion

```python
try:
    result = crew.kickoff()
finally:
    SandboxManager.cleanup()  # Always cleanup
```
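
The same try/finally pattern can be wrapped in a context manager so cleanup is harder to forget. This is a minimal sketch using `contextlib`; the name `crew_session` is hypothetical, and `cleanup` can be any zero-argument callable, such as `SandboxManager.cleanup`.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List


@contextmanager
def crew_session(cleanup: Callable[[], None]) -> Iterator[None]:
    """Run the body, then always call cleanup, even if the body raises."""
    try:
        yield
    finally:
        try:
            cleanup()
        except Exception as exc:
            # Don't let a failed cleanup hide an error raised by the body
            print(f"cleanup failed: {exc}")


events: List[str] = []

try:
    with crew_session(lambda: events.append("cleanup")):
        events.append("kickoff")
        raise RuntimeError("agent crashed")
except RuntimeError:
    events.append("error seen by caller")

print(events)
```

With the classes from this article, usage would look like `with crew_session(SandboxManager.cleanup): result = crew.kickoff()`.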
 

Conclusion

CrewAI + HopX gives you:

  • Multi-agent collaboration with specialized roles
  • Safe code execution in isolated sandboxes
  • Persistent state for complex analyses
  • Production-ready error handling and cleanup

Your agents can think, collaborate, and execute code—without putting your infrastructure at risk.


Ready to build your own AI crew? Get started with HopX for secure agent execution.
