Skip to main content
Stream real-time metrics from your sandbox using Server-Sent Events (SSE). This provides continuous monitoring of agent performance metrics without polling.

Overview

SSE monitoring is ideal for:
  • Real-time dashboards and visualizations
  • Continuous agent performance monitoring
  • Alerting on threshold breaches
  • Long-term performance tracking
SSE monitoring is available through the Control Plane API endpoint /v1/sandboxes/:id/metrics/stream. This streams metrics from the VM agent to your application.

Basic Streaming

Connect to the metrics stream and receive real-time updates:
  • Python
  • JavaScript
import json

import requests
from hopx_ai import Sandbox

# Basic example: create a sandbox, stream its metrics over SSE, and print
# the main counters from every "data:" line received.
sandbox = Sandbox.create(template="code-interpreter")

try:
    # Get API key and base URL
    api_key = sandbox._api_key  # Access internal API key
    base_url = "https://api.hopx.dev"

    # Stream metrics via SSE
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "text/event-stream",
    }

    stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

    with requests.get(stream_url, headers=headers, stream=True) as response:
        # Raise on 4xx/5xx so an auth or routing failure is not mistaken
        # for an empty stream.
        response.raise_for_status()

        for line in response.iter_lines():
            # iter_lines() yields bytes; keep-alive blank lines are skipped.
            if line and line.startswith(b"data: "):
                data = json.loads(line[6:])  # Skip "data: " prefix
                print(f"Uptime: {data.get('uptime_seconds', 0)}s")
                print(f"Total Executions: {data.get('total_executions', 0)}")
                print(f"Active Executions: {data.get('active_executions', 0)}")
                print(f"Error Count: {data.get('error_count', 0)}")
finally:
    # Always release the sandbox, even if the stream fails or the user
    # interrupts the (otherwise endless) loop with Ctrl+C.
    sandbox.kill()

Processing Stream Events

Handle different event types from the SSE stream:
  • Python
  • JavaScript
import json

import requests
from hopx_ai import Sandbox

# Example: parse the individual SSE fields ("event:", "data:", "id:") from
# the metrics stream and handle each one separately.
sandbox = Sandbox.create(template="code-interpreter")

try:
    api_key = sandbox._api_key
    base_url = "https://api.hopx.dev"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "text/event-stream",
    }

    stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

    # Most recent event ID seen on the stream; kept so reconnection logic
    # can resume from it if needed.
    event_id = None

    with requests.get(stream_url, headers=headers, stream=True) as response:
        # Raise on 4xx/5xx so an auth or routing failure is not mistaken
        # for an empty stream.
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            line_str = line.decode('utf-8')

            # Handle different SSE event types
            if line_str.startswith("event: "):
                event_type = line_str[7:]
                print(f"Event type: {event_type}")

            elif line_str.startswith("data: "):
                data = json.loads(line_str[6:])

                # Process metrics data
                print(f"Metrics - Uptime: {data.get('uptime_seconds', 0)}s")
                print(f"  Total Executions: {data.get('total_executions', 0)}")
                print(f"  Active Executions: {data.get('active_executions', 0)}")
                print(f"  Error Count: {data.get('error_count', 0)}")

            elif line_str.startswith("id: "):
                event_id = line_str[4:]
                # Use event ID for reconnection logic if needed
finally:
    # Always release the sandbox, even if the stream fails or the user
    # interrupts the loop with Ctrl+C.
    sandbox.kill()

Building a Metrics Dashboard

Create a simple dashboard that displays real-time metrics:
  • Python
  • JavaScript
import json
import time
from datetime import datetime

import requests
from hopx_ai import Sandbox

# Example: a text-based dashboard that prints one formatted line per
# metrics update until the user presses Ctrl+C.
sandbox = Sandbox.create(template="code-interpreter")

api_key = sandbox._api_key
base_url = "https://api.hopx.dev"

headers = {
    "Authorization": f"Bearer {api_key}",
    "Accept": "text/event-stream",
}

stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

print("=== Real-Time Metrics Dashboard ===")
print("Press Ctrl+C to stop\n")

try:
    with requests.get(stream_url, headers=headers, stream=True) as response:
        # Raise on 4xx/5xx so an auth or routing failure is reported
        # instead of showing an empty dashboard.
        response.raise_for_status()

        for line in response.iter_lines():
            if line and line.startswith(b"data: "):
                data = json.loads(line[6:])  # Skip "data: " prefix

                timestamp = datetime.now().strftime("%H:%M:%S")
                uptime = data.get('uptime_seconds', 0)
                total_exec = data.get('total_executions', 0)
                active_exec = data.get('active_executions', 0)
                errors = data.get('error_count', 0)

                # Simple text-based dashboard
                # NOTE(review): the ":d" format codes assume integer metric
                # values - confirm against the API response schema.
                print(f"[{timestamp}] Uptime: {uptime:6d}s | Executions: {total_exec:4d} | Active: {active_exec:2d} | Errors: {errors:3d}")

                time.sleep(0.1)  # Throttle output

except KeyboardInterrupt:
    print("\nStopping monitoring...")

finally:
    # Release the sandbox on every exit path (normal end, Ctrl+C, error).
    sandbox.kill()

Alerting on Thresholds

Set up alerts when metrics exceed thresholds:
  • Python
  • JavaScript
import json

import requests
from hopx_ai import Sandbox

# Example: watch the metrics stream and print an alert whenever a metric
# exceeds its configured threshold.
sandbox = Sandbox.create(template="code-interpreter")

try:
    api_key = sandbox._api_key
    base_url = "https://api.hopx.dev"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "text/event-stream",
    }

    stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

    # Thresholds
    ERROR_COUNT_THRESHOLD = 10
    ACTIVE_EXECUTIONS_THRESHOLD = 5

    print("Monitoring for threshold breaches...")

    with requests.get(stream_url, headers=headers, stream=True) as response:
        # Raise on 4xx/5xx so an auth or routing failure is not mistaken
        # for "no alerts".
        response.raise_for_status()

        for line in response.iter_lines():
            if line and line.startswith(b"data: "):
                data = json.loads(line[6:])  # Skip "data: " prefix

                error_count = data.get('error_count', 0)
                active_executions = data.get('active_executions', 0)

                # Check thresholds
                if error_count > ERROR_COUNT_THRESHOLD:
                    print(f"⚠️  ALERT: Error count {error_count} exceeds threshold {ERROR_COUNT_THRESHOLD}")

                if active_executions > ACTIVE_EXECUTIONS_THRESHOLD:
                    print(f"⚠️  ALERT: Active executions {active_executions} exceeds threshold {ACTIVE_EXECUTIONS_THRESHOLD}")
finally:
    # Always release the sandbox, even if the stream fails or the user
    # interrupts the loop with Ctrl+C.
    sandbox.kill()

Best Practices

Use SSE for real-time monitoring when you need continuous updates. For one-time snapshots, use get_metrics_snapshot() instead.
Handle reconnection logic in production applications. SSE connections can drop, and you should reconnect automatically.
SSE streams consume resources. Close connections when no longer needed to free up server resources.
Throttle processing if you’re receiving metrics faster than you can process them. The stream sends updates frequently.

API Reference

Control Plane API

  • GET /v1/sandboxes/:id/metrics/stream - Stream metrics via SSE
    • Headers: Authorization: Bearer {token}, Accept: text/event-stream
    • Response: Server-Sent Events stream with JSON metrics data

SSE Event Format

event: message
data: {"system": {"cpu": {"usage_percent": 25.5}, ...}}
id: 12345

Next Steps