import os
from typing import Any, Dict, List, Optional

from hopx_ai import Sandbox
class PipelineStep:
    """One unit of work in a pipeline: a code snippet plus metadata.

    Attributes:
        step_id: Identifier reported back in the step results.
        code: Source code executed in the sandbox for this step.
        dependencies: step_ids this step depends on. NOTE(review): stored
            but not consulted by the visible executor — execution order is
            insertion order; confirm whether ordering by dependencies is
            intended elsewhere.
    """

    def __init__(self, step_id: str, code: str,
                 dependencies: Optional[List[str]] = None):
        self.step_id = step_id
        self.code = code
        # `or []` gives each instance its own fresh list, avoiding the
        # mutable-default-argument pitfall.
        self.dependencies = dependencies or []
class DataAnalysisPipeline:
    """Runs a sequence of PipelineStep code snippets inside a hopx sandbox.

    Lifecycle: initialize() -> add_step()* -> execute_pipeline() -> cleanup().
    """

    def __init__(self, api_key: str):
        """Store credentials only; no sandbox is created until initialize()."""
        self.api_key = api_key
        self.sandbox = None  # set by initialize(), cleared by cleanup()
        self.steps: List["PipelineStep"] = []

    def initialize(self) -> None:
        """Initialize pipeline sandbox."""
        self.sandbox = Sandbox.create(
            template="code-interpreter",
            api_key=self.api_key,
            timeout_seconds=3600,
        )

    def add_step(self, step: "PipelineStep") -> None:
        """Add pipeline step; steps run in insertion order."""
        self.steps.append(step)

    def execute_pipeline(self, input_data: Optional[str] = None,
                         step_timeout: int = 60) -> Dict[str, Any]:
        """Execute all registered steps in order, stopping at the first failure.

        Args:
            input_data: Optional text written to /workspace/input.csv before
                any step runs.
            step_timeout: Per-step execution timeout in seconds (default 60,
                matching the previous hard-coded value).

        Returns:
            Dict with overall ``success``, ``steps_executed``,
            ``step_results`` (one entry per executed step), and ``export``.

        Raises:
            RuntimeError: If initialize() has not been called.
        """
        self._require_sandbox()
        results: List[Dict[str, Any]] = []
        if input_data:
            self.sandbox.files.write("/workspace/input.csv", input_data)
        for step in self.steps:
            result = self.sandbox.run_code(step.code, timeout=step_timeout)
            results.append({
                "step_id": step.step_id,
                "success": result.success,
                "stdout": result.stdout,
                "stderr": result.stderr,
            })
            if not result.success:
                break  # later steps likely depend on this step's output
        export_result = self._export_results()
        return {
            # all([]) is True, so an empty pipeline reports success.
            "success": all(r["success"] for r in results),
            "steps_executed": len(results),
            "step_results": results,
            "export": export_result,
        }

    def _require_sandbox(self) -> None:
        """Fail fast with a clear message instead of AttributeError on None."""
        if self.sandbox is None:
            raise RuntimeError("Pipeline not initialized; call initialize() first")

    def _export_results(self) -> Dict[str, Any]:
        """Summarize .csv/.json/.png output files left in /workspace.

        Best-effort: any SDK error is caught and reported in the return
        value rather than aborting the pipeline result.
        """
        try:
            files = self.sandbox.files.list("/workspace")
            output_files = [f for f in files
                            if f.name.endswith(('.csv', '.json', '.png'))]
            exports = {}
            for file in output_files:
                # NOTE(review): assumes files.read returns a sized str/bytes
                # payload — confirm against the SDK's return type.
                content = self.sandbox.files.read(file.path)
                exports[file.name] = {
                    "size": len(content),
                    "type": file.name.split('.')[-1],
                }
            return {
                "success": True,
                "files": exports,
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
            }

    def cleanup(self) -> None:
        """Kill the sandbox; idempotent and safe if never initialized."""
        if self.sandbox:
            self.sandbox.kill()
            self.sandbox = None
# Usage: build and run a two-step analysis pipeline on sample CSV data.
if __name__ == "__main__":
    pipeline = DataAnalysisPipeline(api_key=os.getenv("HOPX_API_KEY"))
    pipeline.initialize()
    # try/finally ensures the sandbox is killed even if a step raises,
    # so we never leak a live remote sandbox.
    try:
        # Define pipeline steps
        pipeline.add_step(PipelineStep(
            step_id="load_data",
            code="""
import pandas as pd
df = pd.read_csv('/workspace/input.csv')
print(f"Loaded {len(df)} rows")
"""
        ))
        pipeline.add_step(PipelineStep(
            step_id="analyze",
            code="""
import pandas as pd
df = pd.read_csv('/workspace/input.csv')
summary = df.describe()
summary.to_csv('/workspace/summary.csv')
print("Analysis complete")
"""
        ))
        # Execute
        result = pipeline.execute_pipeline(input_data="name,age\nAlice,25\nBob,30")
        print(result)
    finally:
        pipeline.cleanup()