Streaming execution provides real-time output as your code runs. Instead of waiting for completion, you receive stdout and stderr messages as theyโre generated, perfect for monitoring long-running tasks.
Overview
Streaming execution is ideal for:
- Long-running tasks where you want real-time progress
- Interactive code that produces output over time
- Monitoring execution progress
- Building real-time dashboards or UIs
Streaming execution uses WebSocket connections for real-time communication. It requires async/await support in your code.
Basic Streaming
Stream execution output in real-time:
from hopx_ai import Sandbox
import asyncio
async def stream_execution():
sandbox = Sandbox.create(template="code-interpreter")
code = '''
import time
for i in range(5):
print(f"Step {i+1}/5")
time.sleep(1)
print("Done!")
'''
async for message in sandbox.run_code_stream(code):
if message['type'] == 'stdout':
print(message['data'], end='')
elif message['type'] == 'stderr':
print(f"ERROR: {message['data']}", end='', file=sys.stderr)
elif message['type'] == 'result':
print(f"\nExit code: {message['exit_code']}")
print(f"Execution time: {message['execution_time']}s")
elif message['type'] == 'complete':
print("\nโ
Execution completed")
break
sandbox.kill()
asyncio.run(stream_execution())
import { Sandbox } from '@hopx-ai/sdk';
async function streamExecution() {
const sandbox = await Sandbox.create({ template: 'code-interpreter' });
const code = `
import time
for i in range(5):
print(f"Step {i+1}/5")
time.sleep(1)
print("Done!")
`;
for await (const message of sandbox.runCodeStream(code)) {
if (message.type === 'stdout') {
process.stdout.write(message.data);
} else if (message.type === 'stderr') {
process.stderr.write(`ERROR: ${message.data}`);
} else if (message.type === 'result') {
console.log(`\nExit code: ${message.exit_code}`);
console.log(`Execution time: ${message.execution_time}s`);
} else if (message.type === 'complete') {
console.log('\nโ
Execution completed');
break;
}
}
await sandbox.kill();
}
streamExecution();
Message Types
Stream messages have different types:
from hopx_ai import Sandbox
import asyncio
async def handle_all_message_types():
sandbox = Sandbox.create(template="code-interpreter")
code = '''
import sys
print("This is stdout")
print("This is also stdout", file=sys.stdout)
print("This is stderr", file=sys.stderr)
'''
async for message in sandbox.run_code_stream(code):
msg_type = message['type']
if msg_type == 'stdout':
print(f"[STDOUT] {message['data']}", end='')
elif msg_type == 'stderr':
print(f"[STDERR] {message['data']}", end='')
elif msg_type == 'result':
print(f"\n[RESULT] Exit: {message['exit_code']}, Time: {message['execution_time']}s")
elif msg_type == 'complete':
print("[COMPLETE] Execution finished")
break
sandbox.kill()
asyncio.run(handle_all_message_types())
import { Sandbox } from '@hopx-ai/sdk';
async function handleAllMessageTypes() {
const sandbox = await Sandbox.create({ template: 'code-interpreter' });
const code = `
import sys
print("This is stdout")
print("This is also stdout", file=sys.stdout)
print("This is stderr", file=sys.stderr)
`;
for await (const message of sandbox.runCodeStream(code)) {
const msgType = message.type;
if (msgType === 'stdout') {
process.stdout.write(`[STDOUT] ${message.data}`);
} else if (msgType === 'stderr') {
process.stderr.write(`[STDERR] ${message.data}`);
} else if (msgType === 'result') {
console.log(`\n[RESULT] Exit: ${message.exit_code}, Time: ${message.execution_time}s`);
} else if (msgType === 'complete') {
console.log('[COMPLETE] Execution finished');
break;
}
}
await sandbox.kill();
}
handleAllMessageTypes();
Progress Monitoring
Monitor progress of long-running tasks:
from hopx_ai import Sandbox
import asyncio
async def monitor_progress():
sandbox = Sandbox.create(template="code-interpreter")
code = '''
import time
total_steps = 20
for i in range(total_steps):
progress = (i + 1) / total_steps * 100
print(f"Progress: {progress:.1f}% ({i+1}/{total_steps})")
time.sleep(0.5)
print("All steps completed!")
'''
async for message in sandbox.run_code_stream(code):
if message['type'] == 'stdout':
# Display progress in real-time
print(message['data'], end='')
elif message['type'] == 'complete':
print("\nโ
Task completed successfully")
break
sandbox.kill()
asyncio.run(monitor_progress())
import { Sandbox } from '@hopx-ai/sdk';
async function monitorProgress() {
const sandbox = await Sandbox.create({ template: 'code-interpreter' });
const code = `
import time
total_steps = 20
for i in range(total_steps):
progress = (i + 1) / total_steps * 100
print(f"Progress: {progress:.1f}% ({i+1}/{total_steps})")
time.sleep(0.5)
print("All steps completed!")
`;
for await (const message of sandbox.runCodeStream(code)) {
if (message.type === 'stdout') {
// Display progress in real-time
process.stdout.write(message.data);
} else if (message.type === 'complete') {
console.log('\nโ
Task completed successfully');
break;
}
}
await sandbox.kill();
}
monitorProgress();
Environment Variables
Pass environment variables to streaming execution:
from hopx_ai import Sandbox
import asyncio
async def stream_with_env():
sandbox = Sandbox.create(template="code-interpreter")
code = '''
import os
print(f"API key: {os.getenv('API_KEY', 'not set')}")
print(f"Debug mode: {os.getenv('DEBUG', 'false')}")
'''
async for message in sandbox.run_code_stream(
code,
env={"API_KEY": "sk-123", "DEBUG": "true"}
):
if message['type'] == 'stdout':
print(message['data'], end='')
elif message['type'] == 'complete':
break
sandbox.kill()
asyncio.run(stream_with_env())
import { Sandbox } from '@hopx-ai/sdk';
async function streamWithEnv() {
const sandbox = await Sandbox.create({ template: 'code-interpreter' });
const code = `
import os
print(f"API key: {os.getenv('API_KEY', 'not set')}")
print(f"Debug mode: {os.getenv('DEBUG', 'false')}")
`;
for await (const message of sandbox.runCodeStream(code, {
env: { API_KEY: 'sk-123', DEBUG: 'true' }
})) {
if (message.type === 'stdout') {
process.stdout.write(message.data);
} else if (message.type === 'complete') {
break;
}
}
await sandbox.kill();
}
streamWithEnv();
Error Handling
Handle errors in streaming execution:
from hopx_ai import Sandbox
import asyncio
async def handle_streaming_errors():
sandbox = Sandbox.create(template="code-interpreter")
code = '''
print("Starting...")
print(undefined_variable) # This will cause an error
print("This won't be reached")
'''
try:
async for message in sandbox.run_code_stream(code):
if message['type'] == 'stdout':
print(message['data'], end='')
elif message['type'] == 'stderr':
print(f"ERROR: {message['data']}", end='')
elif message['type'] == 'result':
if message['exit_code'] != 0:
print(f"\nโ Execution failed with exit code: {message['exit_code']}")
elif message['type'] == 'complete':
if not message.get('success', False):
print("\nโ Execution completed with errors")
break
except Exception as e:
print(f"Streaming error: {e}")
finally:
sandbox.kill()
asyncio.run(handle_streaming_errors())
import { Sandbox } from '@hopx-ai/sdk';
async function handleStreamingErrors() {
const sandbox = await Sandbox.create({ template: 'code-interpreter' });
const code = `
print("Starting...")
print(undefined_variable) # This will cause an error
print("This won't be reached")
`;
try {
for await (const message of sandbox.runCodeStream(code)) {
if (message.type === 'stdout') {
process.stdout.write(message.data);
} else if (message.type === 'stderr') {
process.stderr.write(`ERROR: ${message.data}`);
} else if (message.type === 'result') {
if (message.exit_code !== 0) {
console.log(`\nโ Execution failed with exit code: ${message.exit_code}`);
}
} else if (message.type === 'complete') {
if (!message.success) {
console.log('\nโ Execution completed with errors');
}
break;
}
}
} catch (error) {
console.error(`Streaming error: ${error.message}`);
} finally {
await sandbox.kill();
}
}
handleStreamingErrors();
Complete Example
Hereโs a complete example with real-time progress:
from hopx_ai import Sandbox
import asyncio
async def complete_streaming_example():
sandbox = Sandbox.create(template="code-interpreter")
code = '''
import time
import random
print("Starting data processing...")
total_items = 100
for i in range(total_items):
# Simulate processing
time.sleep(0.1)
# Generate some output
if i % 10 == 0:
print(f"Processed {i}/{total_items} items")
# Simulate occasional errors
if random.random() < 0.05:
print(f"Warning: Issue with item {i}", file=sys.stderr)
print("Processing complete!")
print(f"Total items processed: {total_items}")
'''
print("๐ Starting streaming execution...\n")
async for message in sandbox.run_code_stream(
code,
language='python',
timeout=120,
env={"BATCH_SIZE": "10"}
):
if message['type'] == 'stdout':
print(message['data'], end='')
elif message['type'] == 'stderr':
print(f"โ ๏ธ {message['data']}", end='')
elif message['type'] == 'result':
print(f"\n\n๐ Execution Summary:")
print(f" Exit code: {message['exit_code']}")
print(f" Execution time: {message['execution_time']:.2f}s")
elif message['type'] == 'complete':
print("\nโ
Streaming completed")
break
sandbox.kill()
asyncio.run(complete_streaming_example())
import { Sandbox } from '@hopx-ai/sdk';
async function completeStreamingExample() {
const sandbox = await Sandbox.create({ template: 'code-interpreter' });
const code = `
import time
import random
print("Starting data processing...")
total_items = 100
for i in range(total_items):
# Simulate processing
time.sleep(0.1)
# Generate some output
if i % 10 == 0:
print(f"Processed {i}/{total_items} items")
# Simulate occasional errors
if random.random() < 0.05:
print(f"Warning: Issue with item {i}", file=sys.stderr)
print("Processing complete!")
print(f"Total items processed: {total_items}")
`;
console.log('๐ Starting streaming execution...\n');
for await (const message of sandbox.runCodeStream(code, {
language: 'python',
timeout: 120,
env: { BATCH_SIZE: '10' }
})) {
if (message.type === 'stdout') {
process.stdout.write(message.data);
} else if (message.type === 'stderr') {
process.stderr.write(`โ ๏ธ ${message.data}`);
} else if (message.type === 'result') {
console.log(`\n\n๐ Execution Summary:`);
console.log(` Exit code: ${message.exit_code}`);
console.log(` Execution time: ${message.execution_time.toFixed(2)}s`);
} else if (message.type === 'complete') {
console.log('\nโ
Streaming completed');
break;
}
}
await sandbox.kill();
}
completeStreamingExample();
Best Practices
1. Use Async/Await
Streaming execution requires async/await. Make sure your code uses async functions and async for loops.
2. Handle All Message Types
Process stdout, stderr, result, and complete messages appropriately for a complete streaming experience.
3. Break on Complete
Always break the loop when you receive a โcompleteโ message to avoid unnecessary processing.
4. Handle Errors
Check exit codes and success flags to handle execution failures gracefully.
5. Use for Real-Time Feedback
Streaming is best for tasks where real-time output is valuable. For simple scripts, synchronous execution may be simpler.
Next Steps