from hopx_ai import Sandbox
import os
from typing import Dict, Any, List
class UserFunction:
    """Description of one user-supplied function.

    Attributes:
        function_id: Key under which the function is registered.
        code: Python source text of the function body.
        inputs: Names of the inputs the code declares.
        outputs: Names of the outputs the code declares.
    """

    def __init__(self, function_id: str, code: str, inputs: List[str], outputs: List[str]):
        """Store the four fields as given; the lists are kept by reference."""
        # Identity and source first, then the declared interface.
        self.function_id, self.code = function_id, code
        self.inputs, self.outputs = inputs, outputs
class UserFunctionPlatform:
    """Registry of user functions that are executed inside a HOPX sandbox.

    Lifecycle: ``initialize()`` creates the sandbox, ``register_function()``
    adds functions, ``execute_function()`` runs one of them, and
    ``cleanup()`` kills the sandbox.
    """

    def __init__(self, api_key: str):
        """Store the API key; no sandbox exists until ``initialize()``."""
        self.api_key = api_key
        # Registered functions, keyed by their ``function_id``.
        self.functions: Dict[str, "UserFunction"] = {}
        # Sandbox handle; None before initialize() and after cleanup().
        self.sandbox = None

    def initialize(self):
        """Create the sandbox used by ``execute_function``.

        Calling this again replaces the stored handle without killing the
        previously created sandbox, so call ``cleanup()`` first if one is
        already running.
        """
        self.sandbox = Sandbox.create(
            template="code-interpreter",
            api_key=self.api_key,
            timeout_seconds=3600,
        )

    def register_function(self, function: "UserFunction"):
        """Register ``function`` under its ``function_id``.

        An existing entry with the same id is overwritten.
        """
        self.functions[function.function_id] = function

    def execute_function(
        self,
        function_id: str,
        inputs: Dict[str, Any],
        timeout: int = 30,
    ) -> Dict[str, Any]:
        """Run a registered function in the sandbox with the given inputs.

        The generated program exposes ``inputs`` as a dict and then runs the
        function's source text.

        Args:
            function_id: Id of a previously registered function.
            inputs: JSON-serializable values made available as ``inputs``.
            timeout: Value passed to the sandbox's ``run_code`` call
                (defaults to 30, the previously hard-coded value).

        Returns:
            On a completed run: ``{"success", "output", "error"}`` where
            ``output`` is the run's stdout and ``error`` is its stderr when
            the run did not succeed, else ``None``.
            Otherwise (unknown id, platform not initialized, or any
            exception while preparing/running): ``{"success": False,
            "error": <message>}``.
        """
        import json

        function = self.functions.get(function_id)
        if function is None:
            return {
                "success": False,
                "error": f"Function {function_id} not found"
            }

        # Report a clear error instead of an AttributeError on None.
        if self.sandbox is None:
            return {
                "success": False,
                "error": "Platform not initialized; call initialize() first"
            }

        try:
            # JSON text is not Python source (true/false/null are undefined
            # names), so ship it as a string literal and decode it inside
            # the sandbox rather than pasting it in as an expression.
            payload = json.dumps(inputs)
            function_code = f"""
import json
# Inputs
inputs = json.loads({payload!r})
# User function
{function.code}
"""
            result = self.sandbox.run_code(function_code, timeout=timeout)
            return {
                "success": result.success,
                "output": result.stdout,
                "error": result.stderr if not result.success else None
            }
        except Exception as e:
            # Boundary: serialization and sandbox failures are reported in
            # the result dict rather than raised to the caller.
            return {
                "success": False,
                "error": str(e)
            }

    def cleanup(self):
        """Kill the sandbox, if one exists, and drop the reference to it."""
        if self.sandbox:
            self.sandbox.kill()
            self.sandbox = None
# Usage
# NOTE: os.getenv returns None when HOPX_API_KEY is unset; the key is passed
# through to the platform as-is.
platform = UserFunctionPlatform(api_key=os.getenv("HOPX_API_KEY"))
try:
    platform.initialize()
    # Register function
    platform.register_function(UserFunction(
        function_id="add_numbers",
        code="result = inputs['a'] + inputs['b']\nprint(result)",
        inputs=["a", "b"],
        outputs=["result"]
    ))
    # Execute
    result = platform.execute_function("add_numbers", {"a": 5, "b": 3})
    print(result)
finally:
    # Always release the sandbox, even if a step above raised; cleanup() is
    # a no-op when no sandbox was created.
    platform.cleanup()