Streaming

Block-based processing, ring buffers, and processor chains for streaming audio.

Usage examples

Ring buffer

from nanodsp.stream import RingBuffer
from nanodsp.buffer import AudioBuffer
import numpy as np

# Create a stereo ring buffer with 8192 frames of capacity
rb = RingBuffer(channels=2, capacity=8192, sample_rate=48000)

# Write audio data
chunk = AudioBuffer.noise(channels=2, frames=512)
written = rb.write(chunk)

# Check available data
print(f"Available to read: {rb.available_read}")
print(f"Available to write: {rb.available_write}")

# Read audio data (consumes from buffer)
out = rb.read(256)

# Peek without consuming
peeked = rb.peek(256)

# Clear the buffer
rb.clear()

Block processor (subclass)

from nanodsp.stream import BlockProcessor
from nanodsp.buffer import AudioBuffer
from nanodsp.effects import filters

class LowpassProcessor(BlockProcessor):
    def __init__(self, cutoff_hz, sample_rate=48000):
        super().__init__(block_size=512, channels=1, sample_rate=sample_rate)
        self.cutoff_hz = cutoff_hz

    def process_block(self, block):
        return filters.lowpass(block, cutoff_hz=self.cutoff_hz)

proc = LowpassProcessor(cutoff_hz=1000.0)
buf = AudioBuffer.from_file("input.wav")
filtered = proc.process(buf)

Callback processor

from nanodsp.stream import CallbackProcessor

# Quick inline processor with a lambda
gain_proc = CallbackProcessor(
    callback=lambda block: block * 0.5,
    block_size=512,
)
quiet = gain_proc.process(buf)

Processor chain

from nanodsp.stream import ProcessorChain, CallbackProcessor
from nanodsp.effects import filters

# Chain multiple processors
chain = ProcessorChain(
    CallbackProcessor(lambda b: b * 2.0, block_size=512),       # boost
    CallbackProcessor(lambda b: b.pipe(filters.lowpass, cutoff_hz=2000), block_size=512),
    CallbackProcessor(lambda b: b * 0.5, block_size=512),       # attenuate
)
result = chain.process(buf)
chain.reset()

Overlap-add block processing

from nanodsp.stream import process_blocks

# Apply a function to overlapping blocks with automatic reconstruction
def spectral_fn(block):
    # Any per-block processing
    return block * 0.8

out = process_blocks(buf, fn=spectral_fn, block_size=2048, hop_size=512)

API reference

stream

Streaming / real-time audio processing infrastructure.

Provides ring buffers, block processors, and overlap-add utilities for processing audio in fixed-size chunks.

RingBuffer

RingBuffer(
    channels: int,
    capacity: int,
    sample_rate: float = 48000.0,
)

Ring buffer for streaming audio.

Stores planar float32 audio in a circular buffer with independent read and write positions.

Warning: This class is not thread-safe. Concurrent reads and writes from different threads can corrupt internal state. If you need to share a RingBuffer between a producer thread and a consumer thread, protect every read/write/peek/clear call with an external lock (e.g. threading.Lock).

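
A minimal sketch of the external-lock pattern the warning describes. LockedRingBuffer is a hypothetical helper, not part of nanodsp; it assumes only that the wrapped object exposes the write, read, peek, and clear methods and the available_read and available_write properties documented on this page.

```python
import threading


class LockedRingBuffer:
    """Serialize access to a ring-buffer-like object with a single lock.

    Hypothetical wrapper for illustration; not a nanodsp class.
    """

    def __init__(self, ring_buffer):
        self._rb = ring_buffer
        self._lock = threading.Lock()

    @property
    def available_read(self):
        with self._lock:
            return self._rb.available_read

    @property
    def available_write(self):
        with self._lock:
            return self._rb.available_write

    def write(self, data):
        with self._lock:
            return self._rb.write(data)

    def read(self, frames):
        with self._lock:
            return self._rb.read(frames)

    def peek(self, frames):
        with self._lock:
            return self._rb.peek(frames)

    def clear(self):
        with self._lock:
            self._rb.clear()
```

A producer thread and a consumer thread would then share one LockedRingBuffer instance instead of the raw buffer. A single lock keeps the sketch simple; it is not suitable for a hard real-time audio callback, where blocking on a lock can cause dropouts.
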
PARAMETER DESCRIPTION
channels

Number of audio channels.

TYPE: int

capacity

Maximum number of frames the buffer can hold.

TYPE: int

sample_rate

Sample rate metadata for AudioBuffer output.

TYPE: float DEFAULT: 48000.0

available_read property

available_read: int

Number of frames available to read.

available_write property

available_write: int

Number of frames that can be written before the buffer is full.

write

write(data: AudioBuffer | ndarray) -> int

Write frames into the buffer.

Returns the number of frames actually written (may be less than requested if the buffer is nearly full).

read

read(frames: int) -> AudioBuffer

Read and consume frames from the buffer.

Returns an AudioBuffer that may be shorter than frames if insufficient data is available.

peek

peek(frames: int) -> AudioBuffer

Read frames without consuming them.

clear

clear() -> None

Reset to empty without reallocating.
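
The semantics described above (partial writes when nearly full, short reads when data is insufficient, non-consuming peeks, and clearing without reallocation) can be illustrated with a small NumPy sketch. TinyRingBuffer is a hypothetical stand-in, not nanodsp's implementation; it assumes planar arrays of shape (channels, frames) and returns plain ndarrays rather than AudioBuffer objects.

```python
import numpy as np


class TinyRingBuffer:
    """Illustrative planar float32 ring buffer (not nanodsp's implementation)."""

    def __init__(self, channels, capacity):
        self._data = np.zeros((channels, capacity), dtype=np.float32)
        self._capacity = capacity
        self._read_pos = 0  # index of the oldest unread frame
        self._count = 0     # number of unread frames

    @property
    def available_read(self):
        return self._count

    @property
    def available_write(self):
        return self._capacity - self._count

    def write(self, data):
        data = np.asarray(data, dtype=np.float32)
        n = min(data.shape[1], self.available_write)  # partial write when nearly full
        write_pos = (self._read_pos + self._count) % self._capacity
        idx = (write_pos + np.arange(n)) % self._capacity  # wrap past the end
        self._data[:, idx] = data[:, :n]
        self._count += n
        return n

    def peek(self, frames):
        n = min(frames, self._count)  # short result when data is insufficient
        idx = (self._read_pos + np.arange(n)) % self._capacity
        return self._data[:, idx].copy()

    def read(self, frames):
        out = self.peek(frames)
        n = out.shape[1]
        self._read_pos = (self._read_pos + n) % self._capacity  # consume
        self._count -= n
        return out

    def clear(self):
        # Only the positions are reset; the storage array is kept.
        self._read_pos = 0
        self._count = 0
```
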

BlockProcessor

BlockProcessor(
    block_size: int,
    channels: int = 1,
    sample_rate: float = 48000.0,
)

Base class for block-based audio processors.

Subclass and override process_block() to implement custom processing. Call process() to run it on an arbitrary-length buffer.

Note: Each block is processed independently; the base class carries no state between successive blocks. Stateful DSP objects (IIR filters, reverbs, compressors) that are instantiated inside process_block are therefore re-created for every block and lose their internal memory. To preserve state across blocks, instantiate stateful objects in __init__ and reuse them in process_block.

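
The effect described in the note can be seen with a plain NumPy sketch, independent of nanodsp. OnePoleLowpass and run_in_blocks are hypothetical names used only for illustration: reusing one filter across blocks matches filtering the whole signal at once, while re-creating the filter for each block restarts it from zero at every block boundary.

```python
import numpy as np


class OnePoleLowpass:
    """y[n] = a * x[n] + (1 - a) * y[n-1], with y[n-1] kept between calls."""

    def __init__(self, a):
        self.a = a
        self.prev = 0.0  # filter memory that must survive across blocks

    def process(self, x):
        y = np.empty(len(x), dtype=np.float64)
        prev = self.prev
        for i, sample in enumerate(x):
            prev = self.a * sample + (1.0 - self.a) * prev
            y[i] = prev
        self.prev = prev  # remember the last output for the next block
        return y


def run_in_blocks(x, block_size, make_filter, reuse):
    """Filter x block by block, either reusing one filter or re-creating it."""
    shared = make_filter()
    out = []
    for start in range(0, len(x), block_size):
        filt = shared if reuse else make_filter()
        out.append(filt.process(x[start:start + block_size]))
    return np.concatenate(out)


x = np.ones(8)
reference = OnePoleLowpass(0.5).process(x)  # whole signal in one call
stateful = run_in_blocks(x, 4, lambda: OnePoleLowpass(0.5), reuse=True)
stateless = run_in_blocks(x, 4, lambda: OnePoleLowpass(0.5), reuse=False)

print(np.allclose(stateful, reference))   # reused filter matches the reference
print(np.allclose(stateless, reference))  # re-created filter does not
```

In a BlockProcessor subclass, the equivalent of the reused filter is an object created once in __init__ and called from process_block.
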
PARAMETER DESCRIPTION
block_size

Number of frames per processing block.

TYPE: int

channels

Expected channel count.

TYPE: int DEFAULT: 1

sample_rate

Sample rate metadata.

TYPE: float DEFAULT: 48000.0

process_block

process_block(block: AudioBuffer) -> AudioBuffer

Process exactly block_size frames. Must return a buffer of the same shape as the input block.

Override in subclasses.

process

process(buf: AudioBuffer) -> AudioBuffer

Process an entire buffer in block_size chunks.

The last block is zero-padded if needed; output is trimmed to the original length.
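
The pad-then-trim behavior can be sketched in NumPy as follows. This is an illustration under assumptions, not nanodsp's code: process_in_blocks is a hypothetical function, and the input is assumed to be a planar array of shape (channels, frames).

```python
import numpy as np


def process_in_blocks(x, block_size, fn):
    """Apply fn to fixed-size blocks of a (channels, frames) array."""
    channels, frames = x.shape
    n_blocks = -(-frames // block_size)  # ceiling division
    padded = np.zeros((channels, n_blocks * block_size), dtype=x.dtype)
    padded[:, :frames] = x  # the tail of the last block stays zero

    out = np.empty_like(padded)
    for b in range(n_blocks):
        sl = slice(b * block_size, (b + 1) * block_size)
        out[:, sl] = fn(padded[:, sl])  # fn always sees exactly block_size frames
    return out[:, :frames]  # trim the padding back off
```

Because the padding is trimmed only after processing, a function with a tail (such as a delay or reverb) would have that tail cut off at the original length in this sketch.
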

reset

reset() -> None

Reset internal state. Override in subclasses if needed.

CallbackProcessor

CallbackProcessor(
    callback,
    block_size: int,
    channels: int = 1,
    sample_rate: float = 48000.0,
)

Bases: BlockProcessor

Wraps a callable as a BlockProcessor.

PARAMETER DESCRIPTION
callback

Function (AudioBuffer) -> AudioBuffer that processes one block.

TYPE: callable

block_size

Block size in frames.

TYPE: int

channels

Channel count.

TYPE: int DEFAULT: 1

sample_rate

Sample rate metadata.

TYPE: float DEFAULT: 48000.0

ProcessorChain

ProcessorChain(*processors: BlockProcessor)

Chain multiple BlockProcessors in series.

PARAMETER DESCRIPTION
*processors

Processors to chain. All must share the same block_size.

TYPE: BlockProcessor DEFAULT: ()

process

process(buf: AudioBuffer) -> AudioBuffer

Process through all processors in order.

reset

reset() -> None

Reset all processors.

process_blocks

process_blocks(
    buf: AudioBuffer,
    fn,
    block_size: int,
    hop_size: int | None = None,
) -> AudioBuffer

Process a buffer through fn in blocks, optionally with overlap-add.

PARAMETER DESCRIPTION
buf

Input audio.

TYPE: AudioBuffer

fn

(AudioBuffer) -> AudioBuffer block processing function.

TYPE: callable

block_size

Size of each processing block in frames.

TYPE: int

hop_size

Hop between successive blocks, in frames. None or a value equal to block_size means non-overlapping blocks. Values smaller than block_size trigger overlap-add with Hann windowing and constant-overlap-add (COLA) normalization.

TYPE: int or None DEFAULT: None

RETURNS DESCRIPTION
AudioBuffer

Processed audio, same length as input.
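
To make the overlap-add path concrete, here is a NumPy sketch of one common way to combine Hann windowing with normalization by the summed window. It is an assumption-laden illustration, not nanodsp's implementation: overlap_add is a hypothetical function, it handles a single-channel 1-D array, it requires hop_size < block_size, and it applies the window once before fn and divides by the accumulated window afterwards.

```python
import numpy as np


def overlap_add(x, fn, block_size, hop_size):
    """Overlap-add a 1-D signal through fn using a periodic Hann window."""
    x = np.asarray(x, dtype=np.float64)
    n = len(x)
    # Periodic Hann window (the first sample is zero, the period is block_size).
    window = 0.5 - 0.5 * np.cos(2.0 * np.pi * np.arange(block_size) / block_size)

    pad = block_size - hop_size           # leading zeros so early samples overlap too
    n_blocks = -(-(pad + n) // hop_size)  # ceiling division
    length = (n_blocks - 1) * hop_size + block_size
    padded = np.zeros(length)
    padded[pad:pad + n] = x

    out = np.zeros(length)
    window_sum = np.zeros(length)
    for k in range(n_blocks):
        start = k * hop_size
        block = padded[start:start + block_size] * window
        out[start:start + block_size] += fn(block)
        window_sum[start:start + block_size] += window

    # Divide out the summed window so a pass-through fn returns the input.
    safe = np.where(window_sum > 1e-12, window_sum, 1.0)
    return (out / safe)[pad:pad + n]  # same length as the input
```

With block_size=2048 and hop_size=512, the overlapped periodic Hann windows sum to a constant 2.0 away from the edges, which is the COLA property; dividing by the accumulated window also corrects the edges, where fewer blocks overlap.
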