Skip to main content
Streaming execution provides real-time output as your code runs. Instead of waiting for completion, you receive stdout and stderr messages as they're generated, which makes it well suited to monitoring long-running tasks.

Overview

Streaming execution is ideal for:
  • Long-running tasks where you want real-time progress
  • Interactive code that produces output over time
  • Monitoring execution progress
  • Building real-time dashboards or UIs
Streaming execution uses WebSocket connections for real-time communication. It requires async/await support in your code.

Basic Streaming

Stream execution output in real-time:
  • Python
  • JavaScript
from hopx_ai import Sandbox
import asyncio

async def stream_execution():
    """Run a short five-step script in a sandbox and echo its output live.

    Each streamed message is a mapping with a ``'type'`` key:

    * ``'stdout'`` / ``'stderr'`` -- carry the text chunk in ``'data'``.
    * ``'result'`` -- carries ``'exit_code'`` and ``'execution_time'``.
    * ``'complete'`` -- marks the end of the stream; the loop stops here.

    The sandbox is always killed, even if streaming raises.
    """
    # Local import: the stderr branch below writes to sys.stderr, and the
    # snippet's import lines do not bring ``sys`` into scope.
    import sys

    sandbox = Sandbox.create(template="code-interpreter")

    code = '''
import time
for i in range(5):
    print(f"Step {i+1}/5")
    time.sleep(1)
print("Done!")
'''

    try:
        async for message in sandbox.run_code_stream(code):
            if message['type'] == 'stdout':
                # Chunks already contain their own newlines, so suppress print's.
                print(message['data'], end='')
            elif message['type'] == 'stderr':
                print(f"ERROR: {message['data']}", end='', file=sys.stderr)
            elif message['type'] == 'result':
                print(f"\nExit code: {message['exit_code']}")
                print(f"Execution time: {message['execution_time']}s")
            elif message['type'] == 'complete':
                print("\n✅ Execution completed")
                break
    finally:
        # Release the sandbox whether the stream finished or failed.
        sandbox.kill()

asyncio.run(stream_execution())

Message Types

Stream messages have different types:
  • Python
  • JavaScript
from hopx_ai import Sandbox
import asyncio

async def handle_all_message_types():
    """Run a script that writes to both stdout and stderr and label each message.

    Demonstrates the four message types seen in the stream:

    * ``'stdout'`` / ``'stderr'`` -- text chunk in ``'data'``, printed with a
      ``[STDOUT]`` / ``[STDERR]`` prefix.
    * ``'result'`` -- ``'exit_code'`` and ``'execution_time'``.
    * ``'complete'`` -- end of stream; the loop stops here.

    The sandbox is always killed, even if streaming raises.
    """
    sandbox = Sandbox.create(template="code-interpreter")

    code = '''
import sys
print("This is stdout")
print("This is also stdout", file=sys.stdout)
print("This is stderr", file=sys.stderr)
'''

    try:
        async for message in sandbox.run_code_stream(code):
            msg_type = message['type']

            if msg_type == 'stdout':
                print(f"[STDOUT] {message['data']}", end='')
            elif msg_type == 'stderr':
                print(f"[STDERR] {message['data']}", end='')
            elif msg_type == 'result':
                print(f"\n[RESULT] Exit: {message['exit_code']}, Time: {message['execution_time']}s")
            elif msg_type == 'complete':
                print("[COMPLETE] Execution finished")
                break
    finally:
        # Release the sandbox whether the stream finished or failed.
        sandbox.kill()

asyncio.run(handle_all_message_types())

Progress Monitoring

Monitor progress of long-running tasks:
  • Python
  • JavaScript
from hopx_ai import Sandbox
import asyncio

async def monitor_progress():
    """Run a 20-step script in a sandbox and relay its progress lines live.

    Only ``'stdout'`` messages (text in ``'data'``) and the terminating
    ``'complete'`` message are handled; other message types are ignored.
    The sandbox is always killed, even if streaming raises.
    """
    sandbox = Sandbox.create(template="code-interpreter")

    code = '''
import time
total_steps = 20

for i in range(total_steps):
    progress = (i + 1) / total_steps * 100
    print(f"Progress: {progress:.1f}% ({i+1}/{total_steps})")
    time.sleep(0.5)
print("All steps completed!")
'''

    try:
        async for message in sandbox.run_code_stream(code):
            if message['type'] == 'stdout':
                # Display progress in real-time
                print(message['data'], end='')
            elif message['type'] == 'complete':
                print("\n✅ Task completed successfully")
                break
    finally:
        # Release the sandbox whether the stream finished or failed.
        sandbox.kill()

asyncio.run(monitor_progress())

Environment Variables

Pass environment variables to streaming execution:
  • Python
  • JavaScript
from hopx_ai import Sandbox
import asyncio

async def stream_with_env():
    """Stream a script that reads environment variables supplied per-run.

    ``API_KEY`` and ``DEBUG`` are passed through the ``env`` argument of
    ``run_code_stream``; the sandboxed script prints them via ``os.getenv``.
    Only ``'stdout'`` and ``'complete'`` messages are handled.
    The sandbox is always killed, even if streaming raises.
    """
    sandbox = Sandbox.create(template="code-interpreter")

    code = '''
import os
print(f"API key: {os.getenv('API_KEY', 'not set')}")
print(f"Debug mode: {os.getenv('DEBUG', 'false')}")
'''

    try:
        async for message in sandbox.run_code_stream(
            code,
            env={"API_KEY": "sk-123", "DEBUG": "true"}
        ):
            if message['type'] == 'stdout':
                print(message['data'], end='')
            elif message['type'] == 'complete':
                break
    finally:
        # Release the sandbox whether the stream finished or failed.
        sandbox.kill()

asyncio.run(stream_with_env())

Error Handling

Handle errors in streaming execution:
  • Python
  • JavaScript
from hopx_ai import Sandbox
import asyncio

async def handle_streaming_errors():
    """Stream a script that fails on purpose and report the failure.

    The sandboxed script references an undefined name, so it is expected to
    emit stderr output and a non-zero exit code. Two kinds of failure are
    reported:

    * Execution failure -- a ``'result'`` message with a non-zero
      ``'exit_code'``, or a ``'complete'`` message whose ``'success'`` flag is
      missing or falsy.
    * Streaming failure -- any exception raised while iterating the stream,
      which is caught and printed.

    The sandbox is always killed in ``finally``.
    """
    sandbox = Sandbox.create(template="code-interpreter")

    code = '''
print("Starting...")
print(undefined_variable)  # This will cause an error
print("This won't be reached")
'''

    try:
        async for message in sandbox.run_code_stream(code):
            if message['type'] == 'stdout':
                print(message['data'], end='')
            elif message['type'] == 'stderr':
                print(f"ERROR: {message['data']}", end='')
            elif message['type'] == 'result':
                if message['exit_code'] != 0:
                    print(f"\n❌ Execution failed with exit code: {message['exit_code']}")
            elif message['type'] == 'complete':
                # A missing 'success' flag is treated as a failure.
                if not message.get('success', False):
                    print("\n❌ Execution completed with errors")
                break
    except Exception as e:
        # Top-level boundary of the example: report the failure, don't crash.
        print(f"Streaming error: {e}")
    finally:
        sandbox.kill()

asyncio.run(handle_streaming_errors())

Complete Example

Here's a complete example with real-time progress:
  • Python
  • JavaScript
from hopx_ai import Sandbox
import asyncio

async def complete_streaming_example():
    """Stream a simulated 100-item batch job and print a summary at the end.

    The sandboxed script prints a progress line every 10 items and, with 5%
    probability per item, writes a warning to stderr. The call passes
    ``language``, ``timeout`` and ``env`` to ``run_code_stream``; the script
    shown here does not read ``BATCH_SIZE`` itself.

    Message handling:

    * ``'stdout'`` -- text in ``'data'``, echoed as-is.
    * ``'stderr'`` -- text in ``'data'``, echoed with a warning prefix.
    * ``'result'`` -- ``'exit_code'`` and ``'execution_time'`` summary.
    * ``'complete'`` -- end of stream; the loop stops here.

    The sandbox is always killed, even if streaming raises.
    """
    sandbox = Sandbox.create(template="code-interpreter")

    # The script needs its own ``import sys``: it writes warnings to
    # sys.stderr and would otherwise die with NameError on the first one.
    code = '''
import sys
import time
import random

print("Starting data processing...")
total_items = 100

for i in range(total_items):
    # Simulate processing
    time.sleep(0.1)

    # Generate some output
    if i % 10 == 0:
        print(f"Processed {i}/{total_items} items")

    # Simulate occasional errors
    if random.random() < 0.05:
        print(f"Warning: Issue with item {i}", file=sys.stderr)

print("Processing complete!")
print(f"Total items processed: {total_items}")
'''

    print("🔄 Starting streaming execution...\n")

    try:
        async for message in sandbox.run_code_stream(
            code,
            language='python',
            timeout=120,
            env={"BATCH_SIZE": "10"}
        ):
            if message['type'] == 'stdout':
                print(message['data'], end='')
            elif message['type'] == 'stderr':
                print(f"⚠️  {message['data']}", end='')
            elif message['type'] == 'result':
                print("\n\n📊 Execution Summary:")
                print(f"   Exit code: {message['exit_code']}")
                print(f"   Execution time: {message['execution_time']:.2f}s")
            elif message['type'] == 'complete':
                print("\n✅ Streaming completed")
                break
    finally:
        # Release the sandbox whether the stream finished or failed.
        sandbox.kill()

asyncio.run(complete_streaming_example())

Best Practices

1

1. Use Async/Await

Streaming execution requires async/await. Make sure your code uses async functions and async for loops.
2

2. Handle All Message Types

Process stdout, stderr, result, and complete messages appropriately for a complete streaming experience.
3

3. Break on Complete

Always break the loop when you receive a 'complete' message to avoid unnecessary processing.
4

4. Handle Errors

Check exit codes and success flags to handle execution failures gracefully.
5

5. Use for Real-Time Feedback

Streaming is best for tasks where real-time output is valuable. For simple scripts, synchronous execution may be simpler.

Next Steps