Examples¶
This page contains complete, runnable examples for common use cases. All examples include proper signal handling for production use.
Basic Examples¶
Simple JSON API Server¶
"""Simple REST API with JSON responses."""
import signal
import json
from cymongoose import Manager, MG_EV_HTTP_MSG
shutdown_requested = False
def signal_handler(sig, frame):
global shutdown_requested
shutdown_requested = True
# In-memory database
users = {
"1": {"id": "1", "name": "Alice", "email": "alice@example.com"},
"2": {"id": "2", "name": "Bob", "email": "bob@example.com"},
}
def handler(conn, ev, data):
if ev == MG_EV_HTTP_MSG:
method = data.method
uri = data.uri
# GET /api/users - List all users
if method == "GET" and uri == "/api/users":
response = json.dumps(list(users.values())).encode()
conn.reply(200, response,
headers={"Content-Type": "application/json"})
# GET /api/users/{id} - Get specific user
elif method == "GET" and uri.startswith("/api/users/"):
user_id = uri.split("/")[-1]
if user_id in users:
response = json.dumps(users[user_id]).encode()
conn.reply(200, response,
headers={"Content-Type": "application/json"})
else:
conn.reply(404, b'{"error": "User not found"}',
headers={"Content-Type": "application/json"})
# POST /api/users - Create user
elif method == "POST" and uri == "/api/users":
try:
new_user = json.loads(data.body_text)
user_id = str(len(users) + 1)
new_user["id"] = user_id
users[user_id] = new_user
response = json.dumps(new_user).encode()
conn.reply(201, response,
headers={"Content-Type": "application/json"})
except json.JSONDecodeError:
conn.reply(400, b'{"error": "Invalid JSON"}',
headers={"Content-Type": "application/json"})
else:
conn.reply(404, b'{"error": "Not found"}',
headers={"Content-Type": "application/json"})
conn.drain()
def main():
global shutdown_requested
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
manager = Manager(handler)
manager.listen('http://0.0.0.0:8000', http=True)
print("JSON API server running on http://0.0.0.0:8000")
print("Try: curl http://localhost:8000/api/users")
try:
while not shutdown_requested:
manager.poll(100)
print("Shutting down...")
finally:
manager.close()
if __name__ == "__main__":
main()
File Upload Server¶
"""HTTP server with file upload support."""
import signal
import os
from cymongoose import Manager, MG_EV_HTTP_MSG, http_parse_multipart
shutdown_requested = False
def signal_handler(sig, frame):
global shutdown_requested
shutdown_requested = True
def handler(conn, ev, data):
if ev == MG_EV_HTTP_MSG:
if data.method == "POST" and data.uri == "/upload":
# Parse multipart form data
offset = 0
while True:
offset, part = http_parse_multipart(data.body_bytes, offset)
if part is None:
break
filename = part['filename']
if filename:
# Save uploaded file
filepath = os.path.join("uploads", filename)
os.makedirs("uploads", exist_ok=True)
with open(filepath, "wb") as f:
f.write(part['body'])
print(f"Saved: {filepath}")
conn.reply(200, b'{"status": "uploaded"}',
headers={"Content-Type": "application/json"})
else:
# Serve upload form
html = b"""
<html>
<body>
<h1>File Upload</h1>
<form method="POST" action="/upload" enctype="multipart/form-data">
<input type="file" name="file">
<button type="submit">Upload</button>
</form>
</body>
</html>
"""
conn.reply(200, html,
headers={"Content-Type": "text/html"})
conn.drain()
# ... main() function similar to previous examples
Server-Sent Events (SSE)¶
"""Real-time updates using Server-Sent Events."""
import signal
import time
from cymongoose import Manager, MG_EV_HTTP_MSG
shutdown_requested = False
def signal_handler(sig, frame):
global shutdown_requested
shutdown_requested = True
# Track SSE connections
sse_connections = []
def handler(conn, ev, data):
if ev == MG_EV_HTTP_MSG:
if data.uri == "/events":
# Start SSE stream
conn.reply(200, "",
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
})
sse_connections.append(conn)
print(f"SSE client connected: {len(sse_connections)} total")
else:
# Serve test page
html = b"""
<html>
<body>
<h1>Server-Sent Events Demo</h1>
<div id="events"></div>
<script>
const events = new EventSource('/events');
events.addEventListener('update', e => {
document.getElementById('events').innerHTML +=
'<p>' + e.data + '</p>';
});
</script>
</body>
</html>
"""
conn.reply(200, html,
headers={"Content-Type": "text/html"})
conn.drain()
def broadcast_updates():
"""Send updates to all SSE clients."""
import random
data = f"Update at {time.strftime('%H:%M:%S')} - Value: {random.randint(1, 100)}"
for conn in sse_connections[:]: # Copy list to avoid modification during iteration
try:
conn.http_sse("update", data)
except RuntimeError:
# Connection closed
sse_connections.remove(conn)
def main():
global shutdown_requested
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
manager = Manager(handler)
manager.listen('http://0.0.0.0:8000', http=True)
# Timer to broadcast updates every 2 seconds
manager.timer_add(2000, broadcast_updates, repeat=True)
print("SSE server running on http://0.0.0.0:8000")
try:
while not shutdown_requested:
manager.poll(100)
finally:
manager.close()
if __name__ == "__main__":
main()
WebSocket Examples¶
Chat Room Server¶
"""WebSocket-based chat room."""
import signal
from cymongoose import (
Manager,
MG_EV_HTTP_MSG,
MG_EV_WS_MSG,
MG_EV_CLOSE,
WEBSOCKET_OP_TEXT,
)
shutdown_requested = False
clients = []
def signal_handler(sig, frame):
global shutdown_requested
shutdown_requested = True
def broadcast(message, exclude=None):
"""Send message to all clients except excluded one."""
for client in clients[:]:
if client != exclude:
try:
client.ws_send(message, WEBSOCKET_OP_TEXT)
except RuntimeError:
clients.remove(client)
def handler(conn, ev, data):
if ev == MG_EV_HTTP_MSG:
if data.uri == "/ws":
conn.ws_upgrade(data)
clients.append(conn)
conn.userdata = {"name": f"User{len(clients)}"}
broadcast(f"{conn.userdata['name']} joined", exclude=conn)
print(f"Client connected: {len(clients)} total")
else:
# Serve chat interface
html = b"""
<html><body>
<div id="messages" style="height:400px;overflow:auto;border:1px solid"></div>
<input id="input" type="text" style="width:300px">
<button onclick="send()">Send</button>
<script>
const ws = new WebSocket('ws://' + location.host + '/ws');
ws.onmessage = e => {
document.getElementById('messages').innerHTML +=
'<div>' + e.data + '</div>';
};
function send() {
const msg = document.getElementById('input').value;
ws.send(msg);
document.getElementById('input').value = '';
}
document.getElementById('input').onkeypress = e => {
if (e.key === 'Enter') send();
};
</script>
</body></html>
"""
conn.reply(200, html,
headers={"Content-Type": "text/html"})
conn.drain()
elif ev == MG_EV_WS_MSG:
# Broadcast message to all clients
message = f"{conn.userdata['name']}: {data.text}"
broadcast(message)
elif ev == MG_EV_CLOSE:
if conn in clients:
clients.remove(conn)
broadcast(f"{conn.userdata.get('name', 'User')} left")
print(f"Client disconnected: {len(clients)} remaining")
# ... main() function
MQTT Examples¶
Temperature Monitor¶
"""MQTT temperature monitoring system."""
import signal
import random
import time
from cymongoose import (
Manager,
MG_EV_MQTT_OPEN,
MG_EV_MQTT_MSG,
)
shutdown_requested = False
def signal_handler(sig, frame):
global shutdown_requested
shutdown_requested = True
def handler(conn, ev, data):
if ev == MG_EV_MQTT_OPEN:
print(f"Connected to broker, status={data}")
# Subscribe to temperature sensors
conn.mqtt_sub("sensors/+/temperature", qos=1)
conn.mqtt_sub("sensors/+/humidity", qos=1)
elif ev == MG_EV_MQTT_MSG:
topic = data.topic
value = data.text
print(f"{topic}: {value}")
# Trigger alert if temperature too high
if "temperature" in topic and float(value) > 30:
alert = f"HIGH TEMP ALERT: {value} C on {topic}"
conn.mqtt_pub("alerts/temperature", alert, qos=1)
def publish_readings(conn):
"""Publish simulated sensor readings."""
sensors = ["sensor1", "sensor2", "sensor3"]
for sensor in sensors:
temp = random.uniform(20, 35)
humidity = random.uniform(40, 80)
conn.mqtt_pub(f"sensors/{sensor}/temperature",
f"{temp:.1f}", qos=1)
conn.mqtt_pub(f"sensors/{sensor}/humidity",
f"{humidity:.1f}", qos=1)
def main():
global shutdown_requested
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
manager = Manager(handler)
conn = manager.mqtt_connect(
"mqtt://broker.hivemq.com:1883",
client_id="temp-monitor",
clean_session=True,
keepalive=60,
)
# Publish readings every 5 seconds
manager.timer_add(5000, lambda: publish_readings(conn), repeat=True)
print("MQTT temperature monitor started")
try:
while not shutdown_requested:
manager.poll(100)
finally:
manager.close()
if __name__ == "__main__":
main()
Web Framework¶
Micro-Framework with Decorator Routing¶
cymongoose ships with a minimal Flask/Bottle-style micro-framework example
that layers decorator-based routing on top of the raw event loop. It lives
in tests/examples/http/http_web_framework.py and demonstrates how little
glue is needed to get a familiar web-framework feel while keeping the
performance of a C event loop.
Features:
@app.get,@app.post,@app.put,@app.deletedecorators- Path parameters with type conversion (
/items/<int:id>) - JSON request parsing (
req.json()) andjson_response()helper - Return-value coercion -- handlers can return
str,dict,tuple,Response, orNone - Before/after request hooks
- Custom 404 handler via
@app.not_found
"""Micro-framework CRUD API."""
from http_web_framework import App, Response, json_response
app = App()
items = []
@app.get("/")
def index(req):
return "<h1>Hello from cymongoose</h1>"
@app.get("/items")
def list_items(req):
return json_response(items)
@app.post("/items")
def create_item(req):
body = req.json()
if body is None:
return Response("Invalid JSON", status=400)
items.append(body)
return json_response(body, status=201)
@app.get("/items/<int:id>")
def get_item(req, id):
if 0 <= id < len(items):
return json_response(items[id])
return json_response({"error": "not found"}, status=404)
@app.delete("/items/<int:id>")
def delete_item(req, id):
if 0 <= id < len(items):
return json_response(items.pop(id))
return json_response({"error": "not found"}, status=404)
if __name__ == "__main__":
app.run() # http://0.0.0.0:8000
Under the hood the App class compiles each route pattern into a regex,
wraps the incoming HttpMessage in a lightweight Request object, and
coerces handler return values into a Response that is sent via
conn.reply(). The entire routing layer is ~120 lines of pure Python.
Framework Routing Overhead¶
Benchmarked with wrk -t4 -c100 -d10s on Apple Silicon
(see tests/benchmarks/bench_web_framework.py):
| Configuration | Req/sec | Avg Latency | vs Raw |
|---|---|---|---|
| Raw handler (no framework) | 119,857 | 0.83 ms | baseline |
| Framework -- static route | 102,255 | 0.97 ms | 85% |
| Framework -- parameterised route | 84,602 | 1.18 ms | 71% |
The routing layer adds 15--29% overhead depending on whether path parameters and JSON serialisation are involved. Even the slowest configuration (84k req/s) is 8x faster than FastAPI and 52x faster than Flask.
# Run the benchmark yourself
uv run python tests/benchmarks/bench_web_framework.py
# Or use wrk for accurate throughput numbers
uv run python tests/benchmarks/bench_web_framework.py --serve framework
wrk -t4 -c100 -d10s http://localhost:8765/
Advanced Examples¶
Multi-threaded Request Handler¶
See Threading Guide for complete example of offloading work to background threads.
HTTPS Reverse Proxy¶
See tests/examples/advanced/http_proxy_client.py for a complete HTTP
proxy implementation.
TLS/HTTPS Server¶
See tests/examples/advanced/tls_https_server.py for production TLS
configuration.
More Examples¶
The tests/examples/ directory contains 17 complete, tested examples
covering all protocols:
- HTTP: Server, client, streaming, file upload, RESTful API, SSE
- WebSocket: Server, broadcasting
- MQTT: Client, server/broker
- Network: TCP echo, UDP echo, DNS resolution, SNTP time sync
- Advanced: TLS/HTTPS, HTTP proxy, multi-threading
All examples are runnable and include comprehensive tests:
# Run HTTP server example
python tests/examples/http/http_server.py
# Run WebSocket chat
python tests/examples/websocket/websocket_server.py
# Run MQTT client
python tests/examples/mqtt/mqtt_client.py
See Also¶
- Quickstart - Basic tutorial
- User Guide - Protocol guides
- API Reference - API reference