# Data Analysis Pipeline Service

> Build automated data analysis services with multi-step workflows, data transformation chains, result export, and scheduled jobs

Build a production-ready data analysis pipeline service that processes data through multiple steps, transforms it, and exports results. This cookbook demonstrates how to create automated analysis workflows using HopX.

## Overview

Data analysis pipelines automate ingesting, transforming, analyzing, and exporting data. The service described here runs a multi-step workflow inside a HopX sandbox, passes data between steps through files, and collects the files the steps produce. This pattern is common in data processing platforms and analytics services.
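
As a rough illustration of those four stages, here is a minimal local sketch that uses only the Python standard library. The function names `ingest`, `transform`, `analyze`, and `export` are hypothetical, chosen for this example; in the HopX service each stage would instead be code executed inside the sandbox.

```python
import csv
import io
import json
import statistics
from typing import Any, Dict, List


def ingest(raw_csv: str) -> List[Dict[str, str]]:
    """Parse raw CSV text into a list of row dicts."""
    return list(csv.DictReader(io.StringIO(raw_csv)))


def transform(rows: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """Convert 'age' to int and drop rows where it is missing."""
    cleaned = []
    for row in rows:
        if row.get("age"):
            cleaned.append({"name": row["name"], "age": int(row["age"])})
    return cleaned


def analyze(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute simple summary statistics for the 'age' column."""
    ages = [row["age"] for row in rows]
    return {
        "count": len(ages),
        "mean": statistics.mean(ages),
        "min": min(ages),
        "max": max(ages),
    }


def export(summary: Dict[str, Any]) -> str:
    """Serialize the summary as JSON, standing in for a file or report export."""
    return json.dumps(summary, sort_keys=True)


raw = "name,age\nAlice,25\nBob,30\nCarol,"
report = export(analyze(transform(ingest(raw))))
print(report)  # {"count": 2, "max": 30, "mean": 27.5, "min": 25}
```

Each stage takes the previous stage's output, which is the same chaining the pipeline engine performs across sandbox steps.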

## Prerequisites

* HopX API key ([Get one here](https://console.hopx.dev/api-keys))
* Python 3.8+ or Node.js 16+
* Understanding of data pipelines
* Basic knowledge of data transformation

## Architecture

```
┌──────────────┐
│  Data Input  │ Raw data
└──────┬───────┘
       │
       ▼
┌─────────────────┐
│  Pipeline       │ Step 1 → Step 2 → Step 3
│   Engine        │
└──────┬──────────┘
       │
       ▼
┌─────────────────┐
│  HopX Sandbox   │ Execute each step
└──────┬──────────┘
       │
       ▼
┌─────────────────┐
│  Results Export │ Reports, visualizations
└─────────────────┘
```
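
The Pipeline Engine box needs only one thing from the sandbox: a way to run a step's code and get back success, stdout, and stderr. A minimal sketch, assuming that contract, passes the executor in as a plain callable so the engine's stop-on-first-failure logic can be tested without a live sandbox. `Step`, `run_steps`, and `fake_execute` are hypothetical names; with HopX you could wrap the cookbook's `sandbox.run_code(code, timeout=60)` call, whose result exposes `success`, `stdout`, and `stderr`.

```python
from typing import Any, Callable, Dict, List, NamedTuple, Tuple

# An executor runs one step's code and returns (success, stdout, stderr).
# In the cookbook this role is played by the HopX sandbox.
Executor = Callable[[str], Tuple[bool, str, str]]


class Step(NamedTuple):
    step_id: str
    code: str


def run_steps(steps: List[Step], execute: Executor) -> Dict[str, Any]:
    """Run steps in order, stopping at the first failure."""
    results = []
    for step in steps:
        success, stdout, stderr = execute(step.code)
        results.append({
            "step_id": step.step_id,
            "success": success,
            "stdout": stdout,
            "stderr": stderr,
        })
        if not success:
            break  # later steps usually depend on this step's output
    return {
        "success": all(r["success"] for r in results),
        "steps_executed": len(results),
        "step_results": results,
    }


def fake_execute(code: str) -> Tuple[bool, str, str]:
    """Stand-in executor: any code containing 'raise' is treated as a failure."""
    if "raise" in code:
        return False, "", "simulated error"
    return True, "ok", ""


steps = [
    Step("load_data", "print('load')"),
    Step("broken", "raise ValueError('bad data')"),
    Step("analyze", "print('analyze')"),
]
outcome = run_steps(steps, fake_execute)
print(outcome["success"], outcome["steps_executed"])  # False 2
```

Keeping the executor behind a callable is a design choice for testability; the sandbox-backed executor and the fake one are interchangeable as long as they return the same tuple.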

## Implementation

### Step 1: Pipeline Definition

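Define the steps, then execute them in one sandbox. `DataAnalysisPipeline` uploads the input CSV to `/workspace/input.csv`, runs each step with `run_code` (60-second timeout per step), stops at the first failure, and reports the size and type of every CSV, JSON, or PNG file in `/workspace` (including the uploaded input):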

<CodeGroup>
  ```python Python theme={null}
  from hopx_ai import Sandbox
  import os
  from typing import Dict, List, Any

  class PipelineStep:
      def __init__(self, step_id: str, code: str, dependencies: List[str] = None):
          self.step_id = step_id
          self.code = code
          # Recorded for reference; execute_pipeline runs steps in insertion order
          self.dependencies = dependencies or []

  class DataAnalysisPipeline:
      def __init__(self, api_key: str):
          self.api_key = api_key
          self.sandbox = None
          self.steps = []
      
      def initialize(self):
          """Initialize pipeline sandbox"""
          self.sandbox = Sandbox.create(
              template="code-interpreter",
              api_key=self.api_key,
              timeout_seconds=3600
          )
      
      def add_step(self, step: PipelineStep):
          """Add pipeline step"""
          self.steps.append(step)
      
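      def execute_pipeline(self, input_data: str = None) -> Dict[str, Any]:
          """Execute all steps in order, stopping at the first failure"""
          if self.sandbox is None:
              raise RuntimeError("Call initialize() before execute_pipeline()")
          results = []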
          
          # Upload input data if provided
          if input_data:
              self.sandbox.files.write("/workspace/input.csv", input_data)
          
          # Execute steps in order
          for step in self.steps:
              result = self.sandbox.run_code(step.code, timeout=60)
              results.append({
                  "step_id": step.step_id,
                  "success": result.success,
                  "stdout": result.stdout,
                  "stderr": result.stderr
              })
              
              if not result.success:
                  break  # Stop on failure
          
          # Export results
          export_result = self._export_results()
          
          return {
              "success": all(r["success"] for r in results),
              "steps_executed": len(results),
              "step_results": results,
              "export": export_result
          }
      
      def _export_results(self) -> Dict[str, Any]:
          """Export pipeline results"""
          try:
              # Check for output files
              files = self.sandbox.files.list("/workspace")
              output_files = [f for f in files if f.name.endswith(('.csv', '.json', '.png'))]
              
              exports = {}
              for file in output_files:
                  content = self.sandbox.files.read(file.path)
                  exports[file.name] = {
                      "size": len(content),
                      "type": file.name.split('.')[-1]
                  }
              
              return {
                  "success": True,
                  "files": exports
              }
          except Exception as e:
              return {
                  "success": False,
                  "error": str(e)
              }
      
      def cleanup(self):
          """Clean up pipeline resources"""
          if self.sandbox:
              self.sandbox.kill()
              self.sandbox = None

  # Usage
  pipeline = DataAnalysisPipeline(api_key=os.getenv("HOPX_API_KEY"))
  pipeline.initialize()

  # Define pipeline steps
  pipeline.add_step(PipelineStep(
      step_id="load_data",
      code="""
  import pandas as pd
  df = pd.read_csv('/workspace/input.csv')
  print(f"Loaded {len(df)} rows")
  """
  ))

  pipeline.add_step(PipelineStep(
      step_id="analyze",
      code="""
  import pandas as pd
  df = pd.read_csv('/workspace/input.csv')
  summary = df.describe()
  summary.to_csv('/workspace/summary.csv')
  print("Analysis complete")
  """
  ))

  # Execute, always releasing the sandbox even if a call raises
  try:
      result = pipeline.execute_pipeline(input_data="name,age\nAlice,25\nBob,30")
      print(result)
  finally:
      pipeline.cleanup()
  ```

  ```javascript JavaScript theme={null}
  import { Sandbox } from '@hopx-ai/sdk';

  class PipelineStep {
      constructor(stepId, code, dependencies = []) {
          this.stepId = stepId;
          this.code = code;
          // Recorded for reference; executePipeline runs steps in insertion order
          this.dependencies = dependencies;
      }
  }

  class DataAnalysisPipeline {
      constructor(apiKey) {
          this.apiKey = apiKey;
          this.sandbox = null;
          this.steps = [];
      }
      
      async initialize() {
          this.sandbox = await Sandbox.create({
              template: 'code-interpreter',
              apiKey: this.apiKey,
              timeoutSeconds: 3600
          });
      }
      
      addStep(step) {
          this.steps.push(step);
      }
      
      async executePipeline(inputData = null) {
          if (!this.sandbox) {
              throw new Error('Call initialize() before executePipeline()');
          }
          const results = [];
          
          // Upload input data if provided
          if (inputData) {
              await this.sandbox.files.write('/workspace/input.csv', inputData);
          }
          
          // Execute steps in order
          for (const step of this.steps) {
              const result = await this.sandbox.runCode(step.code, { timeout: 60 });
              results.push({
                  stepId: step.stepId,
                  success: result.success,
                  stdout: result.stdout,
                  stderr: result.stderr
              });
              
              if (!result.success) {
                  break;  // Stop on failure
              }
          }
          
          // Export results
          const exportResult = await this.exportResults();
          
          return {
              success: results.every(r => r.success),
              stepsExecuted: results.length,
              stepResults: results,
              export: exportResult
          };
      }
      
      async exportResults() {
          try {
              // Check for output files
              const files = await this.sandbox.files.list('/workspace');
              const outputFiles = files.filter(f => 
                  f.name.endsWith('.csv') || 
                  f.name.endsWith('.json') || 
                  f.name.endsWith('.png')
              );
              
              const exports = {};
              for (const file of outputFiles) {
                  const content = await this.sandbox.files.read(file.path);
                  exports[file.name] = {
                      size: content.length,
                      type: file.name.split('.').pop()
                  };
              }
              
              return {
                  success: true,
                  files: exports
              };
          } catch (error) {
              return {
                  success: false,
                  error: error.message
              };
          }
      }
      
      async cleanup() {
          if (this.sandbox) {
              await this.sandbox.kill();
              this.sandbox = null;
          }
      }
  }

  // Usage
  const pipeline = new DataAnalysisPipeline(process.env.HOPX_API_KEY);
  await pipeline.initialize();

  // Define pipeline steps
  pipeline.addStep(new PipelineStep(
      'load_data',
      `
  import pandas as pd
  df = pd.read_csv('/workspace/input.csv')
  print(f"Loaded {len(df)} rows")
  `
  ));

  pipeline.addStep(new PipelineStep(
      'analyze',
      `
  import pandas as pd
  df = pd.read_csv('/workspace/input.csv')
  summary = df.describe()
  summary.to_csv('/workspace/summary.csv')
  print("Analysis complete")
  `
  ));

  // Execute, always releasing the sandbox even if a call throws
  try {
      const result = await pipeline.executePipeline('name,age\nAlice,25\nBob,30');
      console.log(result);
  } finally {
      await pipeline.cleanup();
  }
  ```
</CodeGroup>
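
`PipelineStep` accepts a `dependencies` list, but `execute_pipeline` ignores it and runs steps in the order they were added. If you want dependencies to drive the order, one option is a topological sort. The sketch below uses Kahn's algorithm; `order_steps` is a hypothetical helper, not part of the HopX SDK.

```python
from collections import deque
from typing import Dict, List


def order_steps(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return step IDs so that every step comes after the steps it depends on.

    `dependencies` maps each step ID to the IDs it depends on, mirroring
    PipelineStep.dependencies. Raises ValueError on unknown IDs or cycles.
    """
    indegree = {step_id: 0 for step_id in dependencies}
    dependents: Dict[str, List[str]] = {step_id: [] for step_id in dependencies}
    for step_id, deps in dependencies.items():
        for dep in deps:
            if dep not in dependencies:
                raise ValueError(f"{step_id} depends on unknown step {dep}")
            indegree[step_id] += 1
            dependents[dep].append(step_id)

    # Kahn's algorithm: start from steps with no unmet dependencies.
    # Dicts keep insertion order, so ties follow the order steps were added.
    ready = deque(s for s in dependencies if indegree[s] == 0)
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in dependents[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(dependencies):
        raise ValueError("dependency cycle detected")
    return order


steps = {
    "report": ["analyze", "clean"],
    "load_data": [],
    "analyze": ["clean"],
    "clean": ["load_data"],
}
print(order_steps(steps))  # ['load_data', 'clean', 'analyze', 'report']
```

To apply it, you could reorder `pipeline.steps` before calling `execute_pipeline`: build `{s.step_id: s.dependencies for s in pipeline.steps}`, pass it to `order_steps`, and map the returned IDs back to their `PipelineStep` objects.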

## Best Practices

1. **Step Isolation**: Each step should be independent and testable
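2. **Error Handling**: Stop at the first failing step, as `execute_pipeline` does, and return its `stderr` so the failure can be diagnosed and the run retried
3. **Data Validation**: Check row counts, required columns, and empty values before a step consumes the previous step's output
4. **Result Caching**: Reuse outputs for steps whose code and input have not changed instead of re-running them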
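
For data validation, one approach is to read a step's output back from the sandbox (the cookbook already uses `sandbox.files.read` for exports) and check it before the next step runs. This sketch assumes CSV text with one record per line; `validate_csv` and its checks (required columns, minimum row count, non-empty required values) are illustrative choices, not a HopX feature.

```python
import csv
import io
from typing import List


def validate_csv(text: str, required_columns: List[str], min_rows: int = 1) -> List[str]:
    """Return a list of problems found in CSV text; an empty list means it passed."""
    reader = csv.DictReader(io.StringIO(text))
    header = reader.fieldnames or []
    problems = [f"missing column: {col}" for col in required_columns if col not in header]
    rows = list(reader)
    if len(rows) < min_rows:
        problems.append(f"expected at least {min_rows} rows, got {len(rows)}")
    # Line numbers assume one record per line, with the header on line 1
    for line_no, row in enumerate(rows, start=2):
        for col in required_columns:
            # Short rows yield None for missing fields, so treat None as empty
            if col in header and not (row.get(col) or "").strip():
                problems.append(f"line {line_no}: empty value in {col}")
    return problems


print(validate_csv("name,age\nAlice,25\nBob,", ["name", "age"]))  # ['line 3: empty value in age']
print(validate_csv("name\nAlice", ["name", "age"]))  # ['missing column: age']
```

A non-empty list is a signal to halt before the next step, mirroring how `execute_pipeline` already stops on a failed step.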

## Related Cookbooks

* [Cloud Jupyter Notebook](/cookbooks/data-science/cloud-jupyter) - Notebook execution
* [ML Model Training Service](/cookbooks/data-science/ml-training-service) - Machine learning workflows

## Next Steps

1. Implement pipeline scheduling
2. Add data validation between steps
3. Create pipeline visualization
4. Implement result caching
5. Add monitoring and alerting
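
Result caching can start as simply as keying each step's output by its code and input data, so an unchanged step is not sent to the sandbox again. The `StepCache` below is a hypothetical in-memory sketch; it assumes a step's output depends only on those two values, which does not hold if a step reads other files in `/workspace`. A production service would likely persist the cache.

```python
import hashlib
import json
from typing import Callable, Dict, Tuple

# (success, stdout) as returned by whatever executes a step, e.g. a sandbox wrapper
StepOutput = Tuple[bool, str]


class StepCache:
    """In-memory cache of step outputs keyed by step code and input data."""

    def __init__(self) -> None:
        self._store: Dict[str, StepOutput] = {}
        self.hits = 0

    @staticmethod
    def key(code: str, input_data: str) -> str:
        # JSON-encode the pair so ("ab", "c") and ("a", "bc") hash differently
        payload = json.dumps([code, input_data]).encode("utf-8")
        return hashlib.sha256(payload).hexdigest()

    def run(self, code: str, input_data: str,
            execute: Callable[[str], StepOutput]) -> StepOutput:
        k = self.key(code, input_data)
        if k in self._store:
            self.hits += 1
            return self._store[k]
        output = execute(code)
        if output[0]:  # cache only successful runs so failures are retried
            self._store[k] = output
        return output


# Usage with a stand-in executor that records every call it receives
calls = []


def fake_execute(code: str) -> StepOutput:
    calls.append(code)
    return True, "Analysis complete"


cache = StepCache()
cache.run("print('hi')", "name,age\nAlice,25", fake_execute)
cache.run("print('hi')", "name,age\nAlice,25", fake_execute)  # served from cache
cache.run("print('hi')", "name,age\nAlice,26", fake_execute)  # input changed, runs again
print(len(calls), cache.hits)  # 2 1
```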
