import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from hopx_ai import Sandbox
class Agent:
    """A single worker that owns one sandbox and runs task code inside it.

    The sandbox is created lazily by :meth:`initialize` and released by
    :meth:`cleanup`; until ``initialize`` has been called ``self.sandbox``
    is ``None``.
    """

    def __init__(self, agent_id: str, api_key: str):
        """Store the agent's identity and credentials; no sandbox is created yet."""
        self.agent_id = agent_id
        self.api_key = api_key
        self.sandbox = None

    def initialize(self) -> None:
        """Create the agent's sandbox.

        If a sandbox is already attached it is released first, so calling
        this method repeatedly never leaves an orphaned sandbox behind.
        """
        # Re-initializing used to overwrite the reference and leak the old
        # sandbox; release it before creating the replacement.
        self.cleanup()
        self.sandbox = Sandbox.create(
            template="code-interpreter",
            api_key=self.api_key,
            timeout_seconds=600,
        )

    def execute_task(
        self, task_code: str, context: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Run ``task_code`` in this agent's sandbox and report the outcome.

        Args:
            task_code: Source code handed to the sandbox's ``run_code``.
            context: Optional mapping; each value is converted with ``str``
                and set through ``sandbox.env.set`` before the code runs.
                ``None`` or an empty mapping sets nothing.

        Returns:
            On completion, a dict with ``agent_id``, ``success``, ``stdout``,
            ``stderr`` and ``execution_time`` copied from the sandbox result.
            If the agent is not initialized or anything raises, a dict with
            ``agent_id``, ``success=False`` and an ``error`` message instead.
            This method does not propagate ``Exception`` subclasses.
        """
        if self.sandbox is None:
            # Same result shape as any other failure, but with a message that
            # names the real problem instead of a NoneType attribute error.
            return {
                "agent_id": self.agent_id,
                "success": False,
                "error": (
                    f"Agent {self.agent_id} is not initialized; "
                    "call initialize() first"
                ),
            }

        try:
            # Set context if provided
            if context:
                for key, value in context.items():
                    self.sandbox.env.set(key, str(value))

            # Execute task (timeout value is passed through to the sandbox
            # as-is; presumably seconds -- confirm against the SDK).
            result = self.sandbox.run_code(task_code, timeout=30)
            return {
                "agent_id": self.agent_id,
                "success": result.success,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "execution_time": result.execution_time,
            }
        except Exception as e:
            # Deliberate boundary: callers receive a failure record rather
            # than an exception, so one bad task cannot abort a batch.
            return {
                "agent_id": self.agent_id,
                "success": False,
                "error": str(e),
            }

    def cleanup(self) -> None:
        """Kill the sandbox, if any, and drop the reference.

        Safe to call on an agent that was never initialized. If ``kill``
        raises, the reference is kept so the caller can retry.
        """
        if self.sandbox is not None:
            self.sandbox.kill()
            self.sandbox = None
class MultiAgentOrchestrator:
    """Own a fixed pool of agents and distribute tasks across them.

    Agents are named ``agent_0`` .. ``agent_{num_agents - 1}`` and all share
    the same API key.
    """

    def __init__(self, api_key: str, num_agents: int = 3):
        """Create ``num_agents`` agents; their sandboxes are not started yet."""
        self.api_key = api_key
        self.agents = [Agent(f"agent_{i}", api_key) for i in range(num_agents)]

    def initialize_agents(self) -> None:
        """Initialize every agent in order.

        If any agent fails to initialize, the agents that were already
        started are cleaned up (best effort) before the original exception
        is re-raised, so a partial start-up does not leak sandboxes.
        """
        started = []
        try:
            for agent in self.agents:
                agent.initialize()
                started.append(agent)
        except Exception:
            # Roll back; the initialization error is the one worth
            # reporting, so secondary cleanup errors are intentionally
            # not raised here.
            self._release(started)
            raise

    def execute_parallel_tasks(self, tasks: List[Dict]) -> List[Dict[str, Any]]:
        """Run ``tasks`` across the agents and return one result per task.

        Tasks are assigned round-robin (task ``i`` goes to agent
        ``i % len(agents)``). Different agents run concurrently, but the
        tasks assigned to one agent run one after another, so two tasks
        never use the same agent at the same time.

        Args:
            tasks: Dicts with a required ``"code"`` entry and an optional
                ``"context"`` entry, forwarded to ``Agent.execute_task``.

        Returns:
            Result dicts in the same order as ``tasks``. If an agent's
            ``execute_task`` raises, that task's entry is a dict with
            ``agent_id``, ``success=False`` and ``error``.

        Raises:
            ValueError: If there are tasks to run but no agents.
            KeyError: If a task has no ``"code"`` entry.
        """
        if not tasks:
            return []
        if not self.agents:
            raise ValueError("Cannot execute tasks: orchestrator has no agents")

        agent_count = len(self.agents)

        # Bucket tasks per agent up front (in the calling thread, so a
        # malformed task still raises KeyError to the caller).
        queues: List[List[Any]] = [[] for _ in self.agents]
        for index, task in enumerate(tasks):
            queues[index % agent_count].append(
                (index, task["code"], task.get("context"))
            )

        ordered: List[Any] = [None] * len(tasks)
        with ThreadPoolExecutor(max_workers=agent_count) as executor:
            # One worker per agent with work to do; each drains its own queue.
            futures = [
                executor.submit(self._run_queue, agent, queue)
                for agent, queue in zip(self.agents, queues)
                if queue
            ]
            # Collect results and restore the original task order.
            for future in futures:
                for index, outcome in future.result():
                    ordered[index] = outcome
        return ordered

    @staticmethod
    def _run_queue(agent, queue):
        """Run one agent's queued tasks sequentially; return (index, result) pairs."""
        outcomes = []
        for index, code, context in queue:
            try:
                outcome = agent.execute_task(code, context)
            except Exception as e:
                outcome = {
                    "agent_id": agent.agent_id,
                    "success": False,
                    "error": str(e),
                }
            outcomes.append((index, outcome))
        return outcomes

    @staticmethod
    def _release(agents) -> Optional[Exception]:
        """Call ``cleanup`` on every agent; return the first error seen, if any."""
        first_error = None
        for agent in agents:
            try:
                agent.cleanup()
            except Exception as e:
                if first_error is None:
                    first_error = e
        return first_error

    def cleanup_all(self) -> None:
        """Clean up every agent, even if some of them fail.

        Raises:
            Exception: The first error raised by an agent's ``cleanup``,
                re-raised only after all agents have been attempted.
        """
        error = self._release(self.agents)
        if error is not None:
            raise error
# Usage: run three small snippets across three sandboxed agents.
orchestrator = MultiAgentOrchestrator(api_key=os.getenv("HOPX_API_KEY"), num_agents=3)
try:
    orchestrator.initialize_agents()
    tasks = [
        {"code": "result = 2 + 2\nprint(f'Result: {result}')", "context": None},
        {"code": "result = 3 * 4\nprint(f'Result: {result}')", "context": None},
        {"code": "result = 10 / 2\nprint(f'Result: {result}')", "context": None},
    ]
    results = orchestrator.execute_parallel_tasks(tasks)
    for result in results:
        # Failed tasks carry "error" instead of "stdout".
        print(f"Agent {result['agent_id']}: {result.get('stdout', result.get('error'))}")
finally:
    # Always release the sandboxes, even if initialization or execution
    # raised part-way through; cleanup of an unstarted agent is a no-op.
    orchestrator.cleanup_all()