Multi-Threading Guide¶
This guide covers thread-safe patterns for multi-threaded cymongoose applications.
Overview¶
cymongoose supports multi-threading through the wakeup() mechanism, which allows background worker threads to communicate with the event loop thread safely.
Use Cases:
- Offload CPU-intensive work
- Background database queries
- File I/O operations
- External API calls
Basic Pattern¶
import threading
import queue

from cymongoose import Manager, MG_EV_HTTP_MSG, MG_EV_WAKEUP

# Thread-safe queues for handing work to, and results back from,
# the background worker thread.
work_queue = queue.Queue()
result_queue = queue.Queue()


def worker():
    """Background worker thread: process jobs until a None sentinel arrives."""
    while True:
        work = work_queue.get()
        if work is None:  # poison pill -> shut down
            break
        # Process work off the event-loop thread.
        result = expensive_computation(work['data'])
        # Publish the result for the handler to pick up.
        result_queue.put({
            'conn_id': work['conn_id'],
            'result': result,
        })
        # Wake up the event loop -- wakeup() is thread-safe.
        manager.wakeup(work['conn_id'], b"result_ready")


def handler(conn, ev, data):
    """Event-loop handler: offload requests, reply when results arrive."""
    if ev == MG_EV_HTTP_MSG:
        # Offload to worker. Pass the connection ID, never the
        # Connection object -- only the poll thread may touch it.
        work_queue.put({
            'conn_id': conn.id,  # Use ID, not Connection object
            'data': data.body_bytes,
        })
    elif ev == MG_EV_WAKEUP:
        # One wakeup per queued result, so a result is ready here.
        result = result_queue.get()
        conn.reply(200, result['result'])
        conn.drain()


# Enable wakeup support. The manager must be created AFTER `handler`
# is defined -- the original snippet referenced `handler` before its
# definition, which raises NameError at import time.
manager = Manager(handler, enable_wakeup=True)

# Start worker
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()
Complete Example¶
Image Processing Server¶
import threading
import queue
import time

from cymongoose import Manager, MG_EV_HTTP_MSG, MG_EV_WAKEUP

# Connection tracking: {conn_id: Connection}. Only the poll thread
# touches this dict, so no lock is needed.
connections = {}

# Thread-safe work/result queues shared with the worker pool.
work_queue = queue.Queue()
result_queue = queue.Queue()


def process_image(image_data):
    """Simulate expensive image processing."""
    time.sleep(2)  # stand-in for CPU-intensive work
    return b"PROCESSED_" + image_data


def worker():
    """Background image processor: drain work_queue until a None sentinel."""
    while True:
        work = work_queue.get()
        if work is None:  # poison pill -> shut down
            break
        try:
            result = process_image(work['data'])
            result_queue.put({
                'conn_id': work['conn_id'],
                'result': result,
                'error': None,
            })
        except Exception as e:
            # Report failures back to the poll thread instead of
            # letting the worker thread die silently.
            result_queue.put({
                'conn_id': work['conn_id'],
                'result': None,
                'error': str(e),
            })
        # Wake up the event loop (thread-safe) to deliver the reply.
        manager.wakeup(work['conn_id'], b"processed")


def handler(conn, ev, data):
    """Event-loop handler: offload images, reply when processing finishes."""
    if ev == MG_EV_HTTP_MSG:
        # Track the connection by ID so the wakeup path can find it.
        connections[conn.id] = conn
        # Offload to worker -- pass the ID, not the Connection object.
        work_queue.put({
            'conn_id': conn.id,
            'data': data.body_bytes,
        })
    elif ev == MG_EV_WAKEUP:
        # One wakeup per queued result, so a result is ready here.
        result = result_queue.get()
        # Look up the connection; it may have closed in the meantime.
        conn = connections.get(result['conn_id'])
        if conn:
            if result['error']:
                conn.reply(500, result['error'].encode())
            else:
                conn.reply(200, result['result'])
            conn.drain()
            # Clean up the tracking entry.
            del connections[conn.id]


# Create manager with wakeup support
manager = Manager(handler, enable_wakeup=True)

# Start worker threads
num_workers = 4
for _ in range(num_workers):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

manager.listen('http://0.0.0.0:8000', http=True)
while True:
    manager.poll(100)
Method Thread-Safety Reference¶
| Method | Thread safety | Notes |
|---|---|---|
| `poll()` | Poll-thread only | Not safe for concurrent calls. Releases GIL during C call. |
| `listen()` | Poll-thread only | Call before starting poll loop or from a handler. |
| `connect()` | Poll-thread only | Same as `listen()`. |
| `mqtt_connect()` | Poll-thread only | Same as `listen()`. |
| `mqtt_listen()` | Poll-thread only | Same as `listen()`. |
| `sntp_connect()` | Poll-thread only | Same as `listen()`. |
| `timer_add()` | Poll-thread only | Same as `listen()`. |
| `close()` | Poll-thread only | Raises if `poll()` is active on another thread. |
| `wakeup()` | Thread-safe | Writes to an internal pipe; safe during `poll()`. |
| `Timer.cancel()` | Thread-safe | Deferred to next `poll()` via internal queue. |
| `AsyncManager.*` | Any thread | All delegated methods serialised by internal RLock. |
Thread Safety Rules¶
1. Pass Connection IDs, Not Objects¶
# Bad: passing the Connection object to another thread -- Connection
# objects may only be touched from the poll thread.
work_queue.put({'conn': conn}) # UNSAFE!
# Good: pass the integer connection ID and look the object up later.
work_queue.put({'conn_id': conn.id}) # Safe
2. Wakeup Data Must Be Bytes¶
# Bad: wakeup() payloads must be bytes, not str.
manager.wakeup(conn_id, "hello") # ERROR!
# Good: a bytes payload is accepted.
manager.wakeup(conn_id, b"hello") # OK
3. Wakeup Payload Size Limit¶
Manager.wakeup() transmits data over a socketpair using a
non-blocking send(). If the payload exceeds the socket send buffer
the call succeeds (returns True) but the data is silently dropped.
The effective limit depends on the OS:
| Platform | Approximate limit |
|---|---|
| macOS | ~9 KB |
| Linux | ~64 KB |
Keep wakeup payloads small -- ideally under 8 KB. For larger data,
pass a key or identifier through wakeup() and store the actual data
in a thread-safe structure (dict with a lock, queue.Queue, etc.):
import threading
import uuid

# Out-of-band storage for payloads too large to send through wakeup().
stash = {}
stash_lock = threading.Lock()


def worker(manager, conn_id, result_bytes):
    """Send a result to the poll thread, stashing it if it is too large."""
    if len(result_bytes) > 8000:
        # Stash the data, send only the key (keys are tiny, so they
        # always fit in the wakeup payload limit).
        key = uuid.uuid4().hex
        with stash_lock:
            stash[key] = result_bytes
        manager.wakeup(conn_id, b"KEY:" + key.encode())
    else:
        # Small enough to send inline
        manager.wakeup(conn_id, result_bytes)


def handler(conn, ev, data):
    """Reply with either the inline payload or the stashed one."""
    if ev == MG_EV_WAKEUP:
        if data.startswith(b"KEY:"):
            key = data[4:].decode()
            with stash_lock:
                # pop() so entries cannot accumulate in the stash.
                result = stash.pop(key)
            conn.reply(200, result)
        else:
            conn.reply(200, data)
Note
The WSGI adapter (cymongoose.wsgi) handles this automatically --
responses over 8 KB are stashed transparently.
4. Track Connections¶
# Store connections by ID; only the poll thread touches this dict.
connections = {}


def handler(conn, ev, data):
    """Track connections by ID so worker results can be delivered later."""
    if ev == MG_EV_ACCEPT or ev == MG_EV_HTTP_MSG:
        connections[conn.id] = conn
    elif ev == MG_EV_CLOSE:
        # Always clean up on close, or the dict leaks dead connections.
        if conn.id in connections:
            del connections[conn.id]
    elif ev == MG_EV_WAKEUP:
        # Look up by ID -- the connection may already be gone.
        if conn.id in connections:
            process_result(conn)
5. Use Queue for Communication¶
import queue
# queue.Queue is thread-safe; use it for all cross-thread communication.
work_queue = queue.Queue()
result_queue = queue.Queue()
# Don't use plain lists or dicts shared across threads without locks.
Multiple Worker Pools¶
Different workers for different tasks:
# One queue per workload class so slow jobs don't starve fast ones.
db_queue = queue.Queue()
file_queue = queue.Queue()
compute_queue = queue.Queue()


def db_worker():
    """Worker dedicated to database operations."""
    while True:
        work = db_queue.get()
        # Database operations


def file_worker():
    """Worker dedicated to file I/O."""
    while True:
        work = file_queue.get()
        # File I/O


def compute_worker():
    """Worker dedicated to CPU-intensive work."""
    while True:
        work = compute_queue.get()
        # CPU-intensive work


# Start different worker pools, sized for their workload.
for _ in range(2):
    threading.Thread(target=db_worker, daemon=True).start()
for _ in range(2):
    # Fix: the original snippet defined file_worker but never started
    # it, so file_queue jobs would sit unprocessed forever.
    threading.Thread(target=file_worker, daemon=True).start()
for _ in range(4):
    threading.Thread(target=compute_worker, daemon=True).start()


def handler(conn, ev, data):
    """Route each request to the queue that matches its workload."""
    if ev == MG_EV_HTTP_MSG:
        if data.uri.startswith("/db/"):
            db_queue.put(...)
        elif data.uri.startswith("/compute/"):
            compute_queue.put(...)
Fast Path vs Slow Path¶
Handle simple requests immediately, offload complex ones:
def handler(conn, ev, data):
    """Serve trivial requests inline; offload expensive ones to workers."""
    if ev == MG_EV_HTTP_MSG:
        # Fast path: answer simple requests directly on the poll thread.
        if data.uri == "/status":
            conn.reply(200, b'{"status": "ok"}')
            conn.drain()
        # Slow path: track the connection and hand the request to the
        # worker pool; the reply happens later on MG_EV_WAKEUP.
        elif data.uri == "/process":
            connections[conn.id] = conn
            work_queue.put({
                'conn_id': conn.id,
                'data': data.body_bytes,
            })
Graceful Shutdown¶
Stop workers cleanly:
import signal  # fix: the original snippet used signal without importing it

shutdown_requested = False


def signal_handler(sig, frame):
    """SIGINT handler: request shutdown and poison the worker queue."""
    global shutdown_requested
    shutdown_requested = True
    # One poison pill per worker so every thread wakes up and exits.
    for _ in range(num_workers):
        work_queue.put(None)


signal.signal(signal.SIGINT, signal_handler)

# Main loop: keep polling until the signal handler flips the flag.
while not shutdown_requested:
    manager.poll(100)

# Wait for workers to finish their current job (bounded wait so a
# hung worker cannot block shutdown forever).
for thread in worker_threads:
    thread.join(timeout=5.0)

manager.close()
Best Practices¶
- Enable wakeup when creating Manager
- Pass conn.id to threads, not Connection objects
- Use thread-safe queues for communication
- Track connections by ID in a dict
- Clean up on MG_EV_CLOSE
- Limit worker count (2-4x CPU cores)
- Handle errors in worker threads
- Implement timeouts to prevent hangs