Skip to content

Manager Class

cymongoose.Manager

Manage Mongoose event loop and provide Python callbacks.

Source code in src/cymongoose/_mongoose.pyi
class Manager:
    """Manage Mongoose event loop and provide Python callbacks.

    The manager drives the event loop (``poll()`` / ``run()``), creates
    listeners and outbound connections, schedules timers, and delivers
    cross-thread wakeups. It implements ``__enter__`` / ``__exit__`` and
    can therefore be used in a ``with`` statement.
    """

    def __init__(
        self,
        handler: Optional[EventHandler] = None,
        enable_wakeup: bool = False,
        error_handler: Optional[Callable[[Exception], Any]] = None,
    ) -> None:
        """Initialize event manager.

        Args:
            handler: Default event handler for all connections
            enable_wakeup: Enable wakeup support for multi-threaded scenarios
            error_handler: Optional callback invoked with the exception when
                an event handler raises. Defaults to traceback.print_exc().
        """
        ...

    def __enter__(self) -> "Manager": ...
    def __exit__(self, *exc: Any) -> bool: ...
    def poll(self, timeout_ms: int = 0) -> None:
        """Drive the event loop once.

        Args:
            timeout_ms: Timeout in milliseconds (0 = non-blocking)

        Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios,
        use close() only after all polling threads have stopped to avoid race conditions.

        Raises:
            RuntimeError: If manager has been freed
        """
        ...

    def listen(
        self, url: str, handler: Optional[EventHandler] = None, *, http: Optional[bool] = None
    ) -> Connection:
        """Listen on a URL; handler is optional per-listener override.

        The ``http`` flag is inferred from the URL scheme when not provided
        explicitly: ``http://``, ``https://``, ``ws://``, ``wss://`` all
        enable HTTP protocol parsing.  Pass ``http=False`` to override.

        To listen on an OS-assigned port, pass port 0 and read the actual
        port from the returned connection:

            listener = mgr.listen("http://0.0.0.0:0")
            port = listener.local_addr[1]

        Args:
            url: URL to listen on (e.g., "http://0.0.0.0:8000", "tcp://0.0.0.0:1234")
            handler: Optional per-listener handler (overrides default)
            http: Enable HTTP protocol handler (inferred from scheme if None)

        Returns:
            Listener connection object

        Raises:
            RuntimeError: If failed to listen on URL
        """
        ...

    def connect(
        self, url: str, handler: Optional[EventHandler] = None, *, http: Optional[bool] = None
    ) -> Connection:
        """Create an outbound connection and return immediately.

        The ``http`` flag is inferred from the URL scheme when not provided
        explicitly: ``http://``, ``https://``, ``ws://``, ``wss://`` all
        enable HTTP protocol parsing.  Pass ``http=False`` to override.

        Args:
            url: URL to connect to (e.g., "http://example.com", "tcp://example.com:1234")
            handler: Optional per-connection handler (overrides default)
            http: Enable HTTP protocol handler (inferred from scheme if None)

        Returns:
            Connection object

        Raises:
            RuntimeError: If failed to connect to URL
        """
        ...

    def ws_connect(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
        """Create an outbound WebSocket connection.

        Connects and automatically sends the WebSocket upgrade handshake.
        The handler will receive MG_EV_WS_OPEN when the handshake completes,
        then MG_EV_WS_MSG for incoming frames.

        Args:
            url: WebSocket URL (e.g., 'ws://example.com/ws', 'wss://example.com/ws')
            handler: Optional per-connection handler (overrides default)

        Returns:
            Connection object

        Raises:
            RuntimeError: If failed to connect
        """
        ...

    def mqtt_connect(
        self,
        url: str,
        handler: Optional[EventHandler] = None,
        client_id: str = "",
        username: str = "",
        password: str = "",
        clean_session: bool = True,
        keepalive: int = 60,
    ) -> Connection:
        """Connect to an MQTT broker.

        Args:
            url: Broker URL (e.g., 'mqtt://broker.hivemq.com:1883')
            handler: Event handler callback
            client_id: MQTT client ID (autogenerated if empty)
            username: MQTT username (optional)
            password: MQTT password (optional)
            clean_session: Clean session flag
            keepalive: Keep-alive interval in seconds

        Returns:
            Connection object

        Raises:
            RuntimeError: If failed to connect to broker
        """
        ...

    def mqtt_listen(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
        """Listen for MQTT connections (broker mode).

        Args:
            url: Listen URL (e.g., 'mqtt://0.0.0.0:1883')
            handler: Event handler callback

        Returns:
            Listener connection object

        Raises:
            RuntimeError: If failed to listen for MQTT
        """
        ...

    def sntp_connect(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
        """Connect to an SNTP (time) server.

        Triggers MG_EV_SNTP_TIME event when time is received.

        Args:
            url: SNTP server URL (e.g., 'udp://time.google.com:123')
            handler: Event handler callback

        Returns:
            Connection object

        Raises:
            RuntimeError: If failed to connect to SNTP server

        Example:
            ```python
            def time_handler(conn, ev, data):
                if ev == MG_EV_SNTP_TIME:
                    # data is uint64_t epoch milliseconds
                    print(f"Time: {data} ms since epoch")

            conn = manager.sntp_connect("udp://time.google.com:123", time_handler)
            conn.sntp_request()  # Request time
            ```
        """
        ...

    def wakeup(self, connection_id: int, data: bytes = b"") -> bool:
        """Send a wakeup notification to a specific connection (thread-safe).

        Args:
            connection_id: The connection ID to wake up
            data: Optional data payload (delivered via MG_EV_WAKEUP event)

        Returns:
            True if wakeup was sent successfully

        Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios,
        use close() only after all polling threads have stopped to avoid race conditions.

        Raises:
            RuntimeError: If manager has been freed
        """
        ...

    def timer_add(
        self,
        milliseconds: int,
        callback: Callable[[], None],
        *,
        repeat: bool = False,
        run_now: bool = False,
    ) -> "Timer":
        """Add a timer that calls a Python callback once or, with ``repeat=True``, periodically.

        Args:
            milliseconds: Timer interval in milliseconds
            callback: Python callable (takes no arguments)
            repeat: If True, timer repeats; if False, runs once
            run_now: If True, callback is called immediately

        Returns:
            Timer object (caller must keep a reference to it)

        Note: Timers are automatically freed when they complete (MG_TIMER_AUTODELETE flag).
        The Timer object's ``__dealloc__`` only releases the Python callback reference.

        Warning: The returned Timer must be kept alive (e.g., stored in a variable)
        for as long as the timer is active. If the Timer object is garbage collected
        while Mongoose still holds the timer, the callback pointer becomes dangling.

        Raises:
            RuntimeError: If manager has been freed or timer creation failed

        Example:
            ```python
            def heartbeat():
                print("ping")

            timer = manager.timer_add(1000, heartbeat, repeat=True)
            ```
        """
        ...

    def run(self, poll_ms: int = 100) -> None:
        """Run the event loop, blocking until SIGINT or SIGTERM.

        Installs signal handlers for graceful shutdown, runs the poll loop,
        and calls close() when done. Original signal handlers are restored.

        Args:
            poll_ms: Poll timeout in milliseconds (default: 100)
        """
        ...

    @property
    def connections(self) -> Tuple[Connection, ...]:
        """Return a snapshot of all active connections as a tuple."""
        ...

    @property
    def wakeup_id(self) -> int:
        """Connection ID of the internal wakeup pipe, or 0 if wakeup is not enabled."""
        ...

    def close(self) -> None:
        """Free the underlying manager and release resources."""
        ...

connections property

Return a snapshot of all active connections as a tuple.

wakeup_id property

Connection ID of the internal wakeup pipe, or 0 if wakeup is not enabled.

__init__(handler=None, enable_wakeup=False, error_handler=None)

Initialize event manager.

Parameters:

Name Type Description Default
handler Optional[EventHandler]

Default event handler for all connections

None
enable_wakeup bool

Enable wakeup support for multi-threaded scenarios

False
error_handler Optional[Callable[[Exception], Any]]

Optional callback invoked with the exception when an event handler raises. Defaults to traceback.print_exc().

None
Source code in src/cymongoose/_mongoose.pyi
def __init__(
    self,
    handler: Optional[EventHandler] = None,
    enable_wakeup: bool = False,
    error_handler: Optional[Callable[[Exception], Any]] = None,
) -> None:
    """Initialize event manager.

    Args:
        handler: Default event handler for all connections
        enable_wakeup: Enable wakeup support for multi-threaded scenarios
        error_handler: Optional callback invoked with the exception when
            an event handler raises. Defaults to traceback.print_exc().
    """
    ...

__enter__()

Source code in src/cymongoose/_mongoose.pyi
def __enter__(self) -> "Manager": ...

__exit__(*exc)

Source code in src/cymongoose/_mongoose.pyi
def __exit__(self, *exc: Any) -> bool: ...

poll(timeout_ms=0)

Drive the event loop once.

Parameters:

Name Type Description Default
timeout_ms int

Timeout in milliseconds (0 = non-blocking)

0

Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios, use close() only after all polling threads have stopped to avoid race conditions.

Raises:

Type Description
RuntimeError

If manager has been freed

Source code in src/cymongoose/_mongoose.pyi
def poll(self, timeout_ms: int = 0) -> None:
    """Drive the event loop once.

    Args:
        timeout_ms: Timeout in milliseconds (0 = non-blocking)

    Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios,
    use close() only after all polling threads have stopped to avoid race conditions.

    Raises:
        RuntimeError: If manager has been freed
    """
    ...

listen(url, handler=None, *, http=None)

Listen on a URL; handler is optional per-listener override.

The http flag is inferred from the URL scheme when not provided explicitly: http://, https://, ws://, wss:// all enable HTTP protocol parsing. Pass http=False to override.

To listen on an OS-assigned port, pass port 0 and read the actual port from the returned connection:

listener = mgr.listen("http://0.0.0.0:0")
port = listener.local_addr[1]

Parameters:

Name Type Description Default
url str

URL to listen on (e.g., "http://0.0.0.0:8000", "tcp://0.0.0.0:1234")

required
handler Optional[EventHandler]

Optional per-connection handler (overrides default)

None
http Optional[bool]

Enable HTTP protocol handler (inferred from scheme if None)

None

Returns:

Type Description
Connection

Listener connection object

Raises:

Type Description
RuntimeError

If failed to listen on URL

Source code in src/cymongoose/_mongoose.pyi
def listen(
    self, url: str, handler: Optional[EventHandler] = None, *, http: Optional[bool] = None
) -> Connection:
    """Listen on a URL; handler is optional per-listener override.

    The ``http`` flag is inferred from the URL scheme when not provided
    explicitly: ``http://``, ``https://``, ``ws://``, ``wss://`` all
    enable HTTP protocol parsing.  Pass ``http=False`` to override.

    To listen on an OS-assigned port, pass port 0 and read the actual
    port from the returned connection:

        listener = mgr.listen("http://0.0.0.0:0")
        port = listener.local_addr[1]

    Args:
        url: URL to listen on (e.g., "http://0.0.0.0:8000", "tcp://0.0.0.0:1234")
        handler: Optional per-listener handler (overrides default)
        http: Enable HTTP protocol handler (inferred from scheme if None)

    Returns:
        Listener connection object

    Raises:
        RuntimeError: If failed to listen on URL
    """
    ...

connect(url, handler=None, *, http=None)

Create an outbound connection and return immediately.

The http flag is inferred from the URL scheme when not provided explicitly: http://, https://, ws://, wss:// all enable HTTP protocol parsing. Pass http=False to override.

Parameters:

Name Type Description Default
url str

URL to connect to (e.g., "http://example.com", "tcp://example.com:1234")

required
handler Optional[EventHandler]

Optional per-connection handler (overrides default)

None
http Optional[bool]

Enable HTTP protocol handler (inferred from scheme if None)

None

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect to URL

Source code in src/cymongoose/_mongoose.pyi
def connect(
    self, url: str, handler: Optional[EventHandler] = None, *, http: Optional[bool] = None
) -> Connection:
    """Create an outbound connection and return immediately.

    The ``http`` flag is inferred from the URL scheme when not provided
    explicitly: ``http://``, ``https://``, ``ws://``, ``wss://`` all
    enable HTTP protocol parsing.  Pass ``http=False`` to override.

    Args:
        url: URL to connect to (e.g., "http://example.com", "tcp://example.com:1234")
        handler: Optional per-connection handler (overrides default)
        http: Enable HTTP protocol handler (inferred from scheme if None)

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect to URL
    """
    ...

ws_connect(url, handler=None)

Create an outbound WebSocket connection.

Connects and automatically sends the WebSocket upgrade handshake. The handler will receive MG_EV_WS_OPEN when the handshake completes, then MG_EV_WS_MSG for incoming frames.

Parameters:

Name Type Description Default
url str

WebSocket URL (e.g., 'ws://example.com/ws', 'wss://example.com/ws')

required
handler Optional[EventHandler]

Optional per-connection handler (overrides default)

None

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect

Source code in src/cymongoose/_mongoose.pyi
def ws_connect(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
    """Create an outbound WebSocket connection.

    Connects and automatically sends the WebSocket upgrade handshake.
    The handler will receive MG_EV_WS_OPEN when the handshake completes,
    then MG_EV_WS_MSG for incoming frames.

    Args:
        url: WebSocket URL (e.g., 'ws://example.com/ws', 'wss://example.com/ws')
        handler: Optional per-connection handler (overrides default)

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect
    """
    ...

mqtt_connect(url, handler=None, client_id='', username='', password='', clean_session=True, keepalive=60)

Connect to an MQTT broker.

Parameters:

Name Type Description Default
url str

Broker URL (e.g., 'mqtt://broker.hivemq.com:1883')

required
handler Optional[EventHandler]

Event handler callback

None
client_id str

MQTT client ID (autogenerated if empty)

''
username str

MQTT username (optional)

''
password str

MQTT password (optional)

''
clean_session bool

Clean session flag

True
keepalive int

Keep-alive interval in seconds

60

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect to broker

Source code in src/cymongoose/_mongoose.pyi
def mqtt_connect(
    self,
    url: str,
    handler: Optional[EventHandler] = None,
    client_id: str = "",
    username: str = "",
    password: str = "",
    clean_session: bool = True,
    keepalive: int = 60,
) -> Connection:
    """Connect to an MQTT broker.

    Args:
        url: Broker URL (e.g., 'mqtt://broker.hivemq.com:1883')
        handler: Event handler callback
        client_id: MQTT client ID (autogenerated if empty)
        username: MQTT username (optional)
        password: MQTT password (optional)
        clean_session: Clean session flag
        keepalive: Keep-alive interval in seconds

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect to broker
    """
    ...

mqtt_listen(url, handler=None)

Listen for MQTT connections (broker mode).

Parameters:

Name Type Description Default
url str

Listen URL (e.g., 'mqtt://0.0.0.0:1883')

required
handler Optional[EventHandler]

Event handler callback

None

Returns:

Type Description
Connection

Listener connection object

Raises:

Type Description
RuntimeError

If failed to listen for MQTT

Source code in src/cymongoose/_mongoose.pyi
def mqtt_listen(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
    """Listen for MQTT connections (broker mode).

    Args:
        url: Listen URL (e.g., 'mqtt://0.0.0.0:1883')
        handler: Event handler callback

    Returns:
        Listener connection object

    Raises:
        RuntimeError: If failed to listen for MQTT
    """
    ...

sntp_connect(url, handler=None)

Connect to an SNTP (time) server.

Triggers MG_EV_SNTP_TIME event when time is received.

Parameters:

Name Type Description Default
url str

SNTP server URL (e.g., 'udp://time.google.com:123')

required
handler Optional[EventHandler]

Event handler callback

None

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect to SNTP server

Example

def time_handler(conn, ev, data):
    if ev == MG_EV_SNTP_TIME:
        # data is uint64_t epoch milliseconds
        print(f"Time: {data} ms since epoch")

conn = manager.sntp_connect("udp://time.google.com:123", time_handler)
conn.sntp_request()  # Request time

Source code in src/cymongoose/_mongoose.pyi
def sntp_connect(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
    """Connect to an SNTP (time) server.

    Triggers MG_EV_SNTP_TIME event when time is received.

    Args:
        url: SNTP server URL (e.g., 'udp://time.google.com:123')
        handler: Event handler callback

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect to SNTP server

    Example:
        ```python
        def time_handler(conn, ev, data):
            if ev == MG_EV_SNTP_TIME:
                # data is uint64_t epoch milliseconds
                print(f"Time: {data} ms since epoch")

        conn = manager.sntp_connect("udp://time.google.com:123", time_handler)
        conn.sntp_request()  # Request time
        ```
    """
    ...

wakeup(connection_id, data=b'')

Send a wakeup notification to a specific connection (thread-safe).

Parameters:

Name Type Description Default
connection_id int

The connection ID to wake up

required
data bytes

Optional data payload (delivered via MG_EV_WAKEUP event)

b''

Returns:

Type Description
bool

True if wakeup was sent successfully

Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios, use close() only after all polling threads have stopped to avoid race conditions.

Raises:

Type Description
RuntimeError

If manager has been freed

Source code in src/cymongoose/_mongoose.pyi
def wakeup(self, connection_id: int, data: bytes = b"") -> bool:
    """Send a wakeup notification to a specific connection (thread-safe).

    Args:
        connection_id: The connection ID to wake up
        data: Optional data payload (delivered via MG_EV_WAKEUP event)

    Returns:
        True if wakeup was sent successfully

    Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios,
    use close() only after all polling threads have stopped to avoid race conditions.

    Raises:
        RuntimeError: If manager has been freed
    """
    ...

timer_add(milliseconds, callback, *, repeat=False, run_now=False)

Add a timer that calls a Python callback once or, with repeat=True, periodically.

Parameters:

Name Type Description Default
milliseconds int

Timer interval in milliseconds

required
callback Callable[[], None]

Python callable (takes no arguments)

required
repeat bool

If True, timer repeats; if False, runs once

False
run_now bool

If True, callback is called immediately

False

Returns:

Type Description
Timer

Timer object (caller must keep a reference to it)

Note: Timers are automatically freed when they complete (MG_TIMER_AUTODELETE flag). The Timer object's __dealloc__ only releases the Python callback reference.

Warning: The returned Timer must be kept alive (e.g., stored in a variable) for as long as the timer is active. If the Timer object is garbage collected while Mongoose still holds the timer, the callback pointer becomes dangling.

Raises:

Type Description
RuntimeError

If manager has been freed or timer creation failed

Example

def heartbeat():
    print("ping")

timer = manager.timer_add(1000, heartbeat, repeat=True)

Source code in src/cymongoose/_mongoose.pyi
def timer_add(
    self,
    milliseconds: int,
    callback: Callable[[], None],
    *,
    repeat: bool = False,
    run_now: bool = False,
) -> "Timer":
    """Add a timer that calls a Python callback once or, with ``repeat=True``, periodically.

    Args:
        milliseconds: Timer interval in milliseconds
        callback: Python callable (takes no arguments)
        repeat: If True, timer repeats; if False, runs once
        run_now: If True, callback is called immediately

    Returns:
        Timer object (caller must keep a reference to it)

    Note: Timers are automatically freed when they complete (MG_TIMER_AUTODELETE flag).
    The Timer object's ``__dealloc__`` only releases the Python callback reference.

    Warning: The returned Timer must be kept alive (e.g., stored in a variable)
    for as long as the timer is active. If the Timer object is garbage collected
    while Mongoose still holds the timer, the callback pointer becomes dangling.

    Raises:
        RuntimeError: If manager has been freed or timer creation failed

    Example:
        ```python
        def heartbeat():
            print("ping")

        timer = manager.timer_add(1000, heartbeat, repeat=True)
        ```
    """
    ...

run(poll_ms=100)

Run the event loop, blocking until SIGINT or SIGTERM.

Installs signal handlers for graceful shutdown, runs the poll loop, and calls close() when done. Original signal handlers are restored.

Parameters:

Name Type Description Default
poll_ms int

Poll timeout in milliseconds (default: 100)

100
Source code in src/cymongoose/_mongoose.pyi
def run(self, poll_ms: int = 100) -> None:
    """Run the event loop, blocking until SIGINT or SIGTERM.

    Installs signal handlers for graceful shutdown, runs the poll loop,
    and calls close() when done. Original signal handlers are restored.

    Args:
        poll_ms: Poll timeout in milliseconds (default: 100)
    """
    ...

close()

Free the underlying manager and release resources.

Source code in src/cymongoose/_mongoose.pyi
def close(self) -> None:
    """Free the underlying manager and release resources."""
    ...

Overview

The Manager class is the core of cymongoose. It manages the Mongoose event loop and all network connections.

Creating a Manager

from cymongoose import Manager

# With default handler for all connections
def handler(conn, ev, data):
    print(f"Event {ev} on connection {conn.id}")

manager = Manager(handler)

# Without default handler (use per-connection handlers)
manager = Manager()

# With wakeup support for multi-threading
manager = Manager(handler, enable_wakeup=True)

# With error handler for exceptions in event handlers
def on_error(exc):
    print(f"Handler error: {exc}")

manager = Manager(handler, error_handler=on_error)

Constructor

cymongoose.Manager.__init__(handler=None, enable_wakeup=False, error_handler=None)

Initialize event manager.

Parameters:

Name Type Description Default
handler Optional[EventHandler]

Default event handler for all connections

None
enable_wakeup bool

Enable wakeup support for multi-threaded scenarios

False
error_handler Optional[Callable[[Exception], Any]]

Optional callback invoked with the exception when an event handler raises. Defaults to traceback.print_exc().

None
Source code in src/cymongoose/_mongoose.pyi
def __init__(
    self,
    handler: Optional[EventHandler] = None,
    enable_wakeup: bool = False,
    error_handler: Optional[Callable[[Exception], Any]] = None,
) -> None:
    """Initialize event manager.

    Args:
        handler: Default event handler for all connections
        enable_wakeup: Enable wakeup support for multi-threaded scenarios
        error_handler: Optional callback invoked with the exception when
            an event handler raises. Defaults to traceback.print_exc().
    """
    ...

Listening for Connections

Create server sockets that accept incoming connections.

HTTP/HTTPS Server

# HTTP server (http= auto-inferred from scheme)
listener = manager.listen('http://0.0.0.0:8000')

# HTTPS server (requires TLS initialization)
listener = manager.listen('https://0.0.0.0:8443')

def handler(conn, ev, data):
    if ev == MG_EV_ACCEPT and conn.is_tls:
        # Initialize TLS on accepted connection
        opts = TlsOpts(cert=cert, key=key)
        conn.tls_init(opts)

TCP/UDP Server

# TCP server
tcp_listener = manager.listen('tcp://0.0.0.0:1234')

# UDP server
udp_listener = manager.listen('udp://0.0.0.0:5678')

MQTT Broker

# MQTT broker
mqtt_listener = manager.mqtt_listen('mqtt://0.0.0.0:1883')

Per-Listener Handler

Override the default handler for specific listeners:

def api_handler(conn, ev, data):
    # Handle API requests
    pass

def ws_handler(conn, ev, data):
    # Handle WebSocket connections
    pass

manager = Manager(default_handler)
manager.listen('http://0.0.0.0:8000', handler=api_handler)
manager.listen('http://0.0.0.0:9000', handler=ws_handler)

Methods

cymongoose.Manager.listen(url, handler=None, *, http=None)

Listen on a URL; handler is optional per-listener override.

The http flag is inferred from the URL scheme when not provided explicitly: http://, https://, ws://, wss:// all enable HTTP protocol parsing. Pass http=False to override.

To listen on an OS-assigned port, pass port 0 and read the actual port from the returned connection:

listener = mgr.listen("http://0.0.0.0:0")
port = listener.local_addr[1]

Parameters:

Name Type Description Default
url str

URL to listen on (e.g., "http://0.0.0.0:8000", "tcp://0.0.0.0:1234")

required
handler Optional[EventHandler]

Optional per-connection handler (overrides default)

None
http Optional[bool]

Enable HTTP protocol handler (inferred from scheme if None)

None

Returns:

Type Description
Connection

Listener connection object

Raises:

Type Description
RuntimeError

If failed to listen on URL

Source code in src/cymongoose/_mongoose.pyi
def listen(
    self, url: str, handler: Optional[EventHandler] = None, *, http: Optional[bool] = None
) -> Connection:
    """Listen on a URL; handler is optional per-listener override.

    The ``http`` flag is inferred from the URL scheme when not provided
    explicitly: ``http://``, ``https://``, ``ws://``, ``wss://`` all
    enable HTTP protocol parsing.  Pass ``http=False`` to override.

    To listen on an OS-assigned port, pass port 0 and read the actual
    port from the returned connection:

        listener = mgr.listen("http://0.0.0.0:0")
        port = listener.local_addr[1]

    Args:
        url: URL to listen on (e.g., "http://0.0.0.0:8000", "tcp://0.0.0.0:1234")
        handler: Optional per-listener handler (overrides default)
        http: Enable HTTP protocol handler (inferred from scheme if None)

    Returns:
        Listener connection object

    Raises:
        RuntimeError: If failed to listen on URL
    """
    ...

cymongoose.Manager.mqtt_listen(url, handler=None)

Listen for MQTT connections (broker mode).

Parameters:

Name Type Description Default
url str

Listen URL (e.g., 'mqtt://0.0.0.0:1883')

required
handler Optional[EventHandler]

Event handler callback

None

Returns:

Type Description
Connection

Listener connection object

Raises:

Type Description
RuntimeError

If failed to listen for MQTT

Source code in src/cymongoose/_mongoose.pyi
def mqtt_listen(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
    """Listen for MQTT connections (broker mode).

    Args:
        url: Listen URL (e.g., 'mqtt://0.0.0.0:1883')
        handler: Event handler callback

    Returns:
        Listener connection object

    Raises:
        RuntimeError: If failed to listen for MQTT
    """
    ...

Making Connections

Create outbound client connections.

HTTP/HTTPS Client

def client_handler(conn, ev, data):
    if ev == MG_EV_CONNECT:
        # Send HTTP request
        conn.send(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
    elif ev == MG_EV_HTTP_MSG:
        print(f"Status: {data.status()}")
        print(f"Body: {data.body_text}")
        conn.close()

# HTTP client (http= auto-inferred from scheme)
conn = manager.connect('http://example.com:80', client_handler)

# HTTPS client (TLS auto-initialized)
conn = manager.connect('https://example.com:443', client_handler)

WebSocket Client

def ws_handler(conn, ev, data):
    if ev == MG_EV_WS_OPEN:
        print("WebSocket connected")
        conn.ws_send("Hello!", WEBSOCKET_OP_TEXT)
    elif ev == MG_EV_WS_MSG:
        print(f"Received: {data.text}")

# WebSocket client (handles upgrade automatically)
conn = manager.ws_connect('ws://example.com/ws', ws_handler)

MQTT Client

def mqtt_handler(conn, ev, data):
    if ev == MG_EV_MQTT_OPEN:
        print("Connected to broker")
        conn.mqtt_sub("sensors/#", qos=1)
    elif ev == MG_EV_MQTT_MSG:
        print(f"{data.topic}: {data.text}")

conn = manager.mqtt_connect(
    'mqtt://broker.hivemq.com:1883',
    handler=mqtt_handler,
    client_id='my-client',
    clean_session=True,
    keepalive=60,
)

SNTP Client

def sntp_handler(conn, ev, data):
    if ev == MG_EV_SNTP_TIME:
        # data is milliseconds since epoch
        print(f"Time: {data} ms")

conn = manager.sntp_connect('udp://time.google.com:123', sntp_handler)
conn.sntp_request()

Methods

cymongoose.Manager.connect(url, handler=None, *, http=None)

Create an outbound connection and return immediately.

The http flag is inferred from the URL scheme when not provided explicitly: http://, https://, ws://, wss:// all enable HTTP protocol parsing. Pass http=False to override.

Parameters:

Name Type Description Default
url str

URL to connect to (e.g., "http://example.com", "tcp://example.com:1234")

required
handler Optional[EventHandler]

Optional per-connection handler (overrides default)

None
http Optional[bool]

Enable HTTP protocol handler (inferred from scheme if None)

None

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect to URL

Source code in src/cymongoose/_mongoose.pyi
def connect(
    self, url: str, handler: Optional[EventHandler] = None, *, http: Optional[bool] = None
) -> Connection:
    """Create an outbound connection and return immediately.

    The ``http`` flag is inferred from the URL scheme when not provided
    explicitly: ``http://``, ``https://``, ``ws://``, ``wss://`` all
    enable HTTP protocol parsing.  Pass ``http=False`` to override.

    Args:
        url: URL to connect to (e.g., "http://example.com", "tcp://example.com:1234")
        handler: Optional per-connection handler (overrides default)
        http: Enable HTTP protocol handler (inferred from scheme if None)

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect to URL
    """
    ...

cymongoose.Manager.ws_connect(url, handler=None)

Create an outbound WebSocket connection.

Connects and automatically sends the WebSocket upgrade handshake. The handler will receive MG_EV_WS_OPEN when the handshake completes, then MG_EV_WS_MSG for incoming frames.

Parameters:

Name Type Description Default
url str

WebSocket URL (e.g., 'ws://example.com/ws', 'wss://example.com/ws')

required
handler Optional[EventHandler]

Optional per-connection handler (overrides default)

None

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect

Source code in src/cymongoose/_mongoose.pyi
def ws_connect(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
    """Create an outbound WebSocket connection.

    Connects and automatically sends the WebSocket upgrade handshake.
    The handler will receive MG_EV_WS_OPEN when the handshake completes,
    then MG_EV_WS_MSG for incoming frames.

    Args:
        url: WebSocket URL (e.g., 'ws://example.com/ws', 'wss://example.com/ws')
        handler: Optional per-connection handler (overrides default)

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect
    """
    ...

cymongoose.Manager.mqtt_connect(url, handler=None, client_id='', username='', password='', clean_session=True, keepalive=60)

Connect to an MQTT broker.

Parameters:

Name Type Description Default
url str

Broker URL (e.g., 'mqtt://broker.hivemq.com:1883')

required
handler Optional[EventHandler]

Event handler callback

None
client_id str

MQTT client ID (autogenerated if empty)

''
username str

MQTT username (optional)

''
password str

MQTT password (optional)

''
clean_session bool

Clean session flag

True
keepalive int

Keep-alive interval in seconds

60

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect to broker

Source code in src/cymongoose/_mongoose.pyi
def mqtt_connect(
    self,
    url: str,
    handler: Optional[EventHandler] = None,
    client_id: str = "",
    username: str = "",
    password: str = "",
    clean_session: bool = True,
    keepalive: int = 60,
) -> Connection:
    """Connect to an MQTT broker.

    Args:
        url: Broker URL (e.g., 'mqtt://broker.hivemq.com:1883')
        handler: Event handler callback
        client_id: MQTT client ID (autogenerated if empty)
        username: MQTT username (optional)
        password: MQTT password (optional)
        clean_session: Clean session flag
        keepalive: Keep-alive interval in seconds

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect to broker
    """
    ...

cymongoose.Manager.sntp_connect(url, handler=None)

Connect to an SNTP (time) server.

Triggers MG_EV_SNTP_TIME event when time is received.

Parameters:

Name Type Description Default
url str

SNTP server URL (e.g., 'udp://time.google.com:123')

required
handler Optional[EventHandler]

Event handler callback

None

Returns:

Type Description
Connection

Connection object

Raises:

Type Description
RuntimeError

If failed to connect to SNTP server

Example

def time_handler(conn, ev, data):
    if ev == MG_EV_SNTP_TIME:
        # data is uint64_t epoch milliseconds
        print(f"Time: {data} ms since epoch")

conn = manager.sntp_connect("udp://time.google.com:123", time_handler)
conn.sntp_request()  # Request time

Source code in src/cymongoose/_mongoose.pyi
def sntp_connect(self, url: str, handler: Optional[EventHandler] = None) -> Connection:
    """Connect to an SNTP (time) server.

    Triggers MG_EV_SNTP_TIME event when time is received.

    Args:
        url: SNTP server URL (e.g., 'udp://time.google.com:123')
        handler: Event handler callback

    Returns:
        Connection object

    Raises:
        RuntimeError: If failed to connect to SNTP server

    Example:
        def time_handler(conn, ev, data):
            if ev == MG_EV_SNTP_TIME:
                # data is uint64_t epoch milliseconds
                print(f"Time: {data} ms since epoch")

        conn = manager.sntp_connect("udp://time.google.com:123", time_handler)
        conn.sntp_request()  # Request time
    """
    ...

Event Loop

The event loop drives all I/O operations.

Simple Event Loop

The run() method handles signal setup, polling, and cleanup:

manager = Manager(handler)
manager.listen('http://0.0.0.0:8000')
manager.run()  # Blocks until SIGINT/SIGTERM, then cleans up

Manual Polling

For more control, use poll() directly:

import signal

shutdown_requested = False

def signal_handler(sig, frame):
    global shutdown_requested
    shutdown_requested = True

signal.signal(signal.SIGINT, signal_handler)

manager = Manager(handler)
manager.listen('http://0.0.0.0:8000')

# Poll with 100ms timeout
while not shutdown_requested:
    manager.poll(100)

manager.close()

Timeout Guidelines

  • 100ms - Recommended default (responsive shutdown, low CPU)
  • 0ms - Non-blocking (busy loop, 100% CPU)
  • 1000ms+ - Longer timeouts (slow shutdown response)

See Performance for details.

Methods

cymongoose.Manager.run(poll_ms=100)

Run the event loop, blocking until SIGINT or SIGTERM.

Installs signal handlers for graceful shutdown, runs the poll loop, and calls close() when done. Original signal handlers are restored.

Parameters:

Name Type Description Default
poll_ms int

Poll timeout in milliseconds (default: 100)

100

Source code in src/cymongoose/_mongoose.pyi
def run(self, poll_ms: int = 100) -> None:
    """Run the event loop, blocking until SIGINT or SIGTERM.

    Installs signal handlers for graceful shutdown, runs the poll loop,
    and calls close() when done. Original signal handlers are restored.

    Args:
        poll_ms: Poll timeout in milliseconds (default: 100)
    """
    ...

cymongoose.Manager.poll(timeout_ms=0)

Drive the event loop once.

Parameters:

Name Type Description Default
timeout_ms int

Timeout in milliseconds (0 = non-blocking)

0

Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios, use close() only after all polling threads have stopped to avoid race conditions.

Raises:

Type Description
RuntimeError

If manager has been freed

Source code in src/cymongoose/_mongoose.pyi
def poll(self, timeout_ms: int = 0) -> None:
    """Drive the event loop once.

    Args:
        timeout_ms: Timeout in milliseconds (0 = non-blocking)

    Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios,
    use close() only after all polling threads have stopped to avoid race conditions.

    Raises:
        RuntimeError: If manager has been freed
    """
    ...

Active Connections

The connections property returns a snapshot of all active connections:

# Broadcast to all WebSocket clients
for conn in manager.connections:
    if conn.is_websocket:
        conn.ws_send("broadcast message", WEBSOCKET_OP_TEXT)

cymongoose.Manager.connections property

Return a snapshot of all active connections as a tuple.

Timers

Execute callbacks after a delay, either once or repeatedly.

One-Shot Timer

def callback():
    print("Timer fired!")

# Fire once after 5 seconds
timer = manager.timer_add(5000, callback)

Repeating Timer

def heartbeat():
    print(f"Alive at {time.time()}")

# Fire every second
timer = manager.timer_add(1000, heartbeat, repeat=True)

Immediate Execution

# Run immediately, then repeat every 1 second
timer = manager.timer_add(1000, callback, repeat=True, run_now=True)

Cancelling a Timer

Call timer.cancel() to stop a repeating timer early. Cancellation is thread-safe -- it can be called from any thread, not just the poll thread. The actual removal from the event loop is deferred to the next poll() call.

timer = manager.timer_add(1000, heartbeat, repeat=True)

# Later, from any thread:
timer.cancel()
print(timer.active)  # False

Timer Lifecycle

  • One-shot timers are automatically freed after firing (MG_TIMER_AUTODELETE flag).
  • Repeating timers run until cancel() is called or manager.close() frees them.
  • The manager keeps an internal reference to all live timers, so discarding the return value of timer_add() is safe (the callback will not dangle).

Timer Properties

Property Type Description
active bool True if the timer has not been cancelled or completed

Methods

cymongoose.Manager.timer_add(milliseconds, callback, *, repeat=False, run_now=False)

Add a timer that calls a Python callback periodically.

Parameters:

Name Type Description Default
milliseconds int

Timer interval in milliseconds

required
callback Callable[[], None]

Python callable (takes no arguments)

required
repeat bool

If True, timer repeats; if False, runs once

False
run_now bool

If True, callback is called immediately

False

Returns:

Type Description
Timer

Timer object (caller must keep a reference to it)

Note: Timers are automatically freed when they complete (MG_TIMER_AUTODELETE flag). The Timer object's __dealloc__ only releases the Python callback reference.

Warning: The returned Timer must be kept alive (e.g., stored in a variable) for as long as the timer is active. If the Timer object is garbage collected while Mongoose still holds the timer, the callback pointer becomes dangling.

Raises:

Type Description
RuntimeError

If manager has been freed or timer creation failed

Example

def heartbeat():
    print("ping")

timer = manager.timer_add(1000, heartbeat, repeat=True)

Source code in src/cymongoose/_mongoose.pyi
def timer_add(
    self,
    milliseconds: int,
    callback: Callable[[], None],
    *,
    repeat: bool = False,
    run_now: bool = False,
) -> "Timer":
    """Add a timer that calls a Python callback periodically.

    Args:
        milliseconds: Timer interval in milliseconds
        callback: Python callable (takes no arguments)
        repeat: If True, timer repeats; if False, runs once
        run_now: If True, callback is called immediately

    Returns:
        Timer object (caller must keep a reference to it)

    Note: Timers are automatically freed when they complete (MG_TIMER_AUTODELETE flag).
    The Timer object's __dealloc__ only releases the Python callback reference.

    Warning: The returned Timer must be kept alive (e.g., stored in a variable)
    for as long as the timer is active. If the Timer object is garbage collected
    while Mongoose still holds the timer, the callback pointer becomes dangling.

    Raises:
        RuntimeError: If manager has been freed or timer creation failed

    Example:
        def heartbeat():
            print("ping")

        timer = manager.timer_add(1000, heartbeat, repeat=True)
    """
    ...

Multi-threading Support

The wakeup() method enables thread-safe communication with the event loop.

Setup

Enable wakeup support when creating the manager:

manager = Manager(handler, enable_wakeup=True)

Background Worker Pattern

import threading
import queue

# Work queue
work_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
    """Background worker thread."""
    while True:
        work = work_queue.get()
        if work is None:
            break

        # Process work
        result = expensive_computation(work['data'])

        # Send result back via wakeup
        result_queue.put({
            'conn_id': work['conn_id'],
            'result': result,
        })
        manager.wakeup(work['conn_id'], b"result_ready")

# Start worker thread
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        # Offload to worker
        work_queue.put({
            'conn_id': conn.id,
            'data': data.body_bytes,
        })

    elif ev == MG_EV_WAKEUP:
        # Result ready
        result = result_queue.get()
        conn.reply(200, result['result'])
        conn.drain()

See Threading for a complete example.

Methods

cymongoose.Manager.wakeup(connection_id, data=b'')

Send a wakeup notification to a specific connection (thread-safe).

Parameters:

Name Type Description Default
connection_id int

The connection ID to wake up

required
data bytes

Optional data payload (delivered via MG_EV_WAKEUP event)

b''

Returns:

Type Description
bool

True if wakeup was sent successfully

Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios, use close() only after all polling threads have stopped to avoid race conditions.

Raises:

Type Description
RuntimeError

If manager has been freed

Source code in src/cymongoose/_mongoose.pyi
def wakeup(self, connection_id: int, data: bytes = b"") -> bool:
    """Send a wakeup notification to a specific connection (thread-safe).

    Args:
        connection_id: The connection ID to wake up
        data: Optional data payload (delivered via MG_EV_WAKEUP event)

    Returns:
        True if wakeup was sent successfully

    Thread safety note: _freed flag is checked without lock. In multi-threaded scenarios,
    use close() only after all polling threads have stopped to avoid race conditions.

    Raises:
        RuntimeError: If manager has been freed
    """
    ...

Cleanup

Always clean up resources when done.

try:
    while not shutdown_requested:
        manager.poll(100)
finally:
    manager.close()  # Free all resources

Methods

cymongoose.Manager.close()

Free the underlying manager and release resources.

Source code in src/cymongoose/_mongoose.pyi
def close(self) -> None:
    """Free the underlying manager and release resources."""
    ...

See Also

  • Connection - Connection management
  • Guide - Protocol-specific guides
  • Threading - Multi-threading patterns