Streaming execution provides real-time output as your code runs. Instead of waiting for completion, you receive stdout and stderr messages as they're generated, which is ideal for monitoring long-running tasks.
Overview
Streaming execution is ideal for:
- Long-running tasks where you want real-time progress
- Interactive code that produces output over time
- Monitoring execution progress
- Building real-time dashboards or UIs
Streaming execution uses WebSocket connections for real-time communication. It requires async/await support in your code.
Basic Streaming
Stream execution output in real-time:
import asyncio
import sys

from hopx_ai import Sandbox
async def stream_execution():
    """Run a short script in a sandbox and echo its output as it arrives.

    Each streamed message is a dict whose ``type`` field selects how it is
    handled: ``stdout`` chunks are printed as-is, ``stderr`` chunks are
    written to the local stderr with an ``ERROR:`` prefix, ``result`` reports
    the exit code and execution time, and ``complete`` ends the loop.
    """
    sandbox = Sandbox.create(template="code-interpreter")
    # The script is sent verbatim, so it starts at column 0 inside the literal.
    code = '''
import time
for i in range(5):
    print(f"Step {i+1}/5")
    time.sleep(1)
print("Done!")
'''
    try:
        async for message in sandbox.run_code_stream(code):
            if message['type'] == 'stdout':
                # Chunks carry their own newlines, so suppress print's.
                print(message['data'], end='')
            elif message['type'] == 'stderr':
                print(f"ERROR: {message['data']}", end='', file=sys.stderr)
            elif message['type'] == 'result':
                print(f"\nExit code: {message['exit_code']}")
                print(f"Execution time: {message['execution_time']}s")
            elif message['type'] == 'complete':
                print("\n✅ Execution completed")
                break
    finally:
        # Always kill the sandbox, even if streaming raises part-way through.
        sandbox.kill()
asyncio.run(stream_execution())
Message Types
Stream messages have different types:
from hopx_ai import Sandbox
import asyncio
async def handle_all_message_types():
    """Show every message type the stream yields, tagged by its type.

    The sandboxed script writes to both stdout and stderr so that the
    ``stdout``, ``stderr``, ``result`` and ``complete`` branches are all
    exercised. Output from both streams is printed locally with a
    ``[STDOUT]`` / ``[STDERR]`` prefix.
    """
    sandbox = Sandbox.create(template="code-interpreter")
    code = '''
import sys
print("This is stdout")
print("This is also stdout", file=sys.stdout)
print("This is stderr", file=sys.stderr)
'''
    try:
        async for message in sandbox.run_code_stream(code):
            msg_type = message['type']
            if msg_type == 'stdout':
                print(f"[STDOUT] {message['data']}", end='')
            elif msg_type == 'stderr':
                print(f"[STDERR] {message['data']}", end='')
            elif msg_type == 'result':
                print(f"\n[RESULT] Exit: {message['exit_code']}, Time: {message['execution_time']}s")
            elif msg_type == 'complete':
                print("[COMPLETE] Execution finished")
                break
    finally:
        # Always kill the sandbox, even if streaming raises part-way through.
        sandbox.kill()
asyncio.run(handle_all_message_types())
Progress Monitoring
Monitor progress of long-running tasks:
from hopx_ai import Sandbox
import asyncio
async def monitor_progress():
    """Stream progress lines from a long-running sandboxed loop.

    The sandboxed script prints one progress line per step; each ``stdout``
    chunk is echoed as soon as it is received, and the loop ends on the
    ``complete`` message. Other message types are ignored.
    """
    sandbox = Sandbox.create(template="code-interpreter")
    # The script is sent verbatim, so it starts at column 0 inside the literal.
    code = '''
import time
total_steps = 20
for i in range(total_steps):
    progress = (i + 1) / total_steps * 100
    print(f"Progress: {progress:.1f}% ({i+1}/{total_steps})")
    time.sleep(0.5)
print("All steps completed!")
'''
    try:
        async for message in sandbox.run_code_stream(code):
            if message['type'] == 'stdout':
                # Display progress in real-time
                print(message['data'], end='')
            elif message['type'] == 'complete':
                print("\n✅ Task completed successfully")
                break
    finally:
        # Always kill the sandbox, even if streaming raises part-way through.
        sandbox.kill()
asyncio.run(monitor_progress())
Environment Variables
Pass environment variables to streaming execution:
from hopx_ai import Sandbox
import asyncio
async def stream_with_env():
    """Stream a script's output while passing it environment variables.

    The ``env`` mapping is handed to ``run_code_stream``; the sandboxed
    script reads the values back with ``os.getenv`` and prints them.
    """
    sandbox = Sandbox.create(template="code-interpreter")
    code = '''
import os
print(f"API key: {os.getenv('API_KEY', 'not set')}")
print(f"Debug mode: {os.getenv('DEBUG', 'false')}")
'''
    try:
        async for message in sandbox.run_code_stream(
            code,
            # "sk-123" is a placeholder value for the example.
            env={"API_KEY": "sk-123", "DEBUG": "true"}
        ):
            if message['type'] == 'stdout':
                print(message['data'], end='')
            elif message['type'] == 'complete':
                break
    finally:
        # Always kill the sandbox, even if streaming raises part-way through.
        sandbox.kill()
asyncio.run(stream_with_env())
Error Handling
Handle errors in streaming execution:
from hopx_ai import Sandbox
import asyncio
async def handle_streaming_errors():
    """Stream a script that fails and report the failure.

    The sandboxed script references an undefined name. A failure is
    reported when the ``result`` message has a non-zero ``exit_code`` or
    when the ``complete`` message has a missing or falsy ``success`` flag.
    Exceptions raised by the stream itself are caught and printed, and the
    sandbox is killed in every case.
    """
    sandbox = Sandbox.create(template="code-interpreter")
    code = '''
print("Starting...")
print(undefined_variable)  # This will cause an error
print("This won't be reached")
'''
    try:
        async for message in sandbox.run_code_stream(code):
            if message['type'] == 'stdout':
                print(message['data'], end='')
            elif message['type'] == 'stderr':
                print(f"ERROR: {message['data']}", end='')
            elif message['type'] == 'result':
                if message['exit_code'] != 0:
                    print(f"\n❌ Execution failed with exit code: {message['exit_code']}")
            elif message['type'] == 'complete':
                # A missing 'success' key is treated as a failure.
                if not message.get('success', False):
                    print("\n❌ Execution completed with errors")
                break
    except Exception as e:
        # Top-level boundary of the example: report the streaming failure.
        print(f"Streaming error: {e}")
    finally:
        sandbox.kill()
asyncio.run(handle_streaming_errors())
Complete Example
Here's a complete example with real-time progress:
from hopx_ai import Sandbox
import asyncio
async def complete_streaming_example():
    """Stream a simulated batch job with progress, warnings and a summary.

    The sandboxed script processes 100 items, prints progress every tenth
    item, and occasionally writes a warning to stderr. Locally, ``stdout``
    chunks are echoed, ``stderr`` chunks are prefixed with a warning sign,
    the ``result`` message is printed as a summary, and ``complete`` ends
    the loop.
    """
    sandbox = Sandbox.create(template="code-interpreter")
    # The script is sent verbatim, so it starts at column 0 inside the literal.
    # It imports sys itself because it writes warnings to sys.stderr.
    code = '''
import random
import sys
import time
print("Starting data processing...")
total_items = 100
for i in range(total_items):
    # Simulate processing
    time.sleep(0.1)
    # Generate some output
    if i % 10 == 0:
        print(f"Processed {i}/{total_items} items")
    # Simulate occasional errors
    if random.random() < 0.05:
        print(f"Warning: Issue with item {i}", file=sys.stderr)
print("Processing complete!")
print(f"Total items processed: {total_items}")
'''
    print("🚀 Starting streaming execution...\n")
    try:
        async for message in sandbox.run_code_stream(
            code,
            language='python',
            timeout=120,
            env={"BATCH_SIZE": "10"}
        ):
            if message['type'] == 'stdout':
                print(message['data'], end='')
            elif message['type'] == 'stderr':
                print(f"⚠️ {message['data']}", end='')
            elif message['type'] == 'result':
                print("\n\n📊 Execution Summary:")
                print(f"  Exit code: {message['exit_code']}")
                print(f"  Execution time: {message['execution_time']:.2f}s")
            elif message['type'] == 'complete':
                print("\n✅ Streaming completed")
                break
    finally:
        # Always kill the sandbox, even if streaming raises part-way through.
        sandbox.kill()
asyncio.run(complete_streaming_example())
Best Practices
1. Use Async/Await
Streaming execution requires async/await. Make sure your code uses async functions and async for loops.
2. Handle All Message Types
Process stdout, stderr, result, and complete messages appropriately for a complete streaming experience.
3. Break on Complete
Always break out of the loop when you receive a "complete" message to avoid unnecessary processing.
4. Handle Errors
Check exit codes and success flags to handle execution failures gracefully.
5. Use for Real-Time Feedback
Streaming is best for tasks where real-time output is valuable. For simple scripts, synchronous execution may be simpler.
Next Steps