Stream real-time metrics from your sandbox using Server-Sent Events (SSE). This provides continuous monitoring of agent performance metrics without polling.
Overview
SSE monitoring is ideal for:
- Real-time dashboards and visualizations
- Continuous agent performance monitoring
- Alerting on threshold breaches
- Long-term performance tracking
SSE monitoring is available through the Control Plane API endpoint /v1/sandboxes/:id/metrics/stream. The endpoint streams metrics from the VM agent to your application over a single long-lived HTTP connection.
Basic Streaming
Connect to the metrics stream and receive real-time updates:
import json

import requests
from hopx_ai import Sandbox

sandbox = Sandbox.create(template="code-interpreter")
info = sandbox.get_info()

# Get API key and base URL
api_key = sandbox._api_key  # Access internal API key
base_url = "https://api.hopx.dev"

# Stream metrics via SSE
headers = {
    "Authorization": f"Bearer {api_key}",
    "Accept": "text/event-stream"
}
stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

with requests.get(stream_url, headers=headers, stream=True) as response:
    response.raise_for_status()  # Fail early on auth or routing errors
    for line in response.iter_lines():
        # iter_lines() yields bytes; only "data: " lines carry the JSON payload
        if line and line.startswith(b"data: "):
            data = json.loads(line[6:])  # Skip "data: " prefix
            print(f"Uptime: {data.get('uptime_seconds', 0)}s")
            print(f"Total Executions: {data.get('total_executions', 0)}")
            print(f"Active Executions: {data.get('active_executions', 0)}")
            print(f"Error Count: {data.get('error_count', 0)}")

sandbox.kill()
Processing Stream Events
Handle different event types from the SSE stream:
import json

import requests
from hopx_ai import Sandbox

sandbox = Sandbox.create(template="code-interpreter")
info = sandbox.get_info()
api_key = sandbox._api_key
base_url = "https://api.hopx.dev"
headers = {
    "Authorization": f"Bearer {api_key}",
    "Accept": "text/event-stream"
}
stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

with requests.get(stream_url, headers=headers, stream=True) as response:
    response.raise_for_status()  # Fail early on auth or routing errors
    for line in response.iter_lines():
        if not line:
            continue  # Blank lines separate events
        line_str = line.decode('utf-8')
        # Handle the different SSE fields
        if line_str.startswith("event: "):
            event_type = line_str[7:]
            print(f"Event type: {event_type}")
        elif line_str.startswith("data: "):
            data = json.loads(line_str[6:])
            # Process metrics data
            print(f"Metrics - Uptime: {data.get('uptime_seconds', 0)}s")
            print(f"  Total Executions: {data.get('total_executions', 0)}")
            print(f"  Active Executions: {data.get('active_executions', 0)}")
            print(f"  Error Count: {data.get('error_count', 0)}")
        elif line_str.startswith("id: "):
            event_id = line_str[4:]
            # Use event ID for reconnection logic if needed

sandbox.kill()
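The loop above handles each line on its own. In the SSE wire format, a blank line terminates an event, an event may carry several `data:` lines, and lines starting with `:` are comments. The following is a minimal sketch of grouping lines into complete events. It assumes the lines have already been decoded to text (for example with `.decode('utf-8')` as above) and that each event's data is JSON; `parse_sse_events` is a hypothetical helper name, not part of the hopx_ai SDK.

```python
import json
from typing import Dict, Iterable, Iterator


def parse_sse_events(lines: Iterable[str]) -> Iterator[Dict[str, object]]:
    """Group decoded SSE lines into events; a blank line ends an event.

    Hypothetical helper for illustration, not part of the hopx_ai SDK.
    """
    event_type, event_id, data_lines = "message", None, []
    for line in lines:
        if line == "":
            # Blank line: dispatch the accumulated event if it carries data
            if data_lines:
                yield {
                    "event": event_type,
                    "id": event_id,
                    "data": json.loads("\n".join(data_lines)),
                }
            # The event type resets per event; the last seen ID persists
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # Comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # One leading space after the colon is dropped
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        elif field == "id":
            event_id = value


# Usage with a hand-written sample instead of a live stream
sample = ["event: message", 'data: {"uptime_seconds": 42}', "id: 12345", ""]
for event in parse_sse_events(sample):
    print(event["event"], event["id"], event["data"])
# prints: message 12345 {'uptime_seconds': 42}
```

With a live connection, the same generator could be fed `line.decode('utf-8')` for every line from `response.iter_lines()`, including the empty ones, since the empty lines mark event boundaries.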
Building a Metrics Dashboard
Create a simple dashboard that displays real-time metrics:
import json
import time
from datetime import datetime

import requests
from hopx_ai import Sandbox

sandbox = Sandbox.create(template="code-interpreter")
api_key = sandbox._api_key
base_url = "https://api.hopx.dev"
headers = {
    "Authorization": f"Bearer {api_key}",
    "Accept": "text/event-stream"
}
stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

print("=== Real-Time Metrics Dashboard ===")
print("Press Ctrl+C to stop\n")

try:
    with requests.get(stream_url, headers=headers, stream=True) as response:
        response.raise_for_status()  # Fail early on auth or routing errors
        for line in response.iter_lines():
            if line and line.startswith(b"data: "):
                data = json.loads(line[6:])
                timestamp = datetime.now().strftime("%H:%M:%S")
                uptime = data.get('uptime_seconds', 0)
                total_exec = data.get('total_executions', 0)
                active_exec = data.get('active_executions', 0)
                errors = data.get('error_count', 0)
                # Simple text-based dashboard (the "d" format codes expect integer values)
                print(f"[{timestamp}] Uptime: {uptime:6d}s | Executions: {total_exec:4d} | Active: {active_exec:2d} | Errors: {errors:3d}")
                time.sleep(0.1)  # Throttle output
except KeyboardInterrupt:
    print("\nStopping monitoring...")
finally:
    # Always clean up the sandbox, even after Ctrl+C or an error
    sandbox.kill()
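Sleeping inside the read loop delays every event, so a slow consumer can fall behind the stream. An alternative is to keep reading at full speed and only redraw at a fixed interval, dropping the updates in between. The sketch below shows one way to do that; `Throttle` is a hypothetical helper, and the injectable `clock` parameter exists only to make it testable.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Allow an action at most once per `interval` seconds.

    Hypothetical helper for illustration, not part of the hopx_ai SDK.
    """

    def __init__(self, interval: float, clock: Callable[[], float] = time.monotonic):
        self.interval = interval
        self.clock = clock
        self._last: Optional[float] = None  # Time of the last allowed action

    def ready(self) -> bool:
        """Return True and record the time if the interval has elapsed."""
        now = self.clock()
        if self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

In the dashboard loop, this would replace the `time.sleep(0.1)` call: create `throttle = Throttle(1.0)` before the loop and wrap the `print(...)` in `if throttle.ready():`, so every event is still read but at most one line is printed per second.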
Alerting on Thresholds
Set up alerts when metrics exceed thresholds:
import json

import requests
from hopx_ai import Sandbox

sandbox = Sandbox.create(template="code-interpreter")
api_key = sandbox._api_key
base_url = "https://api.hopx.dev"
headers = {
    "Authorization": f"Bearer {api_key}",
    "Accept": "text/event-stream"
}
stream_url = f"{base_url}/v1/sandboxes/{sandbox.sandbox_id}/metrics/stream"

# Thresholds
ERROR_COUNT_THRESHOLD = 10
ACTIVE_EXECUTIONS_THRESHOLD = 5

print("Monitoring for threshold breaches...")

with requests.get(stream_url, headers=headers, stream=True) as response:
    response.raise_for_status()  # Fail early on auth or routing errors
    for line in response.iter_lines():
        if line and line.startswith(b"data: "):
            data = json.loads(line[6:])
            error_count = data.get('error_count', 0)
            active_executions = data.get('active_executions', 0)
            # Check thresholds
            if error_count > ERROR_COUNT_THRESHOLD:
                print(f"⚠️ ALERT: Error count {error_count} exceeds threshold {ERROR_COUNT_THRESHOLD}")
            if active_executions > ACTIVE_EXECUTIONS_THRESHOLD:
                print(f"⚠️ ALERT: Active executions {active_executions} exceed threshold {ACTIVE_EXECUTIONS_THRESHOLD}")

sandbox.kill()
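The loop above prints an alert for every event while a metric stays over its threshold, which can flood the output. A common refinement is to alert only when a metric crosses the threshold, and again only after it has recovered. The following is a minimal sketch of that idea; `check_thresholds` and its arguments are hypothetical names, and the metric keys are the ones used in the examples above.

```python
from typing import Dict, List, Set


def check_thresholds(
    data: Dict[str, float],
    thresholds: Dict[str, float],
    breached: Set[str],
) -> List[str]:
    """Return alerts for metrics that newly crossed their threshold.

    `breached` is mutated to remember which metrics are currently over
    their limit, so repeated events do not repeat the alert.
    """
    alerts = []
    for name, limit in thresholds.items():
        value = data.get(name, 0)
        if value > limit:
            if name not in breached:
                breached.add(name)
                alerts.append(f"ALERT: {name}={value} exceeds threshold {limit}")
        else:
            breached.discard(name)  # Recovered, so a future breach alerts again
    return alerts


# Usage: keep one `breached` set for the lifetime of the stream
thresholds = {"error_count": 10, "active_executions": 5}
breached: Set[str] = set()
print(check_thresholds({"error_count": 11}, thresholds, breached))
print(check_thresholds({"error_count": 12}, thresholds, breached))  # No repeat
```

In the streaming loop, `check_thresholds(data, thresholds, breached)` would be called once per parsed `data:` payload and each returned message printed or forwarded to a notifier.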
Best Practices
Use SSE for real-time monitoring when you need continuous updates. For one-time snapshots, use get_metrics_snapshot() instead.
Handle reconnection logic in production applications. SSE connections can drop, and you should reconnect automatically.
SSE streams consume resources. Close connections when no longer needed to free up server resources.
Throttle processing if you’re receiving metrics faster than you can process them. The stream sends updates frequently.
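The reconnection advice above can be sketched as a generator that reopens the stream with exponential backoff. This is an illustration under stated assumptions, not the SDK's behavior: `stream_with_reconnect` and `connect` are hypothetical names, `connect(last_event_id)` is expected to open the stream and return an iterable of decoded lines, and whether the metrics endpoint honors a resume position such as the standard `Last-Event-ID` request header is not confirmed by this guide.

```python
import time
from typing import Callable, Iterable, Iterator, Optional


def stream_with_reconnect(
    connect: Callable[[Optional[str]], Iterable[str]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Yield lines from `connect`, reconnecting with exponential backoff.

    Hypothetical helper for illustration, not part of the hopx_ai SDK.
    """
    last_event_id: Optional[str] = None
    failures = 0
    while True:
        try:
            for line in connect(last_event_id):
                failures = 0  # Data arrived, so reset the backoff
                if line.startswith("id:"):
                    last_event_id = line[3:].strip()
                yield line
            return  # Stream ended cleanly
        except OSError:
            # requests' exceptions derive from OSError, as does ConnectionError
            failures += 1
            if failures > max_retries:
                raise
            # Delays grow 1s, 2s, 4s, ... capped at max_delay
            sleep(min(max_delay, base_delay * 2 ** (failures - 1)))
```

A `connect` callable for this guide's endpoint could wrap the `requests.get(stream_url, headers=headers, stream=True)` call shown earlier, add a `Last-Event-ID` header when `last_event_id` is set, and yield each line from `response.iter_lines()` decoded as UTF-8.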
API Reference
Control Plane API
- GET /v1/sandboxes/:id/metrics/stream - Stream metrics via SSE
- Headers: Authorization: Bearer {token}, Accept: text/event-stream
- Response: Server-Sent Events stream with JSON metrics data
event: message
data: {"system": {"cpu": {"usage_percent": 25.5}, ...}}
id: 12345