Skip to content

Connection Class

cymongoose.Connection

Wrapper around mg_connection pointers.

Source code in src/cymongoose/_mongoose.pyi
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
class Connection:
    """Wrapper around mg_connection pointers."""

    @property
    def handler(self) -> Optional[EventHandler]:
        """Per-connection event handler."""
        ...

    def set_handler(self, handler: Optional[EventHandler]) -> None:
        """Assign a per-connection event handler.

        Args:
            handler: Callable that takes (Connection, event, data)
        """
        ...

    @property
    def userdata(self) -> Any:
        """User-defined data attached to this connection."""
        ...

    @userdata.setter
    def userdata(self, value: Any) -> None: ...
    @property
    def id(self) -> int:
        """Return connection ID."""
        ...

    @property
    def is_listening(self) -> bool:
        """True if this is a listening connection."""
        ...

    @property
    def is_closing(self) -> bool:
        """True if connection is closing."""
        ...

    @property
    def local_addr(self) -> Optional[Tuple[str, int, bool]]:
        """Return local address as (ip, port, is_ipv6) tuple."""
        ...

    @property
    def remote_addr(self) -> Optional[Tuple[str, int, bool]]:
        """Return remote address as (ip, port, is_ipv6) tuple."""
        ...

    @property
    def is_client(self) -> bool:
        """Return True if this is a client connection."""
        ...

    @property
    def is_tls(self) -> bool:
        """Return True if this connection uses TLS."""
        ...

    @property
    def is_udp(self) -> bool:
        """Return True if this is a UDP connection."""
        ...

    @property
    def is_websocket(self) -> bool:
        """Return True if this is a WebSocket connection."""
        ...

    @property
    def is_readable(self) -> bool:
        """Return True if connection has data to read."""
        ...

    @property
    def is_writable(self) -> bool:
        """Return True if connection can be written to."""
        ...

    @property
    def is_full(self) -> bool:
        """Return True if receive buffer is full (backpressure - stop reads)."""
        ...

    @property
    def is_draining(self) -> bool:
        """Return True if connection is draining (sending remaining data before close)."""
        ...

    @property
    def recv_len(self) -> int:
        """Return number of bytes in receive buffer."""
        ...

    @property
    def send_len(self) -> int:
        """Return number of bytes in send buffer."""
        ...

    @property
    def recv_size(self) -> int:
        """Return total allocated size of receive buffer."""
        ...

    @property
    def send_size(self) -> int:
        """Return total allocated size of send buffer."""
        ...

    def send(self, data: Union[str, bytes]) -> None:
        """Send raw bytes to the peer.

        Args:
            data: Data to send (str will be UTF-8 encoded)

        Raises:
            RuntimeError: If send fails or connection is closed
        """
        ...

    def reply(
        self,
        status_code: int,
        body: Union[str, bytes] = b"",
        headers: Optional[Dict[str, str]] = None,
    ) -> None:
        """Send a HTTP reply (final response).

        ``Content-Type: text/plain`` is added automatically when *headers*
        is ``None`` or when the provided dict does not contain a
        ``Content-Type`` key (case-insensitive).

        Args:
            status_code: HTTP status code (e.g., 200, 404)
            body: Response body (str will be UTF-8 encoded)
            headers: Optional dict of headers
        """
        ...

    def reply_json(
        self,
        data: Any,
        status_code: int = 200,
        headers: Optional[Dict[str, str]] = None,
    ) -> None:
        """Send a JSON HTTP reply.

        Serialises *data* with ``json.dumps`` and sets
        ``Content-Type: application/json``.

        Args:
            data: JSON-serialisable object
            status_code: HTTP status code (default 200)
            headers: Optional extra headers (Content-Type is always set)
        """
        ...

    def serve_dir(
        self,
        message: HttpMessage,
        root_dir: str,
        extra_headers: str = "",
        mime_types: str = "",
        page404: str = "",
    ) -> None:
        """Serve files from a directory using Mongoose's built-in static handler.

        Args:
            message: HttpMessage from MG_EV_HTTP_MSG event
            root_dir: Root directory path
            extra_headers: Additional headers to include
            mime_types: Custom MIME type mappings
            page404: Custom 404 page path

        Raises:
            ValueError: If HttpMessage is not valid for this event
        """
        ...

    def serve_file(
        self, message: HttpMessage, path: str, extra_headers: str = "", mime_types: str = ""
    ) -> None:
        """Serve a single file using Mongoose's built-in static handler.

        Args:
            message: HttpMessage from MG_EV_HTTP_MSG event
            path: File path to serve
            extra_headers: Additional headers to include
            mime_types: Custom MIME type mappings

        Raises:
            ValueError: If HttpMessage is not valid for this event
        """
        ...

    def ws_upgrade(
        self, message: HttpMessage, extra_headers: Optional[Dict[str, str]] = None
    ) -> None:
        """Upgrade HTTP connection to WebSocket.

        Args:
            message: The HttpMessage from MG_EV_HTTP_MSG event
            extra_headers: Optional dict of extra headers to send in upgrade response

        Raises:
            ValueError: If HttpMessage is not valid for this event
        """
        ...

    def ws_send(self, data: Union[str, bytes], op: int = WEBSOCKET_OP_TEXT) -> None:
        """Send a WebSocket frame.

        Args:
            data: Frame data (str will be UTF-8 encoded)
            op: WebSocket opcode (default: WEBSOCKET_OP_TEXT)
        """
        ...

    def mqtt_pub(
        self, topic: str, message: Union[str, bytes], qos: int = 0, retain: bool = False
    ) -> int:
        """Publish an MQTT message.

        Args:
            topic: MQTT topic
            message: Message payload (str or bytes)
            qos: Quality of service (0, 1, or 2)
            retain: Retain flag

        Returns:
            Message ID
        """
        ...

    def mqtt_sub(self, topic: str, qos: int = 0) -> None:
        """Subscribe to an MQTT topic.

        Args:
            topic: MQTT topic (can include wildcards)
            qos: Quality of service (0, 1, or 2)
        """
        ...

    def mqtt_ping(self) -> None:
        """Send MQTT ping."""
        ...

    def mqtt_pong(self) -> None:
        """Send MQTT pong."""
        ...

    def mqtt_disconnect(self) -> None:
        """Send MQTT disconnect message.

        Gracefully disconnects from MQTT broker by sending a DISCONNECT packet.
        """
        ...

    def error(self, message: str) -> None:
        """Trigger an error event on this connection.

        Args:
            message: Error message
        """
        ...

    def recv_data(self, length: int = -1) -> bytes:
        """Read data from receive buffer without consuming it.

        Args:
            length: Number of bytes to read, or -1 for all

        Returns:
            Data from receive buffer
        """
        ...

    def send_data(self, length: int = -1) -> bytes:
        """Read data from send buffer without consuming it.

        Args:
            length: Number of bytes to read, or -1 for all

        Returns:
            Data from send buffer
        """
        ...

    def resolve(self, url: str) -> None:
        """Resolve a hostname asynchronously.

        Triggers MG_EV_RESOLVE event when DNS lookup completes.

        Args:
            url: URL to resolve (e.g., "google.com" or "tcp://example.com:80")
        """
        ...

    def resolve_cancel(self) -> None:
        """Cancel an ongoing DNS resolution."""
        ...

    def http_basic_auth(self, username: str, password: str) -> None:
        """Send HTTP Basic Authentication credentials.

        Typically used on client connections to authenticate with a server.

        Args:
            username: Username for basic auth
            password: Password for basic auth
        """
        ...

    def sntp_request(self) -> None:
        """Send an SNTP time request.

        Use on a connection created with Manager.sntp_connect().
        Triggers MG_EV_SNTP_TIME event when response is received.
        """
        ...

    def http_chunk(self, data: Union[str, bytes]) -> None:
        """Send an HTTP chunked transfer encoding chunk.

        Used for streaming HTTP responses. Must call with empty data to end.

        Args:
            data: Chunk data (str or bytes). Empty to signal end of chunks.

        Example:
            def handler(conn, ev, data):
                if ev == MG_EV_HTTP_MSG:
                    # Start chunked response
                    conn.reply(200, "", headers={"Transfer-Encoding": "chunked"})
                    conn.http_chunk("First chunk\\n")
                    conn.http_chunk("Second chunk\\n")
                    conn.http_chunk("")  # End chunks
        """
        ...

    def http_sse(self, event_type: str, data: str) -> None:
        """Send Server-Sent Events (SSE) formatted message.

        SSE is used for real-time server push over HTTP. Must start with appropriate headers.

        Args:
            event_type: Event type name (e.g., "message", "update")
            data: Event data payload

        Example:
            def handler(conn, ev, data):
                if ev == MG_EV_HTTP_MSG:
                    # Start SSE stream
                    conn.reply(200, "", headers={
                        "Content-Type": "text/event-stream",
                        "Cache-Control": "no-cache"
                    })
                    conn.http_sse("message", "Hello from server")
                    conn.http_sse("update", "Status: OK")
        """
        ...

    def close(self) -> None:
        """Immediately close the connection.

        For graceful shutdown, use drain() instead to let buffered data flush first.
        """
        ...

    def drain(self) -> None:
        """Mark connection for graceful closure.

        Sets is_draining=1, which tells Mongoose to:
        1. Stop reading from the socket
        2. Flush any buffered outgoing data
        3. Close the connection after send buffer is empty

        This is the recommended way to close connections from the server side,
        as it ensures response data is fully sent before closing.

        Example:
            def handler(conn, ev, data):
                if ev == MG_EV_HTTP_MSG:
                    conn.reply(200, b"Goodbye!")
                    conn.drain()  # Close after response is sent
        """
        ...

    def tls_init(self, opts: TlsOpts) -> None:
        """Initialize TLS/SSL on this connection.

        Args:
            opts: TlsOpts object with certificate, key, CA, etc.

        Example:
            # HTTPS server with custom certificate
            opts = TlsOpts(
                cert=open('server.crt', 'rb').read(),
                key=open('server.key', 'rb').read()
            )
            listener = manager.listen("https://0.0.0.0:443")
            listener.tls_init(opts)

            # HTTPS client with custom CA
            opts = TlsOpts(ca=open('custom-ca.crt', 'rb').read())
            conn = manager.connect("https://example.com")
            conn.tls_init(opts)
        """
        ...

    def tls_free(self) -> None:
        """Free TLS/SSL resources on this connection.

        Typically not needed as TLS is automatically freed when connection closes.
        """
        ...

    def __repr__(self) -> str: ...

handler property

Per-connection event handler.

userdata property writable

User-defined data attached to this connection.

id property

Return connection ID.

is_listening property

True if this is a listening connection.

is_closing property

True if connection is closing.

local_addr property

Return local address as (ip, port, is_ipv6) tuple.

remote_addr property

Return remote address as (ip, port, is_ipv6) tuple.

is_client property

Return True if this is a client connection.

is_tls property

Return True if this connection uses TLS.

is_udp property

Return True if this is a UDP connection.

is_websocket property

Return True if this is a WebSocket connection.

is_readable property

Return True if connection has data to read.

is_writable property

Return True if connection can be written to.

is_full property

Return True if receive buffer is full (backpressure - stop reads).

is_draining property

Return True if connection is draining (sending remaining data before close).

recv_len property

Return number of bytes in receive buffer.

send_len property

Return number of bytes in send buffer.

recv_size property

Return total allocated size of receive buffer.

send_size property

Return total allocated size of send buffer.

set_handler(handler)

Assign a per-connection event handler.

Parameters:

Name Type Description Default
handler Optional[EventHandler]

Callable that takes (Connection, event, data)

required
Source code in src/cymongoose/_mongoose.pyi
def set_handler(self, handler: Optional[EventHandler]) -> None:
    """Assign a per-connection event handler.

    Args:
        handler: Callable that takes (Connection, event, data)
    """
    ...

send(data)

Send raw bytes to the peer.

Parameters:

Name Type Description Default
data Union[str, bytes]

Data to send (str will be UTF-8 encoded)

required

Raises:

Type Description
RuntimeError

If send fails or connection is closed

Source code in src/cymongoose/_mongoose.pyi
def send(self, data: Union[str, bytes]) -> None:
    """Send raw bytes to the peer.

    Args:
        data: Data to send (str will be UTF-8 encoded)

    Raises:
        RuntimeError: If send fails or connection is closed
    """
    ...

reply(status_code, body=b'', headers=None)

Send a HTTP reply (final response).

Content-Type: text/plain is added automatically when headers is None or when the provided dict does not contain a Content-Type key (case-insensitive).

Parameters:

Name Type Description Default
status_code int

HTTP status code (e.g., 200, 404)

required
body Union[str, bytes]

Response body (str will be UTF-8 encoded)

b''
headers Optional[Dict[str, str]]

Optional dict of headers

None
Source code in src/cymongoose/_mongoose.pyi
def reply(
    self,
    status_code: int,
    body: Union[str, bytes] = b"",
    headers: Optional[Dict[str, str]] = None,
) -> None:
    """Send a HTTP reply (final response).

    ``Content-Type: text/plain`` is added automatically when *headers*
    is ``None`` or when the provided dict does not contain a
    ``Content-Type`` key (case-insensitive).

    Args:
        status_code: HTTP status code (e.g., 200, 404)
        body: Response body (str will be UTF-8 encoded)
        headers: Optional dict of headers
    """
    ...

reply_json(data, status_code=200, headers=None)

Send a JSON HTTP reply.

Serialises data with json.dumps and sets Content-Type: application/json.

Parameters:

Name Type Description Default
data Any

JSON-serialisable object

required
status_code int

HTTP status code (default 200)

200
headers Optional[Dict[str, str]]

Optional extra headers (Content-Type is always set)

None
Source code in src/cymongoose/_mongoose.pyi
def reply_json(
    self,
    data: Any,
    status_code: int = 200,
    headers: Optional[Dict[str, str]] = None,
) -> None:
    """Send a JSON HTTP reply.

    Serialises *data* with ``json.dumps`` and sets
    ``Content-Type: application/json``.

    Args:
        data: JSON-serialisable object
        status_code: HTTP status code (default 200)
        headers: Optional extra headers (Content-Type is always set)
    """
    ...

serve_dir(message, root_dir, extra_headers='', mime_types='', page404='')

Serve files from a directory using Mongoose's built-in static handler.

Parameters:

Name Type Description Default
message HttpMessage

HttpMessage from MG_EV_HTTP_MSG event

required
root_dir str

Root directory path

required
extra_headers str

Additional headers to include

''
mime_types str

Custom MIME type mappings

''
page404 str

Custom 404 page path

''

Raises:

Type Description
ValueError

If HttpMessage is not valid for this event

Source code in src/cymongoose/_mongoose.pyi
def serve_dir(
    self,
    message: HttpMessage,
    root_dir: str,
    extra_headers: str = "",
    mime_types: str = "",
    page404: str = "",
) -> None:
    """Serve files from a directory using Mongoose's built-in static handler.

    Args:
        message: HttpMessage from MG_EV_HTTP_MSG event
        root_dir: Root directory path
        extra_headers: Additional headers to include
        mime_types: Custom MIME type mappings
        page404: Custom 404 page path

    Raises:
        ValueError: If HttpMessage is not valid for this event
    """
    ...

serve_file(message, path, extra_headers='', mime_types='')

Serve a single file using Mongoose's built-in static handler.

Parameters:

Name Type Description Default
message HttpMessage

HttpMessage from MG_EV_HTTP_MSG event

required
path str

File path to serve

required
extra_headers str

Additional headers to include

''
mime_types str

Custom MIME type mappings

''

Raises:

Type Description
ValueError

If HttpMessage is not valid for this event

Source code in src/cymongoose/_mongoose.pyi
def serve_file(
    self, message: HttpMessage, path: str, extra_headers: str = "", mime_types: str = ""
) -> None:
    """Serve a single file using Mongoose's built-in static handler.

    Args:
        message: HttpMessage from MG_EV_HTTP_MSG event
        path: File path to serve
        extra_headers: Additional headers to include
        mime_types: Custom MIME type mappings

    Raises:
        ValueError: If HttpMessage is not valid for this event
    """
    ...

ws_upgrade(message, extra_headers=None)

Upgrade HTTP connection to WebSocket.

Parameters:

Name Type Description Default
message HttpMessage

The HttpMessage from MG_EV_HTTP_MSG event

required
extra_headers Optional[Dict[str, str]]

Optional dict of extra headers to send in upgrade response

None

Raises:

Type Description
ValueError

If HttpMessage is not valid for this event

Source code in src/cymongoose/_mongoose.pyi
def ws_upgrade(
    self, message: HttpMessage, extra_headers: Optional[Dict[str, str]] = None
) -> None:
    """Upgrade HTTP connection to WebSocket.

    Args:
        message: The HttpMessage from MG_EV_HTTP_MSG event
        extra_headers: Optional dict of extra headers to send in upgrade response

    Raises:
        ValueError: If HttpMessage is not valid for this event
    """
    ...

ws_send(data, op=WEBSOCKET_OP_TEXT)

Send a WebSocket frame.

Parameters:

Name Type Description Default
data Union[str, bytes]

Frame data (str will be UTF-8 encoded)

required
op int

WebSocket opcode (default: WEBSOCKET_OP_TEXT)

WEBSOCKET_OP_TEXT
Source code in src/cymongoose/_mongoose.pyi
def ws_send(self, data: Union[str, bytes], op: int = WEBSOCKET_OP_TEXT) -> None:
    """Send a WebSocket frame.

    Args:
        data: Frame data (str will be UTF-8 encoded)
        op: WebSocket opcode (default: WEBSOCKET_OP_TEXT)
    """
    ...

mqtt_pub(topic, message, qos=0, retain=False)

Publish an MQTT message.

Parameters:

Name Type Description Default
topic str

MQTT topic

required
message Union[str, bytes]

Message payload (str or bytes)

required
qos int

Quality of service (0, 1, or 2)

0
retain bool

Retain flag

False

Returns:

Type Description
int

Message ID

Source code in src/cymongoose/_mongoose.pyi
def mqtt_pub(
    self, topic: str, message: Union[str, bytes], qos: int = 0, retain: bool = False
) -> int:
    """Publish an MQTT message.

    Args:
        topic: MQTT topic
        message: Message payload (str or bytes)
        qos: Quality of service (0, 1, or 2)
        retain: Retain flag

    Returns:
        Message ID
    """
    ...

mqtt_sub(topic, qos=0)

Subscribe to an MQTT topic.

Parameters:

Name Type Description Default
topic str

MQTT topic (can include wildcards)

required
qos int

Quality of service (0, 1, or 2)

0
Source code in src/cymongoose/_mongoose.pyi
def mqtt_sub(self, topic: str, qos: int = 0) -> None:
    """Subscribe to an MQTT topic.

    Args:
        topic: MQTT topic (can include wildcards)
        qos: Quality of service (0, 1, or 2)
    """
    ...

mqtt_ping()

Send MQTT ping.

Source code in src/cymongoose/_mongoose.pyi
def mqtt_ping(self) -> None:
    """Send MQTT ping."""
    ...

mqtt_pong()

Send MQTT pong.

Source code in src/cymongoose/_mongoose.pyi
def mqtt_pong(self) -> None:
    """Send MQTT pong."""
    ...

mqtt_disconnect()

Send MQTT disconnect message.

Gracefully disconnects from MQTT broker by sending a DISCONNECT packet.

Source code in src/cymongoose/_mongoose.pyi
def mqtt_disconnect(self) -> None:
    """Send MQTT disconnect message.

    Gracefully disconnects from MQTT broker by sending a DISCONNECT packet.
    """
    ...

error(message)

Trigger an error event on this connection.

Parameters:

Name Type Description Default
message str

Error message

required
Source code in src/cymongoose/_mongoose.pyi
def error(self, message: str) -> None:
    """Trigger an error event on this connection.

    Args:
        message: Error message
    """
    ...

recv_data(length=-1)

Read data from receive buffer without consuming it.

Parameters:

Name Type Description Default
length int

Number of bytes to read, or -1 for all

-1

Returns:

Type Description
bytes

Data from receive buffer

Source code in src/cymongoose/_mongoose.pyi
def recv_data(self, length: int = -1) -> bytes:
    """Read data from receive buffer without consuming it.

    Args:
        length: Number of bytes to read, or -1 for all

    Returns:
        Data from receive buffer
    """
    ...

send_data(length=-1)

Read data from send buffer without consuming it.

Parameters:

Name Type Description Default
length int

Number of bytes to read, or -1 for all

-1

Returns:

Type Description
bytes

Data from send buffer

Source code in src/cymongoose/_mongoose.pyi
def send_data(self, length: int = -1) -> bytes:
    """Read data from send buffer without consuming it.

    Args:
        length: Number of bytes to read, or -1 for all

    Returns:
        Data from send buffer
    """
    ...

resolve(url)

Resolve a hostname asynchronously.

Triggers MG_EV_RESOLVE event when DNS lookup completes.

Parameters:

Name Type Description Default
url str

URL to resolve (e.g., "google.com" or "tcp://example.com:80")

required
Source code in src/cymongoose/_mongoose.pyi
def resolve(self, url: str) -> None:
    """Resolve a hostname asynchronously.

    Triggers MG_EV_RESOLVE event when DNS lookup completes.

    Args:
        url: URL to resolve (e.g., "google.com" or "tcp://example.com:80")
    """
    ...

resolve_cancel()

Cancel an ongoing DNS resolution.

Source code in src/cymongoose/_mongoose.pyi
def resolve_cancel(self) -> None:
    """Cancel an ongoing DNS resolution."""
    ...

http_basic_auth(username, password)

Send HTTP Basic Authentication credentials.

Typically used on client connections to authenticate with a server.

Parameters:

Name Type Description Default
username str

Username for basic auth

required
password str

Password for basic auth

required
Source code in src/cymongoose/_mongoose.pyi
def http_basic_auth(self, username: str, password: str) -> None:
    """Send HTTP Basic Authentication credentials.

    Typically used on client connections to authenticate with a server.

    Args:
        username: Username for basic auth
        password: Password for basic auth
    """
    ...

sntp_request()

Send an SNTP time request.

Use on a connection created with Manager.sntp_connect(). Triggers MG_EV_SNTP_TIME event when response is received.

Source code in src/cymongoose/_mongoose.pyi
def sntp_request(self) -> None:
    """Send an SNTP time request.

    Use on a connection created with Manager.sntp_connect().
    Triggers MG_EV_SNTP_TIME event when response is received.
    """
    ...

http_chunk(data)

Send an HTTP chunked transfer encoding chunk.

Used for streaming HTTP responses. Must call with empty data to end.

Parameters:

Name Type Description Default
data Union[str, bytes]

Chunk data (str or bytes). Empty to signal end of chunks.

required
Example

def handler(conn, ev, data): if ev == MG_EV_HTTP_MSG: # Start chunked response conn.reply(200, "", headers={"Transfer-Encoding": "chunked"}) conn.http_chunk("First chunk\n") conn.http_chunk("Second chunk\n") conn.http_chunk("") # End chunks

Source code in src/cymongoose/_mongoose.pyi
def http_chunk(self, data: Union[str, bytes]) -> None:
    """Send an HTTP chunked transfer encoding chunk.

    Used for streaming HTTP responses. Must call with empty data to end.

    Args:
        data: Chunk data (str or bytes). Empty to signal end of chunks.

    Example:
        def handler(conn, ev, data):
            if ev == MG_EV_HTTP_MSG:
                # Start chunked response
                conn.reply(200, "", headers={"Transfer-Encoding": "chunked"})
                conn.http_chunk("First chunk\\n")
                conn.http_chunk("Second chunk\\n")
                conn.http_chunk("")  # End chunks
    """
    ...

http_sse(event_type, data)

Send Server-Sent Events (SSE) formatted message.

SSE is used for real-time server push over HTTP. Must start with appropriate headers.

Parameters:

Name Type Description Default
event_type str

Event type name (e.g., "message", "update")

required
data str

Event data payload

required
Example

def handler(conn, ev, data): if ev == MG_EV_HTTP_MSG: # Start SSE stream conn.reply(200, "", headers={ "Content-Type": "text/event-stream", "Cache-Control": "no-cache" }) conn.http_sse("message", "Hello from server") conn.http_sse("update", "Status: OK")

Source code in src/cymongoose/_mongoose.pyi
def http_sse(self, event_type: str, data: str) -> None:
    """Send Server-Sent Events (SSE) formatted message.

    SSE is used for real-time server push over HTTP. Must start with appropriate headers.

    Args:
        event_type: Event type name (e.g., "message", "update")
        data: Event data payload

    Example:
        def handler(conn, ev, data):
            if ev == MG_EV_HTTP_MSG:
                # Start SSE stream
                conn.reply(200, "", headers={
                    "Content-Type": "text/event-stream",
                    "Cache-Control": "no-cache"
                })
                conn.http_sse("message", "Hello from server")
                conn.http_sse("update", "Status: OK")
    """
    ...

close()

Immediately close the connection.

For graceful shutdown, use drain() instead to let buffered data flush first.

Source code in src/cymongoose/_mongoose.pyi
def close(self) -> None:
    """Immediately close the connection.

    For graceful shutdown, use drain() instead to let buffered data flush first.
    """
    ...

drain()

Mark connection for graceful closure.

Sets is_draining=1, which tells Mongoose to: 1. Stop reading from the socket 2. Flush any buffered outgoing data 3. Close the connection after send buffer is empty

This is the recommended way to close connections from the server side, as it ensures response data is fully sent before closing.

Example

def handler(conn, ev, data): if ev == MG_EV_HTTP_MSG: conn.reply(200, b"Goodbye!") conn.drain() # Close after response is sent

Source code in src/cymongoose/_mongoose.pyi
def drain(self) -> None:
    """Mark connection for graceful closure.

    Sets is_draining=1, which tells Mongoose to:
    1. Stop reading from the socket
    2. Flush any buffered outgoing data
    3. Close the connection after send buffer is empty

    This is the recommended way to close connections from the server side,
    as it ensures response data is fully sent before closing.

    Example:
        def handler(conn, ev, data):
            if ev == MG_EV_HTTP_MSG:
                conn.reply(200, b"Goodbye!")
                conn.drain()  # Close after response is sent
    """
    ...

tls_init(opts)

Initialize TLS/SSL on this connection.

Parameters:

Name Type Description Default
opts TlsOpts

TlsOpts object with certificate, key, CA, etc.

required
Example

HTTPS server with custom certificate

opts = TlsOpts( cert=open('server.crt', 'rb').read(), key=open('server.key', 'rb').read() ) listener = manager.listen("https://0.0.0.0:443") listener.tls_init(opts)

HTTPS client with custom CA

opts = TlsOpts(ca=open('custom-ca.crt', 'rb').read()) conn = manager.connect("https://example.com") conn.tls_init(opts)

Source code in src/cymongoose/_mongoose.pyi
def tls_init(self, opts: TlsOpts) -> None:
    """Initialize TLS/SSL on this connection.

    Args:
        opts: TlsOpts object with certificate, key, CA, etc.

    Example:
        # HTTPS server with custom certificate
        opts = TlsOpts(
            cert=open('server.crt', 'rb').read(),
            key=open('server.key', 'rb').read()
        )
        listener = manager.listen("https://0.0.0.0:443")
        listener.tls_init(opts)

        # HTTPS client with custom CA
        opts = TlsOpts(ca=open('custom-ca.crt', 'rb').read())
        conn = manager.connect("https://example.com")
        conn.tls_init(opts)
    """
    ...

tls_free()

Free TLS/SSL resources on this connection.

Typically not needed as TLS is automatically freed when connection closes.

Source code in src/cymongoose/_mongoose.pyi
def tls_free(self) -> None:
    """Free TLS/SSL resources on this connection.

    Typically not needed as TLS is automatically freed when connection closes.
    """
    ...

__repr__()

Source code in src/cymongoose/_mongoose.pyi
def __repr__(self) -> str: ...

Overview

The Connection class represents a single network connection. Connections are created by Manager and passed to event handlers.

Important: Do not create Connection objects directly. They are created automatically by the Manager.

Connection Lifecycle

def handler(conn, ev, data):
    if ev == MG_EV_ACCEPT:
        # New inbound connection
        print(f"Client connected from {conn.remote_addr}")

    elif ev == MG_EV_CONNECT:
        # Outbound connection established
        print(f"Connected to {conn.remote_addr}")

    elif ev == MG_EV_CLOSE:
        # Connection closing
        print(f"Connection {conn.id} closed")
        # conn object becomes invalid after this event

Connection Properties

Basic Information

cymongoose.Connection.id property

Return connection ID.

cymongoose.Connection.handler property

Per-connection event handler.

cymongoose.Connection.userdata property writable

User-defined data attached to this connection.

State Flags

cymongoose.Connection.is_listening property

True if this is a listening connection.

cymongoose.Connection.is_client property

Return True if this is a client connection.

cymongoose.Connection.is_tls property

Return True if this connection uses TLS.

cymongoose.Connection.is_udp property

Return True if this is a UDP connection.

cymongoose.Connection.is_websocket property

Return True if this is a WebSocket connection.

cymongoose.Connection.is_readable property

Return True if connection has data to read.

cymongoose.Connection.is_writable property

Return True if connection can be written to.

cymongoose.Connection.is_closing property

True if connection is closing.

cymongoose.Connection.is_full property

Return True if receive buffer is full (backpressure - stop reads).

cymongoose.Connection.is_draining property

Return True if connection is draining (sending remaining data before close).

Addresses

cymongoose.Connection.local_addr property

Return local address as (ip, port, is_ipv6) tuple.

cymongoose.Connection.remote_addr property

Return remote address as (ip, port, is_ipv6) tuple.

Example:

local_ip, local_port, is_ipv6 = conn.local_addr
remote_ip, remote_port, is_ipv6 = conn.remote_addr

print(f"Local: {local_ip}:{local_port}")
print(f"Remote: {remote_ip}:{remote_port}")

Buffer Management

cymongoose.Connection.recv_len property

Return number of bytes in receive buffer.

cymongoose.Connection.send_len property

Return number of bytes in send buffer.

cymongoose.Connection.recv_size property

Return total allocated size of receive buffer.

cymongoose.Connection.send_size property

Return total allocated size of send buffer.

Flow control example:

def handler(conn, ev, data):
    if ev == MG_EV_READ:
        if conn.recv_len > 100000:  # 100KB
            print("Large amount of data buffered")

        if conn.is_full:
            print("Receive buffer full - backpressure active")

    if ev == MG_EV_WRITE:
        if conn.send_len > 0:
            print(f"Still sending {conn.send_len} bytes")

Sending Data

Raw Data

cymongoose.Connection.send(data)

Send raw bytes to the peer.

Parameters:

Name Type Description Default
data Union[str, bytes]

Data to send (str will be UTF-8 encoded)

required

Raises:

Type Description
RuntimeError

If send fails or connection is closed

Source code in src/cymongoose/_mongoose.pyi
def send(self, data: Union[str, bytes]) -> None:
    """Send raw bytes to the peer.

    Args:
        data: Data to send (str will be UTF-8 encoded)

    Raises:
        RuntimeError: If send fails or connection is closed
    """
    ...

Example:

# Send bytes
conn.send(b"Hello, World!")

# Send string (auto-encoded to UTF-8)
conn.send("Hello, World!")

HTTP Responses

cymongoose.Connection.reply(status_code, body=b'', headers=None)

Send a HTTP reply (final response).

Content-Type: text/plain is added automatically when headers is None or when the provided dict does not contain a Content-Type key (case-insensitive).

Parameters:

Name Type Description Default
status_code int

HTTP status code (e.g., 200, 404)

required
body Union[str, bytes]

Response body (str will be UTF-8 encoded)

b''
headers Optional[Dict[str, str]]

Optional dict of headers

None
Source code in src/cymongoose/_mongoose.pyi
def reply(
    self,
    status_code: int,
    body: Union[str, bytes] = b"",
    headers: Optional[Dict[str, str]] = None,
) -> None:
    """Send a HTTP reply (final response).

    ``Content-Type: text/plain`` is added automatically when *headers*
    is ``None`` or when the provided dict does not contain a
    ``Content-Type`` key (case-insensitive).

    Args:
        status_code: HTTP status code (e.g., 200, 404)
        body: Response body (str will be UTF-8 encoded)
        headers: Optional dict of headers
    """
    ...

Note: reply() adds Content-Type: text/plain by default when no Content-Type header is provided.

Example:

# Simple response (Content-Type: text/plain added automatically)
conn.reply(200, b"OK")

# HTML response
conn.reply(200, b"<html><body>Hello</body></html>",
          headers={"Content-Type": "text/html"})

JSON Responses

cymongoose.Connection.reply_json(data, status_code=200, headers=None)

Send a JSON HTTP reply.

Serialises data with json.dumps and sets Content-Type: application/json.

Parameters:

Name Type Description Default
data Any

JSON-serialisable object

required
status_code int

HTTP status code (default 200)

200
headers Optional[Dict[str, str]]

Optional extra headers (Content-Type is always set)

None
Source code in src/cymongoose/_mongoose.pyi
def reply_json(
    self,
    data: Any,
    status_code: int = 200,
    headers: Optional[Dict[str, str]] = None,
) -> None:
    """Send a JSON HTTP reply.

    Serialises *data* with ``json.dumps`` and sets
    ``Content-Type: application/json``.

    Args:
        data: JSON-serialisable object
        status_code: HTTP status code (default 200)
        headers: Optional extra headers (Content-Type is always set)
    """
    ...

Example:

# JSON response (Content-Type: application/json set automatically)
conn.reply_json({"status": "ok", "count": 42})

# With custom status code
conn.reply_json({"error": "not found"}, status_code=404)

Static Files

cymongoose.Connection.serve_dir(message, root_dir, extra_headers='', mime_types='', page404='')

Serve files from a directory using Mongoose's built-in static handler.

Parameters:

Name Type Description Default
message HttpMessage

HttpMessage from MG_EV_HTTP_MSG event

required
root_dir str

Root directory path

required
extra_headers str

Additional headers to include

''
mime_types str

Custom MIME type mappings

''
page404 str

Custom 404 page path

''

Raises:

Type Description
ValueError

If HttpMessage is not valid for this event

Source code in src/cymongoose/_mongoose.pyi
def serve_dir(
    self,
    message: HttpMessage,
    root_dir: str,
    extra_headers: str = "",
    mime_types: str = "",
    page404: str = "",
) -> None:
    """Serve files from a directory using Mongoose's built-in static handler.

    Args:
        message: HttpMessage from MG_EV_HTTP_MSG event
        root_dir: Root directory path
        extra_headers: Additional headers to include
        mime_types: Custom MIME type mappings
        page404: Custom 404 page path

    Raises:
        ValueError: If HttpMessage is not valid for this event
    """
    ...

cymongoose.Connection.serve_file(message, path, extra_headers='', mime_types='')

Serve a single file using Mongoose's built-in static handler.

Parameters:

Name Type Description Default
message HttpMessage

HttpMessage from MG_EV_HTTP_MSG event

required
path str

File path to serve

required
extra_headers str

Additional headers to include

''
mime_types str

Custom MIME type mappings

''

Raises:

Type Description
ValueError

If HttpMessage is not valid for this event

Source code in src/cymongoose/_mongoose.pyi
def serve_file(
    self, message: HttpMessage, path: str, extra_headers: str = "", mime_types: str = ""
) -> None:
    """Serve a single file using Mongoose's built-in static handler.

    Args:
        message: HttpMessage from MG_EV_HTTP_MSG event
        path: File path to serve
        extra_headers: Additional headers to include
        mime_types: Custom MIME type mappings

    Raises:
        ValueError: If HttpMessage is not valid for this event
    """
    ...

Example:

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        if data.uri.startswith("/static/"):
            # Serve static files
            conn.serve_dir(data, "./public")
        elif data.uri == "/favicon.ico":
            # Serve single file
            conn.serve_file(data, "./public/favicon.ico")
        else:
            # Dynamic response
            conn.reply(200, b"Hello!")
            conn.drain()

HTTP Streaming

cymongoose.Connection.http_chunk(data)

Send an HTTP chunked transfer encoding chunk.

Used for streaming HTTP responses. Must call with empty data to end.

Parameters:

Name Type Description Default
data Union[str, bytes]

Chunk data (str or bytes). Empty to signal end of chunks.

required
Example

def handler(conn, ev, data): if ev == MG_EV_HTTP_MSG: # Start chunked response conn.reply(200, "", headers={"Transfer-Encoding": "chunked"}) conn.http_chunk("First chunk\n") conn.http_chunk("Second chunk\n") conn.http_chunk("") # End chunks

Source code in src/cymongoose/_mongoose.pyi
def http_chunk(self, data: Union[str, bytes]) -> None:
    """Send an HTTP chunked transfer encoding chunk.

    Used for streaming HTTP responses. Must call with empty data to end.

    Args:
        data: Chunk data (str or bytes). Empty to signal end of chunks.

    Example:
        def handler(conn, ev, data):
            if ev == MG_EV_HTTP_MSG:
                # Start chunked response
                conn.reply(200, "", headers={"Transfer-Encoding": "chunked"})
                conn.http_chunk("First chunk\\n")
                conn.http_chunk("Second chunk\\n")
                conn.http_chunk("")  # End chunks
    """
    ...

Example:

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        # Start chunked response
        conn.reply(200, "",
                  headers={"Transfer-Encoding": "chunked"})

        # Send chunks
        conn.http_chunk("First chunk\n")
        conn.http_chunk("Second chunk\n")
        conn.http_chunk("Third chunk\n")

        # End chunked encoding
        conn.http_chunk("")  # Empty chunk signals end

Server-Sent Events

cymongoose.Connection.http_sse(event_type, data)

Send Server-Sent Events (SSE) formatted message.

SSE is used for real-time server push over HTTP. Must start with appropriate headers.

Parameters:

Name Type Description Default
event_type str

Event type name (e.g., "message", "update")

required
data str

Event data payload

required
Example

def handler(conn, ev, data): if ev == MG_EV_HTTP_MSG: # Start SSE stream conn.reply(200, "", headers={ "Content-Type": "text/event-stream", "Cache-Control": "no-cache" }) conn.http_sse("message", "Hello from server") conn.http_sse("update", "Status: OK")

Source code in src/cymongoose/_mongoose.pyi
def http_sse(self, event_type: str, data: str) -> None:
    """Send Server-Sent Events (SSE) formatted message.

    SSE is used for real-time server push over HTTP. Must start with appropriate headers.

    Args:
        event_type: Event type name (e.g., "message", "update")
        data: Event data payload

    Example:
        def handler(conn, ev, data):
            if ev == MG_EV_HTTP_MSG:
                # Start SSE stream
                conn.reply(200, "", headers={
                    "Content-Type": "text/event-stream",
                    "Cache-Control": "no-cache"
                })
                conn.http_sse("message", "Hello from server")
                conn.http_sse("update", "Status: OK")
    """
    ...

Example:

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG and data.uri == "/events":
        # Start SSE stream
        conn.reply(200, "",
                  headers={
                      "Content-Type": "text/event-stream",
                      "Cache-Control": "no-cache",
                  })

        # Send events
        conn.http_sse("message", "Hello from server")
        conn.http_sse("update", json.dumps({"value": 42}))

WebSocket

Upgrade

cymongoose.Connection.ws_upgrade(message, extra_headers=None)

Upgrade HTTP connection to WebSocket.

Parameters:

Name Type Description Default
message HttpMessage

The HttpMessage from MG_EV_HTTP_MSG event

required
extra_headers Optional[Dict[str, str]]

Optional dict of extra headers to send in upgrade response

None

Raises:

Type Description
ValueError

If HttpMessage is not valid for this event

Source code in src/cymongoose/_mongoose.pyi
def ws_upgrade(
    self, message: HttpMessage, extra_headers: Optional[Dict[str, str]] = None
) -> None:
    """Upgrade HTTP connection to WebSocket.

    Args:
        message: The HttpMessage from MG_EV_HTTP_MSG event
        extra_headers: Optional dict of extra headers to send in upgrade response

    Raises:
        ValueError: If HttpMessage is not valid for this event
    """
    ...

Send Messages

cymongoose.Connection.ws_send(data, op=WEBSOCKET_OP_TEXT)

Send a WebSocket frame.

Parameters:

Name Type Description Default
data Union[str, bytes]

Frame data (str will be UTF-8 encoded)

required
op int

WebSocket opcode (default: WEBSOCKET_OP_TEXT)

WEBSOCKET_OP_TEXT
Source code in src/cymongoose/_mongoose.pyi
def ws_send(self, data: Union[str, bytes], op: int = WEBSOCKET_OP_TEXT) -> None:
    """Send a WebSocket frame.

    Args:
        data: Frame data (str will be UTF-8 encoded)
        op: WebSocket opcode (default: WEBSOCKET_OP_TEXT)
    """
    ...

Example:

from cymongoose import (
    MG_EV_HTTP_MSG,
    MG_EV_WS_MSG,
    WEBSOCKET_OP_TEXT,
    WEBSOCKET_OP_BINARY,
)

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        if data.uri == "/ws":
            # Upgrade to WebSocket
            conn.ws_upgrade(data)

    elif ev == MG_EV_WS_MSG:
        # Echo message back
        if data.flags == WEBSOCKET_OP_TEXT:
            conn.ws_send(f"Echo: {data.text}", WEBSOCKET_OP_TEXT)
        else:
            conn.ws_send(data.data, WEBSOCKET_OP_BINARY)

MQTT

Publish

cymongoose.Connection.mqtt_pub(topic, message, qos=0, retain=False)

Publish an MQTT message.

Parameters:

Name Type Description Default
topic str

MQTT topic

required
message Union[str, bytes]

Message payload (str or bytes)

required
qos int

Quality of service (0, 1, or 2)

0
retain bool

Retain flag

False

Returns:

Type Description
int

Message ID

Source code in src/cymongoose/_mongoose.pyi
def mqtt_pub(
    self, topic: str, message: Union[str, bytes], qos: int = 0, retain: bool = False
) -> int:
    """Publish an MQTT message.

    Args:
        topic: MQTT topic
        message: Message payload (str or bytes)
        qos: Quality of service (0, 1, or 2)
        retain: Retain flag

    Returns:
        Message ID
    """
    ...

Subscribe

cymongoose.Connection.mqtt_sub(topic, qos=0)

Subscribe to an MQTT topic.

Parameters:

Name Type Description Default
topic str

MQTT topic (can include wildcards)

required
qos int

Quality of service (0, 1, or 2)

0
Source code in src/cymongoose/_mongoose.pyi
def mqtt_sub(self, topic: str, qos: int = 0) -> None:
    """Subscribe to an MQTT topic.

    Args:
        topic: MQTT topic (can include wildcards)
        qos: Quality of service (0, 1, or 2)
    """
    ...

Other Operations

cymongoose.Connection.mqtt_ping()

Send MQTT ping.

Source code in src/cymongoose/_mongoose.pyi
def mqtt_ping(self) -> None:
    """Send MQTT ping."""
    ...

cymongoose.Connection.mqtt_pong()

Send MQTT pong.

Source code in src/cymongoose/_mongoose.pyi
def mqtt_pong(self) -> None:
    """Send MQTT pong."""
    ...

cymongoose.Connection.mqtt_disconnect()

Send MQTT disconnect message.

Gracefully disconnects from MQTT broker by sending a DISCONNECT packet.

Source code in src/cymongoose/_mongoose.pyi
def mqtt_disconnect(self) -> None:
    """Send MQTT disconnect message.

    Gracefully disconnects from MQTT broker by sending a DISCONNECT packet.
    """
    ...

Example:

from cymongoose import MG_EV_MQTT_OPEN, MG_EV_MQTT_MSG

def handler(conn, ev, data):
    if ev == MG_EV_MQTT_OPEN:
        # Connected to broker
        print("MQTT connected")

        # Subscribe to topics
        conn.mqtt_sub("sensors/+/temperature", qos=1)
        conn.mqtt_sub("sensors/+/humidity", qos=1)

    elif ev == MG_EV_MQTT_MSG:
        # Message received
        print(f"Topic: {data.topic}")
        print(f"Message: {data.text}")

        # Publish response
        conn.mqtt_pub("ack/received", "OK", qos=1, retain=False)

TLS/SSL

Initialize TLS

cymongoose.Connection.tls_init(opts)

Initialize TLS/SSL on this connection.

Parameters:

Name Type Description Default
opts TlsOpts

TlsOpts object with certificate, key, CA, etc.

required
Example

HTTPS server with custom certificate

opts = TlsOpts( cert=open('server.crt', 'rb').read(), key=open('server.key', 'rb').read() ) listener = manager.listen("https://0.0.0.0:443") listener.tls_init(opts)

HTTPS client with custom CA

opts = TlsOpts(ca=open('custom-ca.crt', 'rb').read()) conn = manager.connect("https://example.com") conn.tls_init(opts)

Source code in src/cymongoose/_mongoose.pyi
def tls_init(self, opts: TlsOpts) -> None:
    """Initialize TLS/SSL on this connection.

    Args:
        opts: TlsOpts object with certificate, key, CA, etc.

    Example:
        # HTTPS server with custom certificate
        opts = TlsOpts(
            cert=open('server.crt', 'rb').read(),
            key=open('server.key', 'rb').read()
        )
        listener = manager.listen("https://0.0.0.0:443")
        listener.tls_init(opts)

        # HTTPS client with custom CA
        opts = TlsOpts(ca=open('custom-ca.crt', 'rb').read())
        conn = manager.connect("https://example.com")
        conn.tls_init(opts)
    """
    ...

Free TLS

cymongoose.Connection.tls_free()

Free TLS/SSL resources on this connection.

Typically not needed as TLS is automatically freed when connection closes.

Source code in src/cymongoose/_mongoose.pyi
def tls_free(self) -> None:
    """Free TLS/SSL resources on this connection.

    Typically not needed as TLS is automatically freed when connection closes.
    """
    ...

Example:

from cymongoose import MG_EV_ACCEPT, TlsOpts

# Load certificates
cert = open("server.crt", "rb").read()
key = open("server.key", "rb").read()
ca = open("ca.crt", "rb").read()

def handler(conn, ev, data):
    if ev == MG_EV_ACCEPT:
        # Initialize TLS on accepted connection
        opts = TlsOpts(cert=cert, key=key, ca=ca)
        conn.tls_init(opts)

    elif ev == MG_EV_TLS_HS:
        # TLS handshake complete
        print("TLS established")

DNS Resolution

cymongoose.Connection.resolve(url)

Resolve a hostname asynchronously.

Triggers MG_EV_RESOLVE event when DNS lookup completes.

Parameters:

Name Type Description Default
url str

URL to resolve (e.g., "google.com" or "tcp://example.com:80")

required
Source code in src/cymongoose/_mongoose.pyi
def resolve(self, url: str) -> None:
    """Resolve a hostname asynchronously.

    Triggers MG_EV_RESOLVE event when DNS lookup completes.

    Args:
        url: URL to resolve (e.g., "google.com" or "tcp://example.com:80")
    """
    ...

cymongoose.Connection.resolve_cancel()

Cancel an ongoing DNS resolution.

Source code in src/cymongoose/_mongoose.pyi
def resolve_cancel(self) -> None:
    """Cancel an ongoing DNS resolution."""
    ...

Example:

from cymongoose import MG_EV_RESOLVE

def handler(conn, ev, data):
    if ev == MG_EV_OPEN:
        # Start DNS resolution
        conn.resolve("example.com")

    elif ev == MG_EV_RESOLVE:
        # Resolution complete
        print(f"Resolved to: {conn.remote_addr}")

SNTP (Time Sync)

cymongoose.Connection.sntp_request()

Send an SNTP time request.

Use on a connection created with Manager.sntp_connect(). Triggers MG_EV_SNTP_TIME event when response is received.

Source code in src/cymongoose/_mongoose.pyi
def sntp_request(self) -> None:
    """Send an SNTP time request.

    Use on a connection created with Manager.sntp_connect().
    Triggers MG_EV_SNTP_TIME event when response is received.
    """
    ...

Example:

from cymongoose import MG_EV_SNTP_TIME

def handler(conn, ev, data):
    if ev == MG_EV_CONNECT:
        # Connected to SNTP server
        conn.sntp_request()

    elif ev == MG_EV_SNTP_TIME:
        # Time received (milliseconds since epoch)
        import datetime
        dt = datetime.datetime.fromtimestamp(data / 1000.0)
        print(f"Server time: {dt}")

Authentication

cymongoose.Connection.http_basic_auth(username, password)

Send HTTP Basic Authentication credentials.

Typically used on client connections to authenticate with a server.

Parameters:

Name Type Description Default
username str

Username for basic auth

required
password str

Password for basic auth

required
Source code in src/cymongoose/_mongoose.pyi
def http_basic_auth(self, username: str, password: str) -> None:
    """Send HTTP Basic Authentication credentials.

    Typically used on client connections to authenticate with a server.

    Args:
        username: Username for basic auth
        password: Password for basic auth
    """
    ...

Example:

from cymongoose import MG_EV_CONNECT

def handler(conn, ev, data):
    if ev == MG_EV_CONNECT:
        # Send basic auth credentials
        conn.http_basic_auth("username", "password")
        conn.send(b"GET /protected HTTP/1.1\r\n\r\n")

Buffer Access

cymongoose.Connection.recv_data(length=-1)

Read data from receive buffer without consuming it.

Parameters:

Name Type Description Default
length int

Number of bytes to read, or -1 for all

-1

Returns:

Type Description
bytes

Data from receive buffer

Source code in src/cymongoose/_mongoose.pyi
def recv_data(self, length: int = -1) -> bytes:
    """Read data from receive buffer without consuming it.

    Args:
        length: Number of bytes to read, or -1 for all

    Returns:
        Data from receive buffer
    """
    ...

cymongoose.Connection.send_data(length=-1)

Read data from send buffer without consuming it.

Parameters:

Name Type Description Default
length int

Number of bytes to read, or -1 for all

-1

Returns:

Type Description
bytes

Data from send buffer

Source code in src/cymongoose/_mongoose.pyi
def send_data(self, length: int = -1) -> bytes:
    """Read data from send buffer without consuming it.

    Args:
        length: Number of bytes to read, or -1 for all

    Returns:
        Data from send buffer
    """
    ...

Example:

from cymongoose import MG_EV_READ

def handler(conn, ev, data):
    if ev == MG_EV_READ:
        # Peek at receive buffer
        data = conn.recv_data(100)  # First 100 bytes
        print(f"Received: {data}")

        # Check send buffer
        pending = conn.send_data()
        if len(pending) > 10000:
            print("Large amount of data pending")

Connection Management

Per-Connection Handler

cymongoose.Connection.set_handler(handler)

Assign a per-connection event handler.

Parameters:

Name Type Description Default
handler Optional[EventHandler]

Callable that takes (Connection, event, data)

required
Source code in src/cymongoose/_mongoose.pyi
def set_handler(self, handler: Optional[EventHandler]) -> None:
    """Assign a per-connection event handler.

    Args:
        handler: Callable that takes (Connection, event, data)
    """
    ...

Example:

def main_handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        if data.uri == "/ws":
            # Switch to WebSocket handler
            conn.set_handler(websocket_handler)
            conn.ws_upgrade(data)

def websocket_handler(conn, ev, data):
    if ev == MG_EV_WS_MSG:
        conn.ws_send(f"Echo: {data.text}")

User Data

Store custom data on connections:

def handler(conn, ev, data):
    if ev == MG_EV_ACCEPT:
        # Initialize user data
        conn.userdata = {
            "authenticated": False,
            "username": None,
            "connected_at": time.time(),
        }

    elif ev == MG_EV_HTTP_MSG:
        # Access user data
        if conn.userdata["authenticated"]:
            conn.reply(200, b"Welcome!")
        else:
            conn.reply(401, b"Unauthorized")

Closing Connections

cymongoose.Connection.close()

Immediately close the connection.

For graceful shutdown, use drain() instead to let buffered data flush first.

Source code in src/cymongoose/_mongoose.pyi
def close(self) -> None:
    """Immediately close the connection.

    For graceful shutdown, use drain() instead to let buffered data flush first.
    """
    ...

cymongoose.Connection.drain()

Mark connection for graceful closure.

Sets is_draining=1, which tells Mongoose to: 1. Stop reading from the socket 2. Flush any buffered outgoing data 3. Close the connection after send buffer is empty

This is the recommended way to close connections from the server side, as it ensures response data is fully sent before closing.

Example

def handler(conn, ev, data): if ev == MG_EV_HTTP_MSG: conn.reply(200, b"Goodbye!") conn.drain() # Close after response is sent

Source code in src/cymongoose/_mongoose.pyi
def drain(self) -> None:
    """Mark connection for graceful closure.

    Sets is_draining=1, which tells Mongoose to:
    1. Stop reading from the socket
    2. Flush any buffered outgoing data
    3. Close the connection after send buffer is empty

    This is the recommended way to close connections from the server side,
    as it ensures response data is fully sent before closing.

    Example:
        def handler(conn, ev, data):
            if ev == MG_EV_HTTP_MSG:
                conn.reply(200, b"Goodbye!")
                conn.drain()  # Close after response is sent
    """
    ...

Important: Use drain() for graceful shutdown, not close().

Example:

def handler(conn, ev, data):
    if ev == MG_EV_HTTP_MSG:
        conn.reply(200, b"Goodbye!")

        # Graceful close - flushes send buffer first
        conn.drain()

        # Immediate close (may lose data)
        # conn.close()  # DON'T use this

Error Handling

cymongoose.Connection.error(message)

Trigger an error event on this connection.

Parameters:

Name Type Description Default
message str

Error message

required
Source code in src/cymongoose/_mongoose.pyi
def error(self, message: str) -> None:
    """Trigger an error event on this connection.

    Args:
        message: Error message
    """
    ...

Example:

from cymongoose import MG_EV_ERROR

def handler(conn, ev, data):
    if ev == MG_EV_ERROR:
        # data is error message string
        print(f"Error on connection {conn.id}: {data}")
        conn.close()

    # Trigger error manually
    if some_error_condition:
        conn.error("Custom error message")

See Also

  • Manager - Event loop management
  • HttpMessage, WsMessage, MqttMessage - Message views
  • Guide - Protocol guides