import json
import os
from typing import Any, Callable, Dict, Optional

from hopx_ai import Sandbox
class ServerlessFunction:
    """A piece of user code that is executed inside its own sandbox.

    The code is run as a top-level script: ``invoke`` prepends definitions of
    ``event`` and ``context`` and then appends the user code verbatim. This
    class does not call any function defined by that code, so the code must
    write whatever it wants to return to stdout itself.
    """

    def __init__(self, function_id: str, code: str, runtime: str = "python"):
        """Store the function's metadata; no sandbox is created yet.

        Args:
            function_id: Identifier for this function.
            code: Python source that is appended to the generated script.
            runtime: Stored on the instance; not read anywhere in this class.
        """
        self.function_id = function_id
        self.code = code
        self.runtime = runtime
        # Populated by initialize(); None means "not initialized / cleaned up".
        self.sandbox = None

    def initialize(self, api_key: str):
        """Initialize function sandbox.

        Args:
            api_key: Credential forwarded to ``Sandbox.create``.
        """
        self.sandbox = Sandbox.create(
            template="code-interpreter",
            api_key=api_key,
            timeout_seconds=300,  # 5 minute timeout
        )

    def invoke(
        self,
        event: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Invoke function with event.

        Args:
            event: JSON-serializable payload exposed to the code as ``event``.
            context: Optional JSON-serializable mapping exposed as ``context``;
                a missing or empty value becomes ``{}``.

        Returns:
            On a completed run, a dict with ``success``, ``result`` (captured
            stdout), ``error`` (stderr when the run was unsuccessful, else
            ``None``) and ``execution_time``. If anything raises locally
            (serialization, sandbox call) or the sandbox is not initialized,
            a dict with ``success=False`` and an ``error`` message.
        """
        if self.sandbox is None:
            return {
                "success": False,
                "error": f"Function {self.function_id} is not initialized",
            }
        try:
            # Serialize to JSON text and embed its repr() as a Python string
            # literal that the generated script decodes with json.loads.
            # Pasting raw JSON into Python source breaks on true/false/null,
            # which are not Python names.
            event_json = json.dumps(event)
            context_json = json.dumps(context or {})
            function_code = "\n".join(
                [
                    "",
                    "import json",
                    "# Event data",
                    f"event = json.loads({event_json!r})",
                    "# Context data",
                    f"context = json.loads({context_json!r})",
                    "# User function",
                    self.code,
                    "",
                ]
            )
            # Execute function
            result = self.sandbox.run_code(function_code, timeout=30)
            return {
                "success": result.success,
                "result": result.stdout,
                "error": result.stderr if not result.success else None,
                "execution_time": result.execution_time,
            }
        except Exception as e:
            # Boundary of the invocation: report the failure to the caller
            # as data instead of propagating it.
            return {
                "success": False,
                "error": str(e),
            }

    def cleanup(self):
        """Clean up function resources.

        Kills the sandbox if one exists and drops the reference; calling this
        again afterwards is a no-op.
        """
        if self.sandbox:
            self.sandbox.kill()
            self.sandbox = None
class ServerlessPlatform:
    """Registry that maps function ids to initialized ``ServerlessFunction``s."""

    def __init__(self, api_key: str):
        """Create an empty platform.

        Args:
            api_key: Credential passed to every function's ``initialize``.
        """
        self.api_key = api_key
        # function_id -> ServerlessFunction
        self.functions = {}

    def register_function(self, function_id: str, code: str, runtime: str = "python"):
        """Register a serverless function.

        Creates and initializes a ``ServerlessFunction`` and stores it under
        ``function_id``. If that id was already registered, the previous
        function is cleaned up after the replacement has been initialized, so
        its sandbox is not leaked and a failed initialization leaves the old
        registration intact.

        Args:
            function_id: Key under which the function is stored.
            code: Source code handed to ``ServerlessFunction``.
            runtime: Runtime label handed to ``ServerlessFunction``.
        """
        func = ServerlessFunction(function_id, code, runtime)
        func.initialize(self.api_key)
        previous = self.functions.get(function_id)
        self.functions[function_id] = func
        if previous is not None:
            previous.cleanup()

    def invoke_function(
        self,
        function_id: str,
        event: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Invoke registered function.

        Args:
            function_id: Id used at registration time.
            event: Payload forwarded to the function's ``invoke``.
            context: Optional context forwarded to the function's ``invoke``.

        Returns:
            Whatever the function's ``invoke`` returns, or a dict with
            ``success=False`` and an ``error`` message when the id is unknown.
        """
        func = self.functions.get(function_id)
        if func is None:
            return {
                "success": False,
                "error": f"Function {function_id} not found",
            }
        return func.invoke(event, context)

    def cleanup_all(self):
        """Clean up all functions.

        Every registered function is given a chance to clean up even if an
        earlier one fails. Functions that cleaned up successfully are removed
        from the registry; ones that failed stay registered so the call can be
        retried.

        Raises:
            Exception: The first error raised by a function's ``cleanup``,
                re-raised after all functions have been attempted.
        """
        first_error = None
        # Iterate over a snapshot because entries are deleted along the way.
        for function_id, func in list(self.functions.items()):
            try:
                func.cleanup()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
            else:
                del self.functions[function_id]
        if first_error is not None:
            raise first_error
# Usage
platform = ServerlessPlatform(api_key=os.getenv("HOPX_API_KEY"))
try:
    # Register function. The code runs as a top-level script with `event` and
    # `context` already defined, and only its stdout is returned, so it has to
    # call the handler and print the value itself.
    platform.register_function(
        "hello_world",
        """
def handler(event, context):
    name = event.get('name', 'World')
    return f"Hello, {name}!"

print(handler(event, context))
""",
        "python",
    )
    # Invoke function
    result = platform.invoke_function("hello_world", {"name": "HopX"})
    print(result)
finally:
    # Release the sandbox even if invocation or printing fails.
    platform.cleanup_all()