Skip to content

Multi-Threading Guide

This guide covers thread-safe patterns for multi-threaded cymongoose applications.

Overview

cymongoose supports multi-threading through the wakeup() mechanism, which allows background worker threads to communicate with the event loop thread safely.

Use Cases:

  • Offload CPU-intensive work
  • Background database queries
  • File I/O operations
  • External API calls

Basic Pattern

import threading
import queue
from cymongoose import Manager, MG_EV_HTTP_MSG, MG_EV_WAKEUP

# Thread-safe work queues shared by the event-loop thread and the workers.
work_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
    """Background worker thread: pull jobs, compute, post results."""
    while True:
        work = work_queue.get()
        if work is None:  # poison pill -> shut down
            break

        # Process work off the event-loop thread
        result = expensive_computation(work['data'])

        # Store result for the handler to pick up
        result_queue.put({
            'conn_id': work['conn_id'],
            'result': result,
        })

        # Wake up event loop (wakeup() is the one thread-safe Manager call)
        manager.wakeup(work['conn_id'], b"result_ready")

def handler(conn, ev, data):
    """Event-loop handler: offload requests, reply once results are ready."""
    if ev == MG_EV_HTTP_MSG:
        # Offload to worker
        work_queue.put({
            'conn_id': conn.id,  # Use ID, not Connection object
            'data': data.body_bytes,
        })

    elif ev == MG_EV_WAKEUP:
        # Result ready
        result = result_queue.get()
        conn.reply(200, result['result'])
        conn.drain()

# Create the manager AFTER handler is defined (creating it first would
# raise NameError). Enable wakeup support so manager.wakeup() works.
manager = Manager(handler, enable_wakeup=True)

# Start worker
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

Complete Example

Image Processing Server

import threading
import queue
import time
from cymongoose import Manager, MG_EV_HTTP_MSG, MG_EV_WAKEUP, MG_EV_CLOSE

# Connection tracking
connections = {}  # {conn_id: conn}

# Work queues (queue.Queue is thread-safe; no extra locking needed)
work_queue = queue.Queue()
result_queue = queue.Queue()

def process_image(image_data):
    """Simulate expensive image processing."""
    time.sleep(2)  # CPU-intensive work
    return b"PROCESSED_" + image_data

def worker():
    """Background image processor; one bad job must not kill the thread."""
    while True:
        work = work_queue.get()
        if work is None:  # poison pill -> shut down
            break

        try:
            result = process_image(work['data'])
            result_queue.put({
                'conn_id': work['conn_id'],
                'result': result,
                'error': None,
            })
        except Exception as e:
            # Report the failure to the event loop instead of dying silently
            result_queue.put({
                'conn_id': work['conn_id'],
                'result': None,
                'error': str(e),
            })

        # Wake up event loop
        manager.wakeup(work['conn_id'], b"processed")

def handler(conn, ev, data):
    """Accept requests, offload processing, reply on wakeup, clean up on close."""
    if ev == MG_EV_HTTP_MSG:
        # Track connection by ID so the wakeup path can find it
        connections[conn.id] = conn

        # Offload to worker
        work_queue.put({
            'conn_id': conn.id,
            'data': data.body_bytes,
        })

    elif ev == MG_EV_WAKEUP:
        # Result ready
        result = result_queue.get()

        # Look up connection
        conn = connections.get(result['conn_id'])
        if conn:
            if result['error']:
                conn.reply(500, result['error'].encode())
            else:
                conn.reply(200, result['result'])
            conn.drain()

            # Clean up
            del connections[conn.id]

    elif ev == MG_EV_CLOSE:
        # Client went away before (or after) the reply: drop the stale
        # entry so the connections dict does not leak (Rule 4 below).
        connections.pop(conn.id, None)

# Create manager with wakeup support
manager = Manager(handler, enable_wakeup=True)

# Start worker threads
num_workers = 4
for _ in range(num_workers):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

manager.listen('http://0.0.0.0:8000', http=True)

while True:
    manager.poll(100)

Method Thread-Safety Reference

Method Thread safety Notes
poll() Poll-thread only Not safe for concurrent calls. Releases GIL during C call.
listen() Poll-thread only Call before starting poll loop or from a handler.
connect() Poll-thread only Same as listen().
mqtt_connect() Poll-thread only Same as listen().
mqtt_listen() Poll-thread only Same as listen().
sntp_connect() Poll-thread only Same as listen().
timer_add() Poll-thread only Same as listen().
close() Poll-thread only Raises if poll() is active on another thread.
wakeup() Thread-safe Writes to an internal pipe; safe during poll().
Timer.cancel() Thread-safe Deferred to next poll() via internal queue.
AsyncManager.* Any thread All delegated methods serialised by internal RLock.

Thread Safety Rules

1. Pass Connection IDs, Not Objects

# Bad: Pass connection object to thread
work_queue.put({'conn': conn})  # UNSAFE!

# Good: Pass connection ID
work_queue.put({'conn_id': conn.id})  # Safe

2. Wakeup Data Must Be Bytes

# Bad: String
manager.wakeup(conn_id, "hello")  # ERROR!

# Good: Bytes
manager.wakeup(conn_id, b"hello")  # OK

3. Wakeup Payload Size Limit

Manager.wakeup() transmits data over a socketpair using a non-blocking send(). If the payload exceeds the socket send buffer the call succeeds (returns True) but the data is silently dropped.

The effective limit depends on the OS:

Platform Approximate limit
macOS ~9 KB
Linux ~64 KB

Keep wakeup payloads small — ideally under 8 KB. For larger data, pass a key or identifier through wakeup() and store the actual data in a thread-safe structure (dict with a lock, queue.Queue, etc.):

import threading
import uuid

# Side-channel for payloads too big to travel through wakeup() itself.
stash = {}
stash_lock = threading.Lock()

def worker(manager, conn_id, result_bytes):
    """Deliver *result_bytes*: inline when small, via the stash when large."""
    if len(result_bytes) <= 8000:
        # Small enough for the wakeup socketpair; send directly.
        manager.wakeup(conn_id, result_bytes)
        return
    # Too large: park the payload under a fresh key, send only the key.
    key = uuid.uuid4().hex
    with stash_lock:
        stash[key] = result_bytes
    manager.wakeup(conn_id, b"KEY:" + key.encode())

def handler(conn, ev, data):
    """On wakeup, resolve stashed payloads; otherwise reply with the data inline."""
    if ev != MG_EV_WAKEUP:
        return
    if not data.startswith(b"KEY:"):
        conn.reply(200, data)
        return
    key = data[4:].decode()
    with stash_lock:
        result = stash.pop(key)
    conn.reply(200, result)

Note

The WSGI adapter (cymongoose.wsgi) handles this automatically -- responses over 8 KB are stashed transparently.

4. Track Connections

# Registry of live connections, keyed by integer connection ID.
connections = {}

def handler(conn, ev, data):
    """Maintain the ID->connection registry across the connection lifecycle."""
    cid = conn.id
    if ev in (MG_EV_ACCEPT, MG_EV_HTTP_MSG):
        # Remember the connection so worker results can find it later
        connections[cid] = conn

    elif ev == MG_EV_CLOSE:
        # Forget closed connections; pop() tolerates an already-missing entry
        connections.pop(cid, None)

    elif ev == MG_EV_WAKEUP:
        # Look up by ID; the entry may be gone if the client disconnected
        if cid in connections:
            process_result(conn)

5. Use Queue for Communication

import queue

# Thread-safe queues: queue.Queue does its own locking, so the event-loop
# thread and the workers can share these without extra synchronisation.
work_queue = queue.Queue()
result_queue = queue.Queue()

# Don't use lists or dicts without locks

Multiple Worker Pools

Different workers for different tasks:

# One queue per kind of work, so each pool can be sized independently.
db_queue = queue.Queue()
file_queue = queue.Queue()
compute_queue = queue.Queue()

def db_worker():
    """Drain db_queue: blocking database calls live here."""
    while True:
        work = db_queue.get()
        # Database operations

def file_worker():
    """Drain file_queue: file I/O lives here."""
    while True:
        work = file_queue.get()
        # File I/O

def compute_worker():
    """Drain compute_queue: CPU-intensive work lives here."""
    while True:
        work = compute_queue.get()
        # CPU-intensive work

# Start the worker pools, sized to the kind of work they do
for _ in range(2):
    threading.Thread(target=db_worker, daemon=True).start()

for _ in range(2):
    threading.Thread(target=file_worker, daemon=True).start()

for _ in range(4):
    threading.Thread(target=compute_worker, daemon=True).start()

def handler(conn, ev, data):
    """Route each request to the queue (and pool) matching its URI prefix."""
    if ev == MG_EV_HTTP_MSG:
        if data.uri.startswith("/db/"):
            db_queue.put(...)
        elif data.uri.startswith("/file/"):
            file_queue.put(...)
        elif data.uri.startswith("/compute/"):
            compute_queue.put(...)

Fast Path vs Slow Path

Handle simple requests immediately, offload complex ones:

def handler(conn, ev, data):
    """Answer cheap requests inline; hand expensive ones to the worker pool."""
    if ev != MG_EV_HTTP_MSG:
        return

    uri = data.uri
    if uri == "/status":
        # Fast path: reply straight from the event loop, no thread hop
        conn.reply(200, b'{"status": "ok"}')
        conn.drain()
    elif uri == "/process":
        # Slow path: remember the connection, queue the job for a worker
        connections[conn.id] = conn
        work_queue.put({
            'conn_id': conn.id,
            'data': data.body_bytes,
        })

Graceful Shutdown

Stop workers cleanly:

import signal  # required for signal.signal() below

shutdown_requested = False

def signal_handler(sig, frame):
    """SIGINT handler: flag shutdown and poison every worker."""
    global shutdown_requested
    shutdown_requested = True

    # Stop workers: one poison pill per worker thread
    for _ in range(num_workers):
        work_queue.put(None)  # Poison pill

signal.signal(signal.SIGINT, signal_handler)

# Main loop: exits once the signal handler flips the flag
while not shutdown_requested:
    manager.poll(100)

# Wait for workers (worker_threads is the list of Thread objects collected
# where the workers were started); cap the wait so shutdown cannot hang
for thread in worker_threads:
    thread.join(timeout=5.0)

manager.close()

Best Practices

  1. Enable wakeup when creating Manager
  2. Pass conn.id to threads, not Connection objects
  3. Use thread-safe queues for communication
  4. Track connections by ID in a dict
  5. Clean up on MG_EV_CLOSE
  6. Limit worker count (2-4x CPU cores)
  7. Handle errors in worker threads
  8. Implement timeouts to prevent hangs

See Also