# Multi-Agent Code Execution Workflow

> Coordinate multiple AI agents with code execution, parallel execution, result aggregation, and error recovery patterns

Build a production-ready system in which multiple AI agents coordinate to execute code, share results, and recover from errors. This cookbook shows how to orchestrate those agents using HopX sandboxes for secure code execution.

## Overview

A multi-agent system splits a complex problem across several AI agents. Each agent executes code in its own sandbox, an orchestrator distributes tasks and collects the results, and an aggregator combines the outcomes into a single summary.

## Prerequisites

* HopX API key ([Get one here](https://console.hopx.dev/api-keys))
* Python 3.8+ or Node.js 16+
* Understanding of async programming
* Basic knowledge of agent orchestration patterns

## Architecture

```
┌──────────────┐
│ Orchestrator │ Coordinates agents
└──────┬───────┘
       │
       ├──► Agent 1 ──► Sandbox 1
       ├──► Agent 2 ──► Sandbox 2
       └──► Agent 3 ──► Sandbox 3
              │
              ▼
       ┌──────────────┐
       │  Aggregator  │ Combine results
       └──────────────┘
```
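
The fan-out/fan-in flow in the diagram can be sketched without any sandbox at all. The following is a minimal illustration, assuming plain Python callables stand in for agents; `run_agent`, `aggregate`, and `orchestrate` are hypothetical names and not part of the HopX SDK.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_agent(agent_id: str, task: Callable[[], Any]) -> Dict[str, Any]:
    """Stand-in for an agent: run a local callable instead of sandboxed code."""
    try:
        return {"agent_id": agent_id, "success": True, "output": task()}
    except Exception as exc:  # a failing agent must not take down the others
        return {"agent_id": agent_id, "success": False, "error": str(exc)}


def aggregate(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fan-in: combine per-agent results into one summary."""
    ok = [r for r in results if r["success"]]
    return {
        "total": len(results),
        "successful": len(ok),
        "outputs": [r["output"] for r in ok],
    }


def orchestrate(tasks: List[Callable[[], Any]]) -> Dict[str, Any]:
    """Fan-out: one worker thread per task, results kept in task order."""
    if not tasks:
        return aggregate([])
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        futures = [
            pool.submit(run_agent, f"agent_{i}", task)
            for i, task in enumerate(tasks)
        ]
        return aggregate([f.result() for f in futures])


# The third task raises ZeroDivisionError and is reported as a failure.
summary = orchestrate([lambda: 2 + 2, lambda: 3 * 4, lambda: 1 / 0])
print(summary)
```

The steps below follow the same shape, replacing the local callables with code executed in HopX sandboxes.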

## Implementation

### Step 1: Basic Multi-Agent System

Create a system that coordinates multiple agents:

<CodeGroup>
  ```python Python theme={null}
  from hopx_ai import Sandbox
  import os
  from typing import Dict, List, Any
  from concurrent.futures import ThreadPoolExecutor

  class Agent:
      def __init__(self, agent_id: str, api_key: str):
          self.agent_id = agent_id
          self.api_key = api_key
          self.sandbox = None
      
      def initialize(self):
          """Initialize agent sandbox"""
          self.sandbox = Sandbox.create(
              template="code-interpreter",
              api_key=self.api_key,
              timeout_seconds=600
          )
      
      def execute_task(self, task_code: str, context: Dict = None) -> Dict[str, Any]:
          """Execute task assigned to this agent"""
          try:
              # Set context if provided
              if context:
                  for key, value in context.items():
                      self.sandbox.env.set(key, str(value))
              
              # Execute task
              result = self.sandbox.run_code(task_code, timeout=30)
              
              return {
                  "agent_id": self.agent_id,
                  "success": result.success,
                  "stdout": result.stdout,
                  "stderr": result.stderr,
                  "execution_time": result.execution_time
              }
          except Exception as e:
              return {
                  "agent_id": self.agent_id,
                  "success": False,
                  "error": str(e)
              }
      
      def cleanup(self):
          """Clean up agent resources"""
          if self.sandbox:
              self.sandbox.kill()
              self.sandbox = None

  class MultiAgentOrchestrator:
      def __init__(self, api_key: str, num_agents: int = 3):
          self.api_key = api_key
          self.agents = [
              Agent(f"agent_{i}", api_key)
              for i in range(num_agents)
          ]
      
      def initialize_agents(self):
          """Initialize all agents"""
          for agent in self.agents:
              agent.initialize()
      
      def execute_parallel_tasks(self, tasks: List[Dict]) -> List[Dict[str, Any]]:
          """Execute tasks in parallel across agents"""
          results = []
          
          with ThreadPoolExecutor(max_workers=len(self.agents)) as executor:
              # Assign tasks to agents
              future_to_agent = {}
              for i, task in enumerate(tasks):
                  agent = self.agents[i % len(self.agents)]
                  future = executor.submit(agent.execute_task, task["code"], task.get("context"))
                  future_to_agent[future] = (agent, task)
              
              # Collect results
              for future in future_to_agent:
                  agent, task = future_to_agent[future]
                  try:
                      result = future.result()
                      results.append(result)
                  except Exception as e:
                      results.append({
                          "agent_id": agent.agent_id,
                          "success": False,
                          "error": str(e)
                      })
          
          return results
      
      def cleanup_all(self):
          """Clean up all agents"""
          for agent in self.agents:
              agent.cleanup()

  # Usage
  orchestrator = MultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
  orchestrator.initialize_agents()

  tasks = [
      {"code": "result = 2 + 2\nprint(f'Result: {result}')", "context": None},
      {"code": "result = 3 * 4\nprint(f'Result: {result}')", "context": None},
      {"code": "result = 10 / 2\nprint(f'Result: {result}')", "context": None}
  ]

  results = orchestrator.execute_parallel_tasks(tasks)
  for result in results:
      print(f"Agent {result['agent_id']}: {result.get('stdout', result.get('error'))}")

  orchestrator.cleanup_all()
  ```

  ```javascript JavaScript theme={null}
  import { Sandbox } from '@hopx-ai/sdk';

  class Agent {
      constructor(agentId, apiKey) {
          this.agentId = agentId;
          this.apiKey = apiKey;
          this.sandbox = null;
      }
      
      async initialize() {
          this.sandbox = await Sandbox.create({
              template: 'code-interpreter',
              apiKey: this.apiKey,
              timeoutSeconds: 600
          });
      }
      
      async executeTask(taskCode, context = null) {
          try {
              // Set context if provided
              if (context) {
                  for (const [key, value] of Object.entries(context)) {
                      await this.sandbox.env.set(key, String(value));
                  }
              }
              
              // Execute task
              const result = await this.sandbox.runCode(taskCode, { timeout: 30 });
              
              return {
                  agentId: this.agentId,
                  success: result.success,
                  stdout: result.stdout,
                  stderr: result.stderr,
                  executionTime: result.execution_time
              };
          } catch (error) {
              return {
                  agentId: this.agentId,
                  success: false,
                  error: error.message
              };
          }
      }
      
      async cleanup() {
          if (this.sandbox) {
              await this.sandbox.kill();
              this.sandbox = null;
          }
      }
  }

  class MultiAgentOrchestrator {
      constructor(apiKey, numAgents = 3) {
          this.apiKey = apiKey;
          this.agents = Array.from({ length: numAgents }, (_, i) => 
              new Agent(`agent_${i}`, apiKey)
          );
      }
      
      async initializeAgents() {
          await Promise.all(this.agents.map(agent => agent.initialize()));
      }
      
      async executeParallelTasks(tasks) {
          // Execute tasks in parallel; Promise.all preserves task order
          return Promise.all(tasks.map((task, index) => {
              const agent = this.agents[index % this.agents.length];
              return agent.executeTask(task.code, task.context);
          }));
      }
      
      async cleanupAll() {
          await Promise.all(this.agents.map(agent => agent.cleanup()));
      }
  }

  // Usage
  const orchestrator = new MultiAgentOrchestrator(process.env.HOPX_API_KEY, 3);
  await orchestrator.initializeAgents();

  const tasks = [
      { code: "result = 2 + 2\nprint(f'Result: {result}')", context: null },
      { code: "result = 3 * 4\nprint(f'Result: {result}')", context: null },
      { code: "result = 10 / 2\nprint(f'Result: {result}')", context: null }
  ];

  const results = await orchestrator.executeParallelTasks(tasks);
  for (const result of results) {
      console.log(`Agent ${result.agentId}: ${result.stdout || result.error}`);
  }

  await orchestrator.cleanupAll();
  ```
</CodeGroup>
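
One caveat with the round-robin assignment above: when there are more tasks than agents, two tasks mapped to the same agent can run at the same time against one sandbox, and their context variables may interleave. Whether a sandbox handles overlapping calls safely is not covered in this cookbook, so a conservative option is to group tasks per agent and run each group sequentially. The sketch below assumes that approach; `group_round_robin` and `run_groups` are hypothetical helpers, and `execute` stands in for a call to `agent.execute_task`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple


def group_round_robin(tasks: List[Dict], num_agents: int) -> List[List[Tuple[int, Dict]]]:
    """Assign task i to agent i % num_agents, remembering the original index."""
    groups: List[List[Tuple[int, Dict]]] = [[] for _ in range(num_agents)]
    for index, task in enumerate(tasks):
        groups[index % num_agents].append((index, task))
    return groups


def run_groups(
    tasks: List[Dict],
    num_agents: int,
    execute: Callable[[int, Dict], Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Run agents in parallel, but each agent's own tasks one after another."""
    results: List[Any] = [None] * len(tasks)

    def worker(agent_index: int, group: List[Tuple[int, Dict]]) -> None:
        for task_index, task in group:
            results[task_index] = execute(agent_index, task)

    with ThreadPoolExecutor(max_workers=num_agents) as pool:
        futures = [
            pool.submit(worker, agent_index, group)
            for agent_index, group in enumerate(group_round_robin(tasks, num_agents))
        ]
        for future in futures:
            future.result()  # re-raise any unexpected worker error
    return results


# Stand-in executor (assumption): records which agent handled which task.
def fake_execute(agent_index: int, task: Dict) -> Dict[str, Any]:
    return {"agent_id": f"agent_{agent_index}", "code": task["code"], "success": True}


demo_tasks = [{"code": f"print({n})"} for n in range(5)]
demo_results = run_groups(demo_tasks, num_agents=2, execute=fake_execute)
print([r["agent_id"] for r in demo_results])
```

Results come back in the original task order, so the aggregation step in the next section works unchanged.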

### Step 2: Result Aggregation

Aggregate results from multiple agents:

<CodeGroup>
  ```python Python theme={null}
  class ResultAggregator:
      def __init__(self):
          self.results = []
      
      def add_result(self, result: Dict[str, Any]):
          """Add agent result"""
          self.results.append(result)
      
      def aggregate(self) -> Dict[str, Any]:
          """Aggregate all results"""
          total = len(self.results)
          successful = sum(1 for r in self.results if r.get("success", False))
          failed = total - successful
          
          # Extract outputs
          outputs = [r.get("stdout", "") for r in self.results if r.get("success")]
          
          # Calculate average execution time
          execution_times = [
              r.get("execution_time", 0)
              for r in self.results
              if "execution_time" in r
          ]
          avg_time = sum(execution_times) / len(execution_times) if execution_times else 0
          
          return {
              "total_agents": total,
              "successful": successful,
              "failed": failed,
              "success_rate": (successful / total * 100) if total > 0 else 0,
              "average_execution_time": avg_time,
              "outputs": outputs,
              "all_results": self.results
          }

  # Usage with orchestrator
  orchestrator = MultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
  orchestrator.initialize_agents()

  tasks = [
      {"code": "print('Agent 1 result: 10')"},
      {"code": "print('Agent 2 result: 20')"},
      {"code": "print('Agent 3 result: 30')"}
  ]

  results = orchestrator.execute_parallel_tasks(tasks)

  aggregator = ResultAggregator()
  for result in results:
      aggregator.add_result(result)

  summary = aggregator.aggregate()
  print(f"Success rate: {summary['success_rate']:.1f}%")
  print(f"Average execution time: {summary['average_execution_time']:.2f}s")

  orchestrator.cleanup_all()
  ```

  ```javascript JavaScript theme={null}
  class ResultAggregator {
      constructor() {
          this.results = [];
      }
      
      addResult(result) {
          this.results.push(result);
      }
      
      aggregate() {
          const total = this.results.length;
          const successful = this.results.filter(r => r.success).length;
          const failed = total - successful;
          
          // Extract outputs
          const outputs = this.results
              .filter(r => r.success)
              .map(r => r.stdout || '');
          
          // Calculate average execution time
          const executionTimes = this.results
              .map(r => r.executionTime || 0)
              .filter(t => t > 0);
          const avgTime = executionTimes.length > 0
              ? executionTimes.reduce((a, b) => a + b, 0) / executionTimes.length
              : 0;
          
          return {
              totalAgents: total,
              successful,
              failed,
              successRate: total > 0 ? (successful / total * 100) : 0,
              averageExecutionTime: avgTime,
              outputs,
              allResults: this.results
          };
      }
  }

  // Usage with orchestrator
  const orchestrator = new MultiAgentOrchestrator(process.env.HOPX_API_KEY, 3);
  await orchestrator.initializeAgents();

  const tasks = [
      { code: "print('Agent 1 result: 10')" },
      { code: "print('Agent 2 result: 20')" },
      { code: "print('Agent 3 result: 30')" }
  ];

  const results = await orchestrator.executeParallelTasks(tasks);

  const aggregator = new ResultAggregator();
  for (const result of results) {
      aggregator.addResult(result);
  }

  const summary = aggregator.aggregate();
  console.log(`Success rate: ${summary.successRate.toFixed(1)}%`);
  console.log(`Average execution time: ${summary.averageExecutionTime.toFixed(2)}s`);

  await orchestrator.cleanupAll();
  ```
</CodeGroup>
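
The aggregator above collects raw `stdout` strings. When the orchestrator needs to combine actual values, one option is to have each task print a JSON line and parse it during aggregation. The sketch below assumes that convention, which is a choice made here for illustration and not something the HopX SDK requires; `parse_json_outputs` is a hypothetical helper.

```python
import json
from typing import Any, Dict, List


def parse_json_outputs(results: List[Dict[str, Any]]) -> List[Any]:
    """Parse the last non-empty stdout line of each successful result as JSON."""
    parsed = []
    for result in results:
        if not result.get("success"):
            continue
        stdout = result.get("stdout") or ""
        lines = [line for line in stdout.splitlines() if line.strip()]
        if not lines:
            continue
        try:
            parsed.append(json.loads(lines[-1]))
        except json.JSONDecodeError:
            # Output did not follow the JSON-line convention; skip it.
            continue
    return parsed


# Sample results shaped like the dicts returned by Agent.execute_task.
sample_results = [
    {"agent_id": "agent_0", "success": True, "stdout": 'working...\n{"value": 10}\n'},
    {"agent_id": "agent_1", "success": True, "stdout": '{"value": 20}\n'},
    {"agent_id": "agent_2", "success": False, "error": "timeout"},
    {"agent_id": "agent_3", "success": True, "stdout": "not json\n"},
]

values = [item["value"] for item in parse_json_outputs(sample_results)]
print(sum(values))
```

Failed results and outputs that are not valid JSON are skipped, so one misbehaving agent does not break the combined total.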

### Step 3: Error Recovery Patterns

Implement error recovery for failed agents:

<CodeGroup>
  ```python Python theme={null}
  import time

  class ResilientMultiAgentOrchestrator(MultiAgentOrchestrator):
      def execute_with_retry(self, task: Dict, max_retries: int = 3) -> Dict[str, Any]:
          """Execute task with retry logic"""
          agent = self.agents[0]  # Always retries on the first agent
          last_result: Dict[str, Any] = {}
          
          for attempt in range(max_retries):
              last_result = agent.execute_task(task["code"], task.get("context"))
              
              if last_result.get("success"):
                  return last_result
              
              # Wait before the next attempt (skip the wait after the last one)
              if attempt < max_retries - 1:
                  time.sleep(1)
          
          return {
              "agent_id": agent.agent_id,
              "success": False,
              "error": f"Failed after {max_retries} attempts",
              "last_error": last_result.get("error") or last_result.get("stderr")
          }
      
      def execute_with_fallback(self, tasks: List[Dict]) -> List[Dict[str, Any]]:
          """Execute tasks with fallback agents"""
          results = []
          
          for task in tasks:
              # Try primary agent
              primary_result = self.agents[0].execute_task(task["code"], task.get("context"))
              
              if primary_result.get("success"):
                  results.append(primary_result)
              else:
                  # Fallback to secondary agent
                  fallback_result = self.agents[1].execute_task(task["code"], task.get("context"))
                  results.append({
                      **fallback_result,
                      "used_fallback": True,
                      "primary_failed": True
                  })
          
          return results

  # Usage
  orchestrator = ResilientMultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
  orchestrator.initialize_agents()

  # Task that always fails (ZeroDivisionError), so every attempt fails too;
  # retries only help with transient errors such as timeouts
  task = {"code": "result = 1 / 0\nprint(result)"}

  # Execute with retry
  result = orchestrator.execute_with_retry(task, max_retries=3)
  print(f"Result after retry: {result}")

  orchestrator.cleanup_all()
  ```

  ```javascript JavaScript theme={null}
  class ResilientMultiAgentOrchestrator extends MultiAgentOrchestrator {
      async executeWithRetry(task, maxRetries = 3) {
          const agent = this.agents[0];  // Always retries on the first agent
          let lastResult = {};
          
          for (let attempt = 0; attempt < maxRetries; attempt++) {
              lastResult = await agent.executeTask(task.code, task.context);
              
              if (lastResult.success) {
                  return lastResult;
              }
              
              // Wait before the next attempt (skip the wait after the last one)
              if (attempt < maxRetries - 1) {
                  await new Promise(resolve => setTimeout(resolve, 1000));
              }
          }
          
          return {
              agentId: agent.agentId,
              success: false,
              error: `Failed after ${maxRetries} attempts`,
              lastError: lastResult.error || lastResult.stderr
          };
      }
      
      async executeWithFallback(tasks) {
          const results = [];
          
          for (const task of tasks) {
              // Try primary agent
              const primaryResult = await this.agents[0].executeTask(task.code, task.context);
              
              if (primaryResult.success) {
                  results.push(primaryResult);
              } else {
                  // Fallback to secondary agent
                  const fallbackResult = await this.agents[1].executeTask(task.code, task.context);
                  results.push({
                      ...fallbackResult,
                      usedFallback: true,
                      primaryFailed: true
                  });
              }
          }
          
          return results;
      }
  }

  // Usage
  const orchestrator = new ResilientMultiAgentOrchestrator(process.env.HOPX_API_KEY, 3);
  await orchestrator.initializeAgents();

  // Task that always fails (ZeroDivisionError), so every attempt fails too;
  // retries only help with transient errors such as timeouts
  const task = { code: "result = 1 / 0\nprint(result)" };

  // Execute with retry
  const result = await orchestrator.executeWithRetry(task, 3);
  console.log(`Result after retry: ${JSON.stringify(result)}`);

  await orchestrator.cleanupAll();
  ```
</CodeGroup>
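
The retry loop above waits a fixed second between attempts. A common refinement is exponential backoff with jitter, which spreads retries out when many agents fail at once. The following is a minimal sketch under stated assumptions: `retry_with_backoff` is a hypothetical helper, `run` stands in for a call such as `agent.execute_task(...)`, and the `sleep` parameter exists only so the example can run without real delays.

```python
import random
import time
from typing import Any, Callable, Dict, List


def retry_with_backoff(
    run: Callable[[], Dict[str, Any]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Retry `run` until it succeeds, doubling the delay cap after each failure."""
    result: Dict[str, Any] = {"success": False, "error": "not attempted"}
    for attempt in range(max_retries):
        result = run()
        if result.get("success"):
            return {**result, "attempts": attempt + 1}
        if attempt < max_retries - 1:
            # Full jitter: wait a random time in [0, min(max_delay, base_delay * 2^attempt)]
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))
    return {**result, "success": False, "attempts": max_retries}


# Stand-in task (assumption): fails twice with a transient error, then succeeds.
calls = {"count": 0}


def flaky() -> Dict[str, Any]:
    calls["count"] += 1
    if calls["count"] < 3:
        return {"success": False, "error": "transient"}
    return {"success": True, "stdout": "ok"}


delays: List[float] = []  # record the requested delays instead of sleeping
outcome = retry_with_backoff(flaky, max_retries=5, sleep=delays.append)
print(outcome["attempts"], len(delays))
```

Backoff only pays off for transient failures. A task that fails deterministically, like the division by zero above, is better reported immediately or routed to a fallback.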

## Best Practices

### Coordination

<Tip>
  Use a message queue or shared state system for agents to communicate results and coordinate tasks.
</Tip>

1. **Task Distribution**: Distribute tasks evenly across agents
2. **State Sharing**: Share necessary state between agents
3. **Result Aggregation**: Aggregate results efficiently
4. **Error Handling**: Handle agent failures gracefully
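
The coordination points above can be sketched with the standard library alone. This example assumes in-process threads stand in for agents and uses `queue.Queue` for the task and result channels; a production system would more likely use an external message broker. `agent_worker` and `run_with_queue` are hypothetical names.

```python
import queue
import threading
from typing import Any, Dict, List

STOP = object()  # sentinel telling a worker to exit


def agent_worker(agent_id: str, tasks: "queue.Queue", results: "queue.Queue") -> None:
    """Pull tasks until STOP arrives; push one result dict per task."""
    while True:
        task = tasks.get()
        if task is STOP:
            break
        try:
            output = task["fn"](*task.get("args", ()))
            results.put({"agent_id": agent_id, "task": task["name"],
                         "success": True, "output": output})
        except Exception as exc:
            results.put({"agent_id": agent_id, "task": task["name"],
                         "success": False, "error": str(exc)})


def run_with_queue(task_list: List[Dict[str, Any]], num_agents: int = 3) -> List[Dict[str, Any]]:
    """Distribute tasks through a shared queue and collect every result."""
    tasks: "queue.Queue" = queue.Queue()
    results: "queue.Queue" = queue.Queue()
    workers = [
        threading.Thread(target=agent_worker, args=(f"agent_{i}", tasks, results))
        for i in range(num_agents)
    ]
    for worker in workers:
        worker.start()
    for task in task_list:
        tasks.put(task)   # idle agents pick up work as it arrives
    for _ in workers:
        tasks.put(STOP)   # one sentinel per worker
    for worker in workers:
        worker.join()
    return [results.get() for _ in range(len(task_list))]


collected = run_with_queue([
    {"name": "add", "fn": lambda a, b: a + b, "args": (2, 2)},
    {"name": "mul", "fn": lambda a, b: a * b, "args": (3, 4)},
    {"name": "div", "fn": lambda a, b: a / b, "args": (1, 0)},
])
by_task = {r["task"]: r for r in collected}
print(by_task["add"]["output"], by_task["mul"]["output"], by_task["div"]["success"])
```

Because workers pull from a shared queue, a slow task on one agent does not hold up the others, which covers even distribution without assigning tasks up front.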

### Performance

1. **Parallel Execution**: Execute independent tasks in parallel
2. **Resource Management**: Monitor and limit resource usage
3. **Caching**: Cache common results between agents
4. **Load Balancing**: Balance load across agents
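
As an illustration of the caching point, the sketch below keys results on a hash of the task code and its context, so identical tasks are executed once. It assumes tasks are deterministic, which has to be decided per workload. `ResultCache` is a hypothetical class, and `run` stands in for a call such as `agent.execute_task`.

```python
import hashlib
import json
import threading
from typing import Any, Callable, Dict, List, Optional


class ResultCache:
    """Thread-safe cache keyed by a hash of the task code and its context."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._store: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def key(code: str, context: Optional[Dict[str, Any]] = None) -> str:
        payload = json.dumps(
            {"code": code, "context": context or {}}, sort_keys=True, default=str
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_or_run(
        self,
        code: str,
        context: Optional[Dict[str, Any]],
        run: Callable[[str, Optional[Dict[str, Any]]], Dict[str, Any]],
    ) -> Dict[str, Any]:
        cache_key = self.key(code, context)
        with self._lock:
            cached = self._store.get(cache_key)
        if cached is not None:
            return {**cached, "cached": True}
        # The lock is not held while running, so two concurrent misses
        # for the same key may both execute; the later write wins.
        result = run(code, context)
        if result.get("success"):  # never cache failures
            with self._lock:
                self._store[cache_key] = result
        return {**result, "cached": False}


# Stand-in executor (assumption): counts how often code is actually run.
executed: List[str] = []


def fake_run(code: str, context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    executed.append(code)
    return {"success": True, "stdout": "4\n"}


cache = ResultCache()
first = cache.get_or_run("print(2 + 2)", None, fake_run)
second = cache.get_or_run("print(2 + 2)", None, fake_run)
print(first["cached"], second["cached"], len(executed))
```

Only successful results are stored, so a transient failure does not get replayed from the cache.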

### Reliability

1. **Retry Logic**: Implement retry for transient failures
2. **Fallback Agents**: Use fallback agents when primary fails
3. **Health Checks**: Monitor agent health
4. **Graceful Degradation**: Continue with available agents
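
Health checks and graceful degradation can be combined: probe each agent before dispatch, then spread the work over whichever agents respond. The sketch below is one way to do that under stated assumptions; `healthy_agents` and `dispatch_degraded` are hypothetical helpers. In a real deployment the `probe` might run a trivial snippet through `agent.execute_task` and check `success`, and `execute` would call the agent itself.

```python
from typing import Any, Callable, Dict, List


def healthy_agents(agents: List[Any], probe: Callable[[Any], bool]) -> List[Any]:
    """Return agents whose probe succeeds; a raising probe counts as unhealthy."""
    alive = []
    for agent in agents:
        try:
            if probe(agent):
                alive.append(agent)
        except Exception:
            continue
    return alive


def dispatch_degraded(
    tasks: List[Dict[str, Any]],
    agents: List[Any],
    probe: Callable[[Any], bool],
    execute: Callable[[Any, Dict[str, Any]], Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Round-robin tasks over healthy agents only; fail cleanly if none remain."""
    alive = healthy_agents(agents, probe)
    if not alive:
        return [{"success": False, "error": "no healthy agents"} for _ in tasks]
    return [execute(alive[i % len(alive)], task) for i, task in enumerate(tasks)]


# Stand-ins (assumptions): agents are plain strings and agent_1 is down.
demo_agents = ["agent_0", "agent_1", "agent_2"]


def demo_probe(agent: str) -> bool:
    if agent == "agent_1":
        raise ConnectionError("sandbox unreachable")
    return True


def demo_execute(agent: str, task: Dict[str, Any]) -> Dict[str, Any]:
    return {"agent_id": agent, "success": True, "code": task["code"]}


degraded = dispatch_degraded(
    [{"code": f"print({n})"} for n in range(4)], demo_agents, demo_probe, demo_execute
)
print([r["agent_id"] for r in degraded])
```

With one of three agents down, all four tasks still complete on the remaining two.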

## Real-World Examples

Similar multi-agent orchestration patterns appear in:

* **AutoGPT**: Multi-agent AI system
* **LangChain Multi-Agent**: Agent orchestration framework
* **CrewAI**: Multi-agent orchestration platform

## Related Cookbooks

* [AI Code Interpreter Agent](/cookbooks/ai-integration/code-interpreter-agent) - Single agent execution

## Next Steps

1. Implement agent communication protocol
2. Add task queue for agent coordination
3. Create agent health monitoring
4. Implement dynamic agent scaling
5. Add result persistence
