AsyncManager Class

cymongoose.AsyncManager

Asyncio-compatible wrapper around Manager.

Runs the mongoose event loop in a daemon thread. poll() releases the GIL, so the asyncio event loop runs concurrently without blocking. The thread-safe wakeup() method enables asyncio -> mongoose communication.

Usage:

async with AsyncManager(handler) as am:
    am.listen("http://0.0.0.0:8080")
    # ... do async work ...
Source code in src/cymongoose/aio.py
class AsyncManager:
    """Asyncio-compatible wrapper around Manager.

    Runs the mongoose event loop in a daemon thread.  ``poll()`` releases
    the GIL, so the asyncio event loop runs concurrently without blocking.
    The thread-safe ``wakeup()`` method enables asyncio -> mongoose
    communication.

    Usage::

        async with AsyncManager(handler) as am:
            am.listen("http://0.0.0.0:8080")
            # ... do async work ...
    """

    def __init__(
        self,
        handler: Optional[Callable[..., Any]] = None,
        poll_interval: int = 100,
        error_handler: Optional[Callable[[Exception], Any]] = None,
        shutdown_timeout: float = 30,
    ) -> None:
        """Create an AsyncManager.

        Args:
            handler: Default event handler for all connections.
            poll_interval: Milliseconds between poll() calls (default 100).
            error_handler: Called when a handler raises an exception.
            shutdown_timeout: Hard limit in seconds for ``__aexit__`` to wait
                for the poll thread to stop (default 30).  Shutdown proceeds
                as follows:

                1. ``__aexit__`` signals the thread to stop and sends a wakeup.
                2. Waits 5 seconds for the thread to join.
                3. If still alive: emits a ``RuntimeWarning``, retries the
                   wakeup, and waits another 5 seconds.
                4. Repeats step 3 until ``shutdown_timeout`` is reached.
                5. At the hard limit: emits a final "abandoning thread"
                   warning and moves on without calling ``Manager.close()``.

                The warnings surface in logs so operators can identify
                blocked handlers.  Retrying the wakeup at each interval
                guards against a lost initial wakeup that raced with the
                handler entering ``poll()``.
        """
        self._handler = handler
        self._poll_interval = poll_interval
        self._error_handler = error_handler
        self._shutdown_timeout = shutdown_timeout
        self._manager: Optional[Manager] = None
        self._thread: Optional[threading.Thread] = None
        self._stop = threading.Event()
        self._lock = threading.RLock()
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._wake_id: int = 0  # connection ID used to interrupt poll()

    # -- async context manager -----------------------------------------------

    async def __aenter__(self) -> "AsyncManager":
        self._loop = asyncio.get_running_loop()
        self._manager = Manager(
            self._handler,
            enable_wakeup=True,
            error_handler=self._error_handler,
        )
        self._stop.clear()
        # The wakeup pipe connection is always created by enable_wakeup=True,
        # so _wake_poll() can interrupt poll() even with no user connections.
        self._wake_id = self._manager.wakeup_id
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        return self

    async def __aexit__(self, *exc: Any) -> None:
        self._stop.set()
        self._wake_poll()
        if self._thread is not None:
            loop = asyncio.get_running_loop()
            interval = 5.0  # seconds between retry attempts
            elapsed = 0.0
            while self._thread.is_alive():
                await loop.run_in_executor(
                    None,
                    self._thread.join,
                    min(interval, self._shutdown_timeout - elapsed),
                )
                elapsed += interval
                if self._thread.is_alive():
                    if elapsed >= self._shutdown_timeout:
                        warnings.warn(
                            f"AsyncManager poll thread did not stop within "
                            f"{self._shutdown_timeout}s; abandoning thread. "
                            f"A handler is likely blocked.",
                            RuntimeWarning,
                            stacklevel=2,
                        )
                        break
                    warnings.warn(
                        f"AsyncManager poll thread has not stopped after "
                        f"{elapsed:.0f}s; a handler may be blocking. "
                        f"Retrying shutdown...",
                        RuntimeWarning,
                        stacklevel=2,
                    )
                    self._wake_poll()
            if not self._thread.is_alive():
                self._thread = None
        if self._manager is not None and self._thread is None:
            self._manager.close()
            self._manager = None
        self._loop = None

    # -- poll loop (runs in background thread) -------------------------------

    _MAX_POLL_MS = 200  # cap so the lock is never held too long

    def _run(self) -> None:
        while not self._stop.is_set():
            with self._lock:
                if self._manager is not None:
                    self._manager.poll(min(self._poll_interval, self._MAX_POLL_MS))
            # Yield between polls so other threads can acquire the lock.
            # Without this, the poll thread can starve callers of
            # listen()/connect()/etc. on platforms with unfair mutexes.
            self._stop.wait(0.001)

    # -- internal helpers ----------------------------------------------------

    def _wake_poll(self) -> None:
        """Best-effort interrupt of poll() to reduce lock acquisition latency.

        Writes to the wakeup pipe so ``select()``/``epoll_wait()`` returns
        immediately, causing ``poll()`` to release the lock sooner.
        """
        if self._wake_id > 0 and self._manager is not None:
            try:
                self._manager.wakeup(self._wake_id, b"")
            except Exception:
                pass  # manager may be closing

    def _track_conn(self, conn: Connection) -> None:
        """Remember a connection ID for future _wake_poll() calls."""
        if self._wake_id == 0 and conn.id > 0:
            self._wake_id = conn.id

    # -- delegated methods ---------------------------------------------------

    def listen(
        self,
        url: str,
        handler: Optional[Callable[..., Any]] = None,
        *,
        http: Optional[bool] = None,
    ) -> Connection:
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        self._wake_poll()
        with self._lock:
            conn = self._manager.listen(url, handler=handler, http=http)
        self._track_conn(conn)
        return conn

    def connect(
        self,
        url: str,
        handler: Optional[Callable[..., Any]] = None,
        *,
        http: Optional[bool] = None,
    ) -> Connection:
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        self._wake_poll()
        with self._lock:
            conn = self._manager.connect(url, handler=handler, http=http)
        self._track_conn(conn)
        return conn

    def mqtt_connect(self, url: str, **kwargs: Any) -> Connection:
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        self._wake_poll()
        with self._lock:
            conn = self._manager.mqtt_connect(url, **kwargs)
        self._track_conn(conn)
        return conn

    def mqtt_listen(
        self,
        url: str,
        handler: Optional[Callable[..., Any]] = None,
    ) -> Connection:
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        self._wake_poll()
        with self._lock:
            conn = self._manager.mqtt_listen(url, handler=handler)
        self._track_conn(conn)
        return conn

    def sntp_connect(
        self,
        url: str,
        handler: Optional[Callable[..., Any]] = None,
    ) -> Connection:
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        self._wake_poll()
        with self._lock:
            conn = self._manager.sntp_connect(url, handler=handler)
        self._track_conn(conn)
        return conn

    def wakeup(self, connection_id: int, data: bytes = b"") -> bool:
        """Thread-safe: wakeup does not need the lock."""
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        return self._manager.wakeup(connection_id, data)

    def timer_add(
        self,
        ms: int,
        callback: Callable[..., Any],
        *,
        repeat: bool = False,
        run_now: bool = False,
    ) -> Timer:
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        self._wake_poll()
        with self._lock:
            return self._manager.timer_add(
                ms,
                callback,
                repeat=repeat,
                run_now=run_now,
            )

    # -- asyncio helper ------------------------------------------------------

    def schedule(self, coro_or_callback: Any) -> None:
        """Schedule a coroutine or callback on the asyncio event loop.

        This is thread-safe and intended to be called from the mongoose
        poll thread (i.e. from inside event handlers) to push work back
        onto the asyncio loop.
        """
        if self._loop is None:
            raise RuntimeError("AsyncManager is not started")
        if asyncio.iscoroutine(coro_or_callback):
            asyncio.run_coroutine_threadsafe(coro_or_callback, self._loop)
        else:
            self._loop.call_soon_threadsafe(coro_or_callback)

    # -- properties ----------------------------------------------------------

    @property
    def manager(self) -> Manager:
        """Access the underlying Manager."""
        if self._manager is None:
            raise RuntimeError("AsyncManager is not started")
        return self._manager

    @property
    def running(self) -> bool:
        return self._thread is not None and self._thread.is_alive() and not self._stop.is_set()

_handler = handler instance-attribute

_poll_interval = poll_interval instance-attribute

_error_handler = error_handler instance-attribute

_shutdown_timeout = shutdown_timeout instance-attribute

_manager = None instance-attribute

_thread = None instance-attribute

_stop = threading.Event() instance-attribute

_lock = threading.RLock() instance-attribute

_loop = None instance-attribute

_wake_id = 0 instance-attribute

_MAX_POLL_MS = 200 class-attribute instance-attribute

manager property

Access the underlying Manager.

running property

True while the poll thread is alive and no stop has been requested.

__init__(handler=None, poll_interval=100, error_handler=None, shutdown_timeout=30)

Create an AsyncManager.

Parameters:

handler (Optional[Callable[..., Any]], default None)
    Default event handler for all connections.

poll_interval (int, default 100)
    Milliseconds between poll() calls.

error_handler (Optional[Callable[[Exception], Any]], default None)
    Called when a handler raises an exception.

shutdown_timeout (float, default 30)
    Hard limit in seconds for __aexit__ to wait for the poll thread to stop. Shutdown proceeds as follows:

      1. __aexit__ signals the thread to stop and sends a wakeup.
      2. Waits 5 seconds for the thread to join.
      3. If still alive: emits a RuntimeWarning, retries the wakeup, and waits another 5 seconds.
      4. Repeats step 3 until shutdown_timeout is reached.
      5. At the hard limit: emits a final "abandoning thread" warning and moves on without calling Manager.close().

    The warnings surface in logs so operators can identify blocked handlers. Retrying the wakeup at each interval guards against a lost initial wakeup that raced with the handler entering poll().

Source code in src/cymongoose/aio.py
def __init__(
    self,
    handler: Optional[Callable[..., Any]] = None,
    poll_interval: int = 100,
    error_handler: Optional[Callable[[Exception], Any]] = None,
    shutdown_timeout: float = 30,
) -> None:
    """Create an AsyncManager.

    Args:
        handler: Default event handler for all connections.
        poll_interval: Milliseconds between poll() calls (default 100).
        error_handler: Called when a handler raises an exception.
        shutdown_timeout: Hard limit in seconds for ``__aexit__`` to wait
            for the poll thread to stop (default 30).  Shutdown proceeds
            as follows:

            1. ``__aexit__`` signals the thread to stop and sends a wakeup.
            2. Waits 5 seconds for the thread to join.
            3. If still alive: emits a ``RuntimeWarning``, retries the
               wakeup, and waits another 5 seconds.
            4. Repeats step 3 until ``shutdown_timeout`` is reached.
            5. At the hard limit: emits a final "abandoning thread"
               warning and moves on without calling ``Manager.close()``.

            The warnings surface in logs so operators can identify
            blocked handlers.  Retrying the wakeup at each interval
            guards against a lost initial wakeup that raced with the
            handler entering ``poll()``.
    """
    self._handler = handler
    self._poll_interval = poll_interval
    self._error_handler = error_handler
    self._shutdown_timeout = shutdown_timeout
    self._manager: Optional[Manager] = None
    self._thread: Optional[threading.Thread] = None
    self._stop = threading.Event()
    self._lock = threading.RLock()
    self._loop: Optional[asyncio.AbstractEventLoop] = None
    self._wake_id: int = 0  # connection ID used to interrupt poll()

__aenter__() async

Source code in src/cymongoose/aio.py
async def __aenter__(self) -> "AsyncManager":
    self._loop = asyncio.get_running_loop()
    self._manager = Manager(
        self._handler,
        enable_wakeup=True,
        error_handler=self._error_handler,
    )
    self._stop.clear()
    # The wakeup pipe connection is always created by enable_wakeup=True,
    # so _wake_poll() can interrupt poll() even with no user connections.
    self._wake_id = self._manager.wakeup_id
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()
    return self

__aexit__(*exc) async

Source code in src/cymongoose/aio.py
async def __aexit__(self, *exc: Any) -> None:
    self._stop.set()
    self._wake_poll()
    if self._thread is not None:
        loop = asyncio.get_running_loop()
        interval = 5.0  # seconds between retry attempts
        elapsed = 0.0
        while self._thread.is_alive():
            await loop.run_in_executor(
                None,
                self._thread.join,
                min(interval, self._shutdown_timeout - elapsed),
            )
            elapsed += interval
            if self._thread.is_alive():
                if elapsed >= self._shutdown_timeout:
                    warnings.warn(
                        f"AsyncManager poll thread did not stop within "
                        f"{self._shutdown_timeout}s; abandoning thread. "
                        f"A handler is likely blocked.",
                        RuntimeWarning,
                        stacklevel=2,
                    )
                    break
                warnings.warn(
                    f"AsyncManager poll thread has not stopped after "
                    f"{elapsed:.0f}s; a handler may be blocking. "
                    f"Retrying shutdown...",
                    RuntimeWarning,
                    stacklevel=2,
                )
                self._wake_poll()
        if not self._thread.is_alive():
            self._thread = None
    if self._manager is not None and self._thread is None:
        self._manager.close()
        self._manager = None
    self._loop = None

_run()

Source code in src/cymongoose/aio.py
def _run(self) -> None:
    while not self._stop.is_set():
        with self._lock:
            if self._manager is not None:
                self._manager.poll(min(self._poll_interval, self._MAX_POLL_MS))
        # Yield between polls so other threads can acquire the lock.
        # Without this, the poll thread can starve callers of
        # listen()/connect()/etc. on platforms with unfair mutexes.
        self._stop.wait(0.001)
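
The lock-then-yield structure of _run() can be tried in isolation. The following is a minimal sketch using only the standard library, not cymongoose code: PollLoop and its methods are hypothetical names, and time.sleep() stands in for Manager.poll().

```python
import threading
import time


class PollLoop:
    """Hypothetical stand-in for the locking scheme; not cymongoose code."""

    def __init__(self, poll_seconds: float = 0.02) -> None:
        self._lock = threading.RLock()
        self._stop = threading.Event()
        self._poll_seconds = poll_seconds
        self.calls = 0
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        while not self._stop.is_set():
            with self._lock:
                time.sleep(self._poll_seconds)  # stands in for Manager.poll()
            # Short pause outside the lock so waiting callers get a turn.
            self._stop.wait(0.001)

    def call(self) -> None:
        """Stand-in for listen()/connect(): needs the same lock as the poll."""
        with self._lock:
            self.calls += 1

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()
```

Using Event.wait() rather than time.sleep() for the pause means a stop request also ends the pause early.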

_wake_poll()

Best-effort interrupt of poll() to reduce lock acquisition latency.

Writes to the wakeup pipe so select()/epoll_wait() returns immediately, causing poll() to release the lock sooner.

Source code in src/cymongoose/aio.py
def _wake_poll(self) -> None:
    """Best-effort interrupt of poll() to reduce lock acquisition latency.

    Writes to the wakeup pipe so ``select()``/``epoll_wait()`` returns
    immediately, causing ``poll()`` to release the lock sooner.
    """
    if self._wake_id > 0 and self._manager is not None:
        try:
            self._manager.wakeup(self._wake_id, b"")
        except Exception:
            pass  # manager may be closing
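
The self-pipe technique that the docstring describes can be sketched with the standard library alone. In this illustration socket.socketpair() plays the role of the wakeup pipe; the function names are hypothetical, and it is an assumption that mongoose's own wakeup mechanism behaves comparably.

```python
import select
import socket
import threading
import time


def wait_for_events(wake_r: socket.socket, timeout: float) -> float:
    """Block in select() like a poll loop would; return seconds actually waited."""
    start = time.monotonic()
    readable, _, _ = select.select([wake_r], [], [], timeout)
    if readable:
        wake_r.recv(64)  # drain the wakeup byte so the next wait blocks again
    return time.monotonic() - start


def demo_wakeup(timeout: float = 2.0) -> float:
    """Interrupt a long select() from another thread by writing one byte."""
    wake_r, wake_w = socket.socketpair()
    try:
        # Fire the wakeup shortly after this thread enters select().
        threading.Timer(0.05, wake_w.send, args=(b"\0",)).start()
        return wait_for_events(wake_r, timeout)
    finally:
        wake_r.close()
        wake_w.close()
```

Calling demo_wakeup() should return well before the 2-second timeout, because the write makes the read end readable and select() returns early.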

_track_conn(conn)

Remember a connection ID for future _wake_poll() calls.

Source code in src/cymongoose/aio.py
def _track_conn(self, conn: Connection) -> None:
    """Remember a connection ID for future _wake_poll() calls."""
    if self._wake_id == 0 and conn.id > 0:
        self._wake_id = conn.id

listen(url, handler=None, *, http=None)

Source code in src/cymongoose/aio.py
def listen(
    self,
    url: str,
    handler: Optional[Callable[..., Any]] = None,
    *,
    http: Optional[bool] = None,
) -> Connection:
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    self._wake_poll()
    with self._lock:
        conn = self._manager.listen(url, handler=handler, http=http)
    self._track_conn(conn)
    return conn

connect(url, handler=None, *, http=None)

Source code in src/cymongoose/aio.py
def connect(
    self,
    url: str,
    handler: Optional[Callable[..., Any]] = None,
    *,
    http: Optional[bool] = None,
) -> Connection:
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    self._wake_poll()
    with self._lock:
        conn = self._manager.connect(url, handler=handler, http=http)
    self._track_conn(conn)
    return conn

mqtt_connect(url, **kwargs)

Source code in src/cymongoose/aio.py
def mqtt_connect(self, url: str, **kwargs: Any) -> Connection:
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    self._wake_poll()
    with self._lock:
        conn = self._manager.mqtt_connect(url, **kwargs)
    self._track_conn(conn)
    return conn

mqtt_listen(url, handler=None)

Source code in src/cymongoose/aio.py
def mqtt_listen(
    self,
    url: str,
    handler: Optional[Callable[..., Any]] = None,
) -> Connection:
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    self._wake_poll()
    with self._lock:
        conn = self._manager.mqtt_listen(url, handler=handler)
    self._track_conn(conn)
    return conn

sntp_connect(url, handler=None)

Source code in src/cymongoose/aio.py
def sntp_connect(
    self,
    url: str,
    handler: Optional[Callable[..., Any]] = None,
) -> Connection:
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    self._wake_poll()
    with self._lock:
        conn = self._manager.sntp_connect(url, handler=handler)
    self._track_conn(conn)
    return conn

wakeup(connection_id, data=b'')

Thread-safe: wakeup does not need the lock.

Source code in src/cymongoose/aio.py
def wakeup(self, connection_id: int, data: bytes = b"") -> bool:
    """Thread-safe: wakeup does not need the lock."""
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    return self._manager.wakeup(connection_id, data)

timer_add(ms, callback, *, repeat=False, run_now=False)

Source code in src/cymongoose/aio.py
def timer_add(
    self,
    ms: int,
    callback: Callable[..., Any],
    *,
    repeat: bool = False,
    run_now: bool = False,
) -> Timer:
    if self._manager is None:
        raise RuntimeError("AsyncManager is not started")
    self._wake_poll()
    with self._lock:
        return self._manager.timer_add(
            ms,
            callback,
            repeat=repeat,
            run_now=run_now,
        )

schedule(coro_or_callback)

Schedule a coroutine or callback on the asyncio event loop.

This is thread-safe and intended to be called from the mongoose poll thread (i.e. from inside event handlers) to push work back onto the asyncio loop.

Source code in src/cymongoose/aio.py
def schedule(self, coro_or_callback: Any) -> None:
    """Schedule a coroutine or callback on the asyncio event loop.

    This is thread-safe and intended to be called from the mongoose
    poll thread (i.e. from inside event handlers) to push work back
    onto the asyncio loop.
    """
    if self._loop is None:
        raise RuntimeError("AsyncManager is not started")
    if asyncio.iscoroutine(coro_or_callback):
        asyncio.run_coroutine_threadsafe(coro_or_callback, self._loop)
    else:
        self._loop.call_soon_threadsafe(coro_or_callback)

Overview

AsyncManager wraps Manager for use with Python's asyncio. It runs the mongoose event loop in a daemon thread while the asyncio event loop runs concurrently. Since poll() releases the GIL, both loops make progress without blocking each other.
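
The arrangement described above can be illustrated without cymongoose. The following is a minimal sketch using only the standard library: blocking_loop and run_side_by_side are hypothetical names, and time.sleep() stands in for a poll() call that releases the GIL.

```python
import asyncio
import threading
import time


def blocking_loop(stop: threading.Event, ticks: list) -> None:
    """Stand-in for the mongoose poll loop (hypothetical, not cymongoose code)."""
    while not stop.is_set():
        time.sleep(0.01)  # like poll(): blocks with the GIL released
        ticks.append(time.monotonic())


async def run_side_by_side(duration: float = 0.2) -> tuple:
    """Run the blocking loop in a daemon thread while asyncio keeps working."""
    stop = threading.Event()
    ticks: list = []
    worker = threading.Thread(target=blocking_loop, args=(stop, ticks), daemon=True)
    worker.start()

    async_steps = 0
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        await asyncio.sleep(0.01)  # the asyncio loop is free to run other tasks
        async_steps += 1

    stop.set()
    # join() blocks, so run it in the default executor instead of on the loop.
    await asyncio.get_running_loop().run_in_executor(None, worker.join)
    return len(ticks), async_steps
```

Running asyncio.run(run_side_by_side()) returns two counters, one per loop; both should be greater than zero, showing that neither loop starved the other.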

Basic Usage

import asyncio
from cymongoose import AsyncManager, MG_EV_HTTP_MSG

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        conn.reply(200, b"Hello from async!")

async def main():
    async with AsyncManager(handler) as am:
        am.listen("http://0.0.0.0:8080")
        # Server is running -- do async work here
        await asyncio.sleep(60)

asyncio.run(main())

Constructor

cymongoose.AsyncManager.__init__(handler=None, poll_interval=100, error_handler=None, shutdown_timeout=30)

Create an AsyncManager.

Parameters

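| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| handler | Callable or None | None | Default event handler (conn, ev, data) -> None |
| poll_interval | int | 100 | Timeout in milliseconds passed to each poll() call; values above 200 are capped by _MAX_POLL_MS |
| error_handler | Callable or None | None | Called with (exc: Exception) when a handler raises |
| shutdown_timeout | float | 30 | Hard limit in seconds for __aexit__ to wait for the poll thread on exit |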

Listening for Connections

async with AsyncManager(handler) as am:
    # HTTP server
    am.listen("http://0.0.0.0:8080")

    # With per-listener handler
    am.listen("http://0.0.0.0:9090", handler=api_handler)

    # MQTT broker
    am.mqtt_listen("mqtt://0.0.0.0:1883")

Methods

cymongoose.AsyncManager.listen(url, handler=None, *, http=None)

cymongoose.AsyncManager.mqtt_listen(url, handler=None)

Making Connections

async with AsyncManager(handler) as am:
    # HTTP client
    am.connect("http://example.com:80", handler=client_handler)

    # MQTT client
    am.mqtt_connect("mqtt://broker.com:1883", clean_session=True)

    # SNTP client
    am.sntp_connect("udp://time.google.com:123", handler=time_handler)

Methods

cymongoose.AsyncManager.connect(url, handler=None, *, http=None)

cymongoose.AsyncManager.mqtt_connect(url, **kwargs)

cymongoose.AsyncManager.sntp_connect(url, handler=None)

Timers

async with AsyncManager() as am:
    # One-shot timer
    am.timer_add(5000, lambda: print("fired"))

    # Repeating timer
    timer = am.timer_add(1000, heartbeat, repeat=True)
    # ...
    timer.cancel()

cymongoose.AsyncManager.timer_add(ms, callback, *, repeat=False, run_now=False)

Thread-Safe Communication

Wakeup

wakeup() is thread-safe and does not require the internal lock:

async with AsyncManager(handler) as am:
    listener = am.listen("http://0.0.0.0:8080")
    # AsyncManager always enables wakeup, so no extra argument is needed.
    # From any thread or coroutine:
    am.wakeup(listener.id, b"data")

cymongoose.AsyncManager.wakeup(connection_id, data=b'')

Thread-safe: wakeup does not need the lock.

Scheduling Asyncio Work from Handlers

Use schedule() to push work from the mongoose poll thread back onto the asyncio event loop:

import asyncio
from cymongoose import AsyncManager, MG_EV_HTTP_MSG

async def process_request(body):
    # some_async_operation is a placeholder for your own coroutine
    result = await some_async_operation(body)
    print(f"Processed: {result}")

async def main():
    def handler(conn, ev, data):
        if ev == MG_EV_HTTP_MSG:
            conn.reply(200, b"Accepted")
            # Handlers run on the poll thread; hand the work to the asyncio loop
            am.schedule(process_request(data.body_text))

    async with AsyncManager(handler) as am:
        am.listen("http://0.0.0.0:8080")
        await asyncio.sleep(3600)

asyncio.run(main())
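
The dispatch that schedule() performs can be exercised without a running server. This is a minimal sketch using only asyncio and threading: the free function schedule() mirrors the two branches of the method's source, while demo, record, and worker are hypothetical names, and a plain thread stands in for the mongoose poll thread.

```python
import asyncio
import threading


def schedule(loop: asyncio.AbstractEventLoop, coro_or_callback) -> None:
    """Same two branches as AsyncManager.schedule(), as a free function."""
    if asyncio.iscoroutine(coro_or_callback):
        asyncio.run_coroutine_threadsafe(coro_or_callback, loop)
    else:
        loop.call_soon_threadsafe(coro_or_callback)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    results: list = []
    done = asyncio.Event()

    async def record(value: str) -> None:
        results.append(value)
        done.set()

    def worker() -> None:
        # Runs off the event loop, like a handler on the poll thread.
        schedule(loop, lambda: results.append("callback"))
        schedule(loop, record("coroutine"))

    threading.Thread(target=worker, daemon=True).start()
    await asyncio.wait_for(done.wait(), timeout=2)
    return results
```

Note that the check is asyncio.iscoroutine(), so pass a coroutine object such as process_request(body), not the bare function process_request. A bare async function takes the callback branch, where calling it only creates a coroutine that is never awaited. Because the future returned by run_coroutine_threadsafe() is discarded, errors raised inside a scheduled coroutine are probably best handled within the coroutine itself.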

cymongoose.AsyncManager.schedule(coro_or_callback)

Schedule a coroutine or callback on the asyncio event loop.

This is thread-safe and intended to be called from the mongoose poll thread (i.e. from inside event handlers) to push work back onto the asyncio loop.

Properties

cymongoose.AsyncManager.manager property

Access the underlying Manager.

cymongoose.AsyncManager.running property

True while the poll thread is alive and no stop has been requested.

Shutdown Behavior

When the async with block exits, __aexit__ shuts down the poll thread:

  1. Signals the thread to stop and sends a wakeup.
  2. Waits up to 5 seconds for the thread to join.
  3. If the thread is still alive: emits a RuntimeWarning, retries the wakeup, and waits up to another 5 seconds.
  4. Repeats step 3 until shutdown_timeout is reached.
  5. At the hard limit: emits a final "abandoning thread" warning and moves on without calling Manager.close().

# Tune the timeout for your application
async with AsyncManager(handler, shutdown_timeout=10) as am:
    am.listen("http://0.0.0.0:8080")
    # ...
# __aexit__ handles shutdown automatically

If the poll thread exits cleanly, Manager.close() is called and all resources are freed. If the thread is abandoned (a handler is blocked beyond the timeout), Manager.close() is skipped and the daemon thread keeps running until the process exits.
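
The join-and-retry sequence can be sketched as a small synchronous helper. This is an illustration under assumptions, not the library's code: join_with_retries and its nudge parameter are hypothetical, and the real __aexit__ runs each join in an executor and resends the wakeup where nudge is called here.

```python
import threading
import warnings


def join_with_retries(thread, timeout: float, interval: float = 5.0, nudge=None) -> bool:
    """Join `thread` in `interval`-second slices; return True if it stopped.

    `nudge` is an optional callable invoked before each retry.
    """
    elapsed = 0.0
    while thread.is_alive():
        # Never wait past the hard limit.
        thread.join(min(interval, timeout - elapsed))
        elapsed += interval
        if not thread.is_alive():
            break
        if elapsed >= timeout:
            warnings.warn(
                f"thread did not stop within {timeout}s; abandoning", RuntimeWarning
            )
            return False
        warnings.warn(
            f"thread still alive after {elapsed:.2f}s; retrying", RuntimeWarning
        )
        if nudge is not None:
            nudge()
    return True
```

A False return corresponds to the abandoned case, in which the caller skips the close step because the thread may still be using the resource.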

See Graceful Shutdown for more patterns.

Differences from Manager

| Feature | Manager | AsyncManager |
| --- | --- | --- |
| Event loop | Manual poll() calls | Automatic in daemon thread |
| Concurrency | Single-threaded or manual threading | Runs alongside asyncio |
| Shutdown | Explicit close() | Automatic via __aexit__ |
| Thread safety | poll() not reentrant | Serialised by internal RLock |
| Wakeup | Opt-in via enable_wakeup=True | Always enabled |

See Also