Build a production-ready multi-agent system where multiple AI agents coordinate to execute code, share results, and recover from errors. This cookbook demonstrates how to orchestrate multiple agents using HopX for secure code execution.

Overview

Multi-agent systems coordinate multiple AI agents to solve complex problems. Each agent executes code in its own isolated sandbox and shares results with other agents, while the system aggregates the outcomes. This pattern appears in advanced AI systems where agents collaborate on tasks.

Prerequisites

  • HopX API key (Get one here)
  • Python 3.8+ or Node.js 16+
  • Understanding of async programming
  • Basic knowledge of agent orchestration patterns

Architecture

┌──────────────┐
│ Orchestrator │  Coordinates agents
└──────┬───────┘
       │
       ├──► Agent 1 ──► Sandbox 1
       ├──► Agent 2 ──► Sandbox 2
       └──► Agent 3 ──► Sandbox 3

       ┌──────────────┐
       │  Aggregator  │  Combines results
       └──────────────┘

Implementation

Step 1: Basic Multi-Agent System

Create a system that coordinates multiple agents:
from hopx_ai import Sandbox
import os
import asyncio
from typing import Dict, List, Any
from concurrent.futures import ThreadPoolExecutor

class Agent:
    def __init__(self, agent_id: str, api_key: str):
        self.agent_id = agent_id
        self.api_key = api_key
        self.sandbox = None
    
    def initialize(self):
        """Initialize agent sandbox"""
        self.sandbox = Sandbox.create(
            template="code-interpreter",
            api_key=self.api_key,
            timeout_seconds=600
        )
    
    def execute_task(self, task_code: str, context: Dict = None) -> Dict[str, Any]:
        """Execute task assigned to this agent"""
        try:
            # Set context if provided
            if context:
                for key, value in context.items():
                    self.sandbox.env.set(key, str(value))
            
            # Execute task
            result = self.sandbox.run_code(task_code, timeout=30)
            
            return {
                "agent_id": self.agent_id,
                "success": result.success,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "execution_time": result.execution_time
            }
        except Exception as e:
            return {
                "agent_id": self.agent_id,
                "success": False,
                "error": str(e)
            }
    
    def cleanup(self):
        """Clean up agent resources"""
        if self.sandbox:
            self.sandbox.kill()
            self.sandbox = None

class MultiAgentOrchestrator:
    def __init__(self, api_key: str, num_agents: int = 3):
        self.api_key = api_key
        self.agents = [
            Agent(f"agent_{i}", api_key)
            for i in range(num_agents)
        ]
    
    def initialize_agents(self):
        """Initialize all agents"""
        for agent in self.agents:
            agent.initialize()
    
    def execute_parallel_tasks(self, tasks: List[Dict]) -> List[Dict[str, Any]]:
        """Execute tasks in parallel across agents"""
        results = []
        
        with ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
            # Assign tasks to agents round-robin. Note: with more tasks
            # than agents, one sandbox may receive two tasks concurrently.
            future_to_agent = {}
            for i, task in enumerate(tasks):
                agent = self.agents[i % len(self.agents)]
                future = executor.submit(agent.execute_task, task["code"], task.get("context"))
                future_to_agent[future] = (agent, task)
            
            # Collect results; future.result() blocks until that task finishes
            for future in future_to_agent:
                agent, task = future_to_agent[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    results.append({
                        "agent_id": agent.agent_id,
                        "success": False,
                        "error": str(e)
                    })
        
        return results
    
    def cleanup_all(self):
        """Clean up all agents"""
        for agent in self.agents:
            agent.cleanup()

# Usage
orchestrator = MultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
orchestrator.initialize_agents()

tasks = [
    {"code": "result = 2 + 2\nprint(f'Result: {result}')", "context": None},
    {"code": "result = 3 * 4\nprint(f'Result: {result}')", "context": None},
    {"code": "result = 10 / 2\nprint(f'Result: {result}')", "context": None}
]

results = orchestrator.execute_parallel_tasks(tasks)
for result in results:
    print(f"Agent {result['agent_id']}: {result.get('stdout', result.get('error'))}")

orchestrator.cleanup_all()

Step 2: Result Aggregation

Aggregate results from multiple agents:
class ResultAggregator:
    def __init__(self):
        self.results = []
    
    def add_result(self, result: Dict[str, Any]):
        """Add agent result"""
        self.results.append(result)
    
    def aggregate(self) -> Dict[str, Any]:
        """Aggregate all results"""
        total = len(self.results)
        successful = sum(1 for r in self.results if r.get("success", False))
        failed = total - successful
        
        # Extract outputs
        outputs = [r.get("stdout", "") for r in self.results if r.get("success")]
        
        # Calculate average execution time
        execution_times = [
            r.get("execution_time", 0)
            for r in self.results
            if "execution_time" in r
        ]
        avg_time = sum(execution_times) / len(execution_times) if execution_times else 0
        
        return {
            "total_agents": total,
            "successful": successful,
            "failed": failed,
            "success_rate": (successful / total * 100) if total > 0 else 0,
            "average_execution_time": avg_time,
            "outputs": outputs,
            "all_results": self.results
        }

# Usage with orchestrator
orchestrator = MultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
orchestrator.initialize_agents()

tasks = [
    {"code": "print('Agent 1 result: 10')"},
    {"code": "print('Agent 2 result: 20')"},
    {"code": "print('Agent 3 result: 30')"}
]

results = orchestrator.execute_parallel_tasks(tasks)

aggregator = ResultAggregator()
for result in results:
    aggregator.add_result(result)

summary = aggregator.aggregate()
print(f"Success rate: {summary['success_rate']:.1f}%")
print(f"Average execution time: {summary['average_execution_time']:.2f}s")

orchestrator.cleanup_all()

Step 3: Error Recovery Patterns

Implement error recovery for failed agents:
import time

class ResilientMultiAgentOrchestrator(MultiAgentOrchestrator):
    def execute_with_retry(self, task: Dict, max_retries: int = 3) -> Dict[str, Any]:
        """Execute a task, retrying on failure with a short delay"""
        agent = self.agents[0]  # Use the first agent for all attempts
        
        for attempt in range(max_retries):
            result = agent.execute_task(task["code"], task.get("context"))
            
            if result.get("success"):
                return result
            
            # Wait before retrying (consider exponential backoff for transient errors)
            time.sleep(1)
        
        return {
            "agent_id": agent.agent_id,
            "success": False,
            "error": f"Failed after {max_retries} attempts"
        }
    
    def execute_with_fallback(self, tasks: List[Dict]) -> List[Dict[str, Any]]:
        """Execute tasks with fallback agents"""
        results = []
        
        for task in tasks:
            # Try primary agent
            primary_result = self.agents[0].execute_task(task["code"], task.get("context"))
            
            if primary_result.get("success"):
                results.append(primary_result)
            else:
                # Fall back to the secondary agent (assumes at least two agents)
                fallback_result = self.agents[1].execute_task(task["code"], task.get("context"))
                results.append({
                    **fallback_result,
                    "used_fallback": True,
                    "primary_failed": True
                })
        
        return results

# Usage
orchestrator = ResilientMultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
orchestrator.initialize_agents()

# A task that always fails (division by zero), illustrating the exhausted-retries path
task = {"code": "result = 1 / 0\nprint(result)"}

# Execute with retry
result = orchestrator.execute_with_retry(task, max_retries=3)
print(f"Result after retry: {result}")

orchestrator.cleanup_all()

Best Practices

Coordination

Use a message queue or shared state system for agents to communicate results and coordinate tasks.
  1. Task Distribution: Distribute tasks evenly across agents
  2. State Sharing: Share necessary state between agents
  3. Result Aggregation: Aggregate results efficiently
  4. Error Handling: Handle agent failures gracefully
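
The coordination points above can be sketched with a shared task queue and lock-guarded shared state. This is a standalone illustration using only Python's standard library (no HopX calls); the tasks here are plain callables standing in for agent work:

```python
import queue
import threading

def run_workers(tasks, num_workers=3):
    """Distribute tasks across workers via a shared queue and collect results."""
    task_queue = queue.Queue()
    results = []
    results_lock = threading.Lock()  # shared state guarded by a lock

    for task in tasks:
        task_queue.put(task)

    def worker(worker_id):
        while True:
            try:
                task = task_queue.get_nowait()
            except queue.Empty:
                return  # no work left: exit gracefully
            outcome = {"worker": worker_id, "result": task()}
            with results_lock:
                results.append(outcome)
            task_queue.task_done()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

# Usage: each "task" is a callable; results arrive in completion order
outcomes = run_workers([lambda: 2 + 2, lambda: 3 * 4, lambda: 10 / 2])
```

A queue naturally balances load: idle workers pull the next task as soon as they finish, rather than receiving a fixed round-robin share.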

Performance

  1. Parallel Execution: Execute independent tasks in parallel
  2. Resource Management: Monitor and limit resource usage
  3. Caching: Cache common results between agents
  4. Load Balancing: Balance load across agents
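
As a sketch of the caching point, results can be memoized by a hash of the task code so repeated tasks skip re-execution. The `execute_fn` below is a hypothetical stand-in for a real agent call such as `execute_task`; only cache tasks that are deterministic:

```python
import hashlib

class CachedExecutor:
    """Memoize task results keyed by a hash of the task code (illustrative sketch)."""
    def __init__(self, execute_fn):
        self.execute_fn = execute_fn  # e.g. an agent's execute_task
        self.cache = {}
        self.hits = 0

    def run(self, task_code: str):
        key = hashlib.sha256(task_code.encode()).hexdigest()
        if key in self.cache:
            self.hits += 1  # common result reused instead of re-executing
            return self.cache[key]
        result = self.execute_fn(task_code)
        self.cache[key] = result
        return result

# Usage: the second identical task is served from the cache
executor = CachedExecutor(execute_fn=lambda code: {"stdout": f"ran: {code}"})
executor.run("print(2 + 2)")
executor.run("print(2 + 2)")
print(executor.hits)  # 1
```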

Reliability

  1. Retry Logic: Implement retry for transient failures
  2. Fallback Agents: Use fallback agents when primary fails
  3. Health Checks: Monitor agent health
  4. Graceful Degradation: Continue with available agents
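
The health-check and graceful-degradation points can be combined in a small dispatcher that probes each agent before assigning work. `ping` and `execute` here are hypothetical callables (a real probe might run a trivial snippet in the agent's sandbox); the stub strings stand in for agent objects:

```python
from typing import Any, Callable, List

def healthy_agents(agents: List[Any], ping: Callable[[Any], bool]) -> List[Any]:
    """Return only the agents whose health probe succeeds."""
    alive = []
    for agent in agents:
        try:
            if ping(agent):
                alive.append(agent)
        except Exception:
            pass  # treat probe errors as unhealthy
    return alive

def dispatch(tasks, agents, ping, execute):
    """Degrade gracefully: run on whatever subset of agents is healthy."""
    alive = healthy_agents(agents, ping)
    if not alive:
        raise RuntimeError("No healthy agents available")
    return [execute(alive[i % len(alive)], task) for i, task in enumerate(tasks)]

# Usage with stub agents: agent "b" fails its probe and is skipped
results = dispatch(
    tasks=["t1", "t2", "t3"],
    agents=["a", "b", "c"],
    ping=lambda agent: agent != "b",
    execute=lambda agent, task: f"{agent}:{task}",
)
print(results)  # ['a:t1', 'c:t2', 'a:t3']
```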

Real-World Examples

This pattern is used by:
  • AutoGPT: Multi-agent AI system
  • LangChain Multi-Agent: Agent orchestration framework
  • CrewAI: Multi-agent orchestration platform

Next Steps

  1. Implement agent communication protocol
  2. Add task queue for agent coordination
  3. Create agent health monitoring
  4. Implement dynamic agent scaling
  5. Add result persistence