Build a production-ready data analysis pipeline service that processes data through multiple steps, transforms it, and exports results. This cookbook demonstrates how to create automated analysis workflows using HopX.

Overview

Data analysis pipelines automate ingesting, transforming, analyzing, and exporting data. The service built in this cookbook executes multi-step workflows inside a sandbox, passes data between steps, and exports the resulting reports. This pattern is common in data processing platforms and analytics services.

Prerequisites

  • HopX API key (Get one here)
  • Python 3.8+ or Node.js 16+
  • Understanding of data pipelines
  • Basic knowledge of data transformation

Architecture

┌──────────────┐
│  Data Input  │ Raw data
└──────┬───────┘
       │
       ▼
┌─────────────────┐
│  Pipeline       │ Step 1 → Step 2 → Step 3
│   Engine        │
└──────┬──────────┘
       │
       ▼
┌─────────────────┐
│  HopX Sandbox   │ Execute each step
└──────┬──────────┘
       │
       ▼
┌─────────────────┐
│  Results Export │ Reports, visualizations
└─────────────────┘

Implementation

Step 1: Pipeline Definition

Define and execute multi-step pipelines:
from hopx_ai import Sandbox
import os
from typing import Any, Dict, List, Optional

class PipelineStep:
    def __init__(self, step_id: str, code: str, dependencies: Optional[List[str]] = None):
        self.step_id = step_id
        self.code = code
        self.dependencies = dependencies or []

class DataAnalysisPipeline:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.sandbox = None
        self.steps = []
    
    def initialize(self):
        """Initialize pipeline sandbox"""
        self.sandbox = Sandbox.create(
            template="code-interpreter",
            api_key=self.api_key,
            timeout_seconds=3600
        )
    
    def add_step(self, step: PipelineStep):
        """Add pipeline step"""
        self.steps.append(step)
    
    def execute_pipeline(self, input_data: Optional[str] = None) -> Dict[str, Any]:
        """Execute entire pipeline"""
        results = []
        
        # Upload input data if provided
        if input_data:
            self.sandbox.files.write("/workspace/input.csv", input_data)
        
        # Execute steps in order
        for step in self.steps:
            result = self.sandbox.run_code(step.code, timeout=60)
            results.append({
                "step_id": step.step_id,
                "success": result.success,
                "stdout": result.stdout,
                "stderr": result.stderr
            })
            
            if not result.success:
                break  # Stop on failure
        
        # Export results
        export_result = self._export_results()
        
        return {
            "success": all(r["success"] for r in results),
            "steps_executed": len(results),
            "step_results": results,
            "export": export_result
        }
    
    def _export_results(self) -> Dict[str, Any]:
        """Export pipeline results"""
        try:
            # Check for output files
            files = self.sandbox.files.list("/workspace")
            output_files = [f for f in files if f.name.endswith(('.csv', '.json', '.png'))]
            
            exports = {}
            for file in output_files:
                content = self.sandbox.files.read(file.path)
                exports[file.name] = {
                    "size": len(content),
                    "type": file.name.split('.')[-1]
                }
            
            return {
                "success": True,
                "files": exports
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
    
    def cleanup(self):
        """Clean up pipeline resources"""
        if self.sandbox:
            self.sandbox.kill()
            self.sandbox = None

# Usage
pipeline = DataAnalysisPipeline(api_key=os.getenv("HOPX_API_KEY"))
pipeline.initialize()

# Define pipeline steps
pipeline.add_step(PipelineStep(
    step_id="load_data",
    code="""
import pandas as pd
df = pd.read_csv('/workspace/input.csv')
print(f"Loaded {len(df)} rows")
"""
))

pipeline.add_step(PipelineStep(
    step_id="analyze",
    code="""
import pandas as pd
df = pd.read_csv('/workspace/input.csv')
summary = df.describe()
summary.to_csv('/workspace/summary.csv')
print("Analysis complete")
"""
))

# Execute, making sure the sandbox is cleaned up even if a step raises
try:
    result = pipeline.execute_pipeline(input_data="name,age\nAlice,25\nBob,30")
    print(result)
finally:
    pipeline.cleanup()
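
The PipelineStep class accepts a dependencies list, but execute_pipeline above runs steps strictly in the order they were added. If you want the declared dependencies to control ordering, one option is a topological sort before execution. The topological_order helper below is a minimal sketch, not part of the HopX SDK, and assumes every referenced dependency id has been added to the pipeline:
from typing import Dict, List

def topological_order(steps: List[PipelineStep]) -> List[PipelineStep]:
    """Order steps so each one runs after the steps it depends on."""
    by_id: Dict[str, PipelineStep] = {s.step_id: s for s in steps}
    ordered: List[PipelineStep] = []
    visiting, done = set(), set()

    def visit(step_id: str):
        if step_id in done:
            return
        if step_id in visiting:
            raise ValueError(f"Dependency cycle involving '{step_id}'")
        visiting.add(step_id)
        for dep in by_id[step_id].dependencies:
            visit(dep)  # run prerequisites first
        visiting.remove(step_id)
        done.add(step_id)
        ordered.append(by_id[step_id])

    for step in steps:
        visit(step.step_id)
    return ordered

# Reorder before running, e.g.:
# pipeline.steps = topological_order(pipeline.steps)
# result = pipeline.execute_pipeline(...)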

Best Practices

  1. Step Isolation: Each step should be independent and testable
  2. Error Handling: Handle errors gracefully and provide recovery options
  3. Data Validation: Validate data between steps (see the validation sketch after this list)
  4. Result Caching: Cache intermediate results for efficiency
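
As an example of practices 2 and 3, a validation step can run inside the sandbox between the load and analysis steps and fail fast on bad input. This is a minimal sketch; the expected columns and the /workspace paths are assumptions based on the usage example above:
# A validation step between loading and analysis; a failed assertion
# surfaces through result.success / stderr and stops the pipeline.
pipeline.add_step(PipelineStep(
    step_id="validate_data",
    code="""
import pandas as pd

df = pd.read_csv('/workspace/input.csv')

expected_columns = {'name', 'age'}  # assumed schema for this example
missing = expected_columns - set(df.columns)
assert not missing, f"Missing columns: {missing}"
assert len(df) > 0, "Input file is empty"
assert df['age'].between(0, 130).all(), "Age values out of range"

print(f"Validation passed: {len(df)} rows")
""",
    dependencies=["load_data"]
))

With the insertion-order execution shown above, add this step between load_data and analyze; if you adopt the dependency-ordering sketch, the dependencies=["load_data"] entry expresses the same constraint.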

Next Steps

  1. Implement pipeline scheduling
  2. Add data validation between steps
  3. Create pipeline visualization
  4. Implement result caching (a caching sketch follows this list)
  5. Add monitoring and alerting
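
For the result caching item above, one option is to key a cache on a hash of each step's code and skip re-execution when a stored result exists. The StepCache class is a minimal in-memory sketch, not part of the HopX SDK; a production service would likely persist results in Redis, a database, or object storage:
import hashlib
from typing import Any, Dict

class StepCache:
    """In-memory cache keyed by a SHA-256 hash of the step's code."""

    def __init__(self):
        self._store: Dict[str, Dict[str, Any]] = {}

    def key(self, step: PipelineStep) -> str:
        return hashlib.sha256(step.code.encode("utf-8")).hexdigest()

    def get(self, step: PipelineStep):
        return self._store.get(self.key(step))

    def put(self, step: PipelineStep, result: Dict[str, Any]) -> None:
        self._store[self.key(step)] = result

# Inside execute_pipeline, consult the cache before running each step:
# cached = cache.get(step)
# if cached is not None:
#     results.append(cached)
#     continue
# ...run the step, then cache.put(step, step_result)

Note that hashing only the step's code ignores changes to the input data; if steps read /workspace/input.csv, include a hash of the input file in the cache key as well.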