from hopx_ai import Sandbox
import os
from typing import Dict, Any
class EdgeFunctionPlatform:
def __init__(self, api_key: str):
self.api_key = api_key
self.edge_sandboxes = {} # Region -> sandbox mapping
def get_edge_sandbox(self, region: str) -> Sandbox:
"""Get or create edge sandbox for region"""
if region not in self.edge_sandboxes:
self.edge_sandboxes[region] = Sandbox.create(
template="code-interpreter",
api_key=self.api_key,
timeout_seconds=300,
region=region # Specify region if supported
)
return self.edge_sandboxes[region]
def execute_at_edge(self, function_code: str, region: str, event: Dict[str, Any]) -> Dict[str, Any]:
"""Execute function at edge location"""
try:
sandbox = self.get_edge_sandbox(region)
# Prepare function with event
import json
full_code = f"""
import json
event = {json.dumps(event)}
{function_code}
"""
result = sandbox.run_code(full_code, timeout=10) # Fast execution for edge
return {
"success": result.success,
"result": result.stdout,
"region": region,
"execution_time": result.execution_time
}
except Exception as e:
return {
"success": False,
"error": str(e),
"region": region
}
def route_to_nearest(self, user_location: str, function_code: str, event: Dict[str, Any]) -> Dict[str, Any]:
"""Route request to nearest edge location"""
# Simple routing logic (in production, use geolocation)
region_map = {
"us": "us-east-1",
"eu": "eu-west-1",
"asia": "ap-southeast-1"
}
region = region_map.get(user_location, "us-east-1")
return self.execute_at_edge(function_code, region, event)
def cleanup_all(self):
"""Clean up all edge sandboxes"""
for sandbox in self.edge_sandboxes.values():
sandbox.kill()
self.edge_sandboxes.clear()
# Usage
platform = EdgeFunctionPlatform(api_key=os.getenv("HOPX_API_KEY"))
result = platform.route_to_nearest(
user_location="us",
function_code="print('Hello from edge!')",
event={}
)
print(result)
platform.cleanup_all()