Skip to content

API Reference

Complete API reference for coremusic. The package provides both functional and object-oriented APIs.

Note

The object-oriented API is recommended for new applications due to automatic resource management and Pythonic interfaces.

Object-Oriented API

High-level Pythonic wrappers with automatic resource management.

AudioFile Class

coremusic.audio.AudioFile

Bases: CoreAudioObject

High-level audio file operations with automatic resource management

Source code in src/coremusic/audio/core.py
class AudioFile(capi.CoreAudioObject):
    """High-level audio file operations with automatic resource management.

    The underlying file handle is opened lazily by the first method or
    property that needs it, or explicitly through ``open()`` / a ``with``
    block. ``close()`` releases the handle and also calls ``dispose()``.
    """

    def __init__(self, path: str | Path, *, writable: bool = False):
        """Remember the target path; the file itself is not opened here.

        Args:
            path: Location of the audio file (converted with ``str()``).
            writable: Request read-write access when the file is opened.
        """
        super().__init__()
        self._path = str(path)
        self._format: AudioFormat | None = None  # cached by the ``format`` property
        self._is_open = False
        self._writable = writable

    def open(self) -> "AudioFile":
        """Open the audio file if it is not already open.

        Returns:
            This instance, so the call can be chained.

        Raises:
            AudioFileError: If the underlying open call fails.
        """
        self._ensure_not_disposed()
        if self._is_open:
            return self

        permissions = 3 if self._writable else 1  # READ_WRITE or READ
        try:
            file_id = capi.audio_file_open_url(self._path, permissions)
            self._set_object_id(file_id)
        except Exception as e:
            raise AudioFileError(f"Failed to open file {self._path}: {e}") from e
        self._is_open = True
        return self

    def close(self) -> None:
        """Close the audio file and dispose this object.

        Does nothing when the file is not open.

        Raises:
            AudioFileError: If the underlying close call fails. The object is
                still marked closed and disposed in that case.
        """
        if not self._is_open:
            return
        try:
            capi.audio_file_close(self.object_id)
        except Exception as e:
            raise AudioFileError(f"Failed to close file: {e}") from e
        finally:
            # Clear the flag first so dispose() does not close a second time.
            self._is_open = False
            self.dispose()

    def __enter__(self) -> "AudioFile":
        """Open the file on entering a ``with`` block."""
        self.open()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Close the file on leaving a ``with`` block."""
        self.close()

    @property
    def format(self) -> AudioFormat:
        """Get the audio format of the file.

        The format is read once from the file's data-format property and
        cached on the instance.

        Raises:
            AudioFileError: If the property cannot be read or parsed.
        """
        self._ensure_not_disposed()
        if not self._is_open:
            self.open()

        if self._format is None:
            try:
                format_data = capi.audio_file_get_property(
                    self.object_id, capi.get_audio_file_property_data_format()
                )
                self._format = AudioFormat.from_asbd_bytes(format_data)
            except Exception as e:
                raise AudioFileError(f"Failed to get format: {e}") from e

        return self._format

    def read_packets(self, start_packet: int, packet_count: int) -> tuple[bytes, int]:
        """Read audio packets from the file.

        Args:
            start_packet: Starting packet index (must be non-negative)
            packet_count: Number of packets to read (must be positive)

        Returns:
            Tuple of (audio_data_bytes, packets_read)

        Raises:
            ValueError: If start_packet < 0 or packet_count <= 0
            AudioFileError: If reading fails

        Example::

            from coremusic.audio import AudioFile

            # Read audio data in chunks
            with AudioFile("audio.wav") as audio:
                chunk_size = 4096
                offset = 0

                while True:
                    data, packets_read = audio.read_packets(offset, chunk_size)
                    if packets_read == 0:
                        break
                    # Process data...
                    offset += packets_read
        """
        if start_packet < 0:
            raise ValueError(f"start_packet must be non-negative, got {start_packet}")
        if packet_count <= 0:
            raise ValueError(f"packet_count must be positive, got {packet_count}")

        self._ensure_not_disposed()
        if not self._is_open:
            self.open()

        try:
            return capi.audio_file_read_packets(
                self.object_id, start_packet, packet_count
            )
        except Exception as e:
            raise AudioFileError(f"Failed to read packets: {e}") from e

    def read_as_numpy(
        self, start_packet: int = 0, packet_count: int | None = None
    ) -> "NDArray[Any]":
        """
        Read audio data from the file as a NumPy array.

        Args:
            start_packet: Starting packet index (default: 0)
            packet_count: Number of packets to read (default: all remaining packets)

        Returns:
            NumPy array with shape (frames, channels) for multi-channel audio,
            or (frames,) for mono audio. The dtype is determined by the audio format.
            When ``packet_count`` is omitted and ``start_packet`` is at or past
            the end of the data, an empty array is returned.

        Raises:
            ImportError: If NumPy is not available
            ValueError: If start_packet < 0 or packet_count <= 0
            AudioFileError: If reading fails

        Example:

            with AudioFile("audio.wav") as audio:
                data = audio.read_as_numpy()
                print(f"Shape: {data.shape}, dtype: {data.dtype}")

            # output: Shape: (44100, 2), dtype: int16
        """
        if not NUMPY_AVAILABLE:
            raise ImportError(
                "NumPy is not available. Install numpy to use this feature."
            )

        if start_packet < 0:
            raise ValueError(f"start_packet must be non-negative, got {start_packet}")
        if packet_count is not None and packet_count <= 0:
            raise ValueError(f"packet_count must be positive, got {packet_count}")

        self._ensure_not_disposed()
        if not self._is_open:
            self.open()

        try:
            fmt = self.format

            if packet_count is None:
                # Derive "all remaining packets" from the file's total count.
                packet_count_data = capi.audio_file_get_property(
                    self.object_id,
                    capi.get_audio_file_property_audio_data_packet_count(),
                )
                if len(packet_count_data) < 8:
                    raise AudioFileError("Cannot determine packet count")
                total_packets = struct.unpack("<Q", packet_count_data[:8])[0]
                # Clamp: a start offset at/after the end must not turn into a
                # zero or negative count handed to the C API.
                packet_count = max(total_packets - start_packet, 0)

            if packet_count > 0:
                data_bytes, _ = capi.audio_file_read_packets(
                    self.object_id, start_packet, packet_count
                )
            else:
                data_bytes = b""

            audio_data = np.frombuffer(data_bytes, dtype=fmt.to_numpy_dtype())

            # Audio data is treated as interleaved: L R L R L R ...
            channels = fmt.channels_per_frame
            if channels > 1:
                num_frames = len(audio_data) // channels
                # Drop any trailing partial frame before reshaping.
                audio_data = audio_data[: num_frames * channels].reshape(
                    num_frames, channels
                )

            return audio_data

        except (ImportError, AudioFileError):
            raise
        except Exception as e:
            raise AudioFileError(f"Failed to read as NumPy array: {e}") from e

    def get_property(self, property_id: int) -> bytes:
        """Get a property from the audio file.

        Args:
            property_id: Identifier of the property to read.

        Returns:
            The raw property bytes returned by the C API.

        Raises:
            AudioFileError: If the property cannot be read.
        """
        self._ensure_not_disposed()
        if not self._is_open:
            self.open()

        try:
            return capi.audio_file_get_property(self.object_id, property_id)
        except Exception as e:
            raise AudioFileError(f"Failed to get property: {e}") from e

    def set_property(self, property_id: int, data: bytes) -> None:
        """Set a property on the audio file.

        The file must have been opened with writable=True.

        Args:
            property_id: Identifier of the property to write.
            data: Raw property bytes.

        Raises:
            AudioFileError: If the instance is not writable or the write fails.
        """
        self._ensure_not_disposed()
        # Reject read-only instances up front, mirroring set_metadata().
        if not self._writable:
            raise AudioFileError(
                "File not opened for writing. Use AudioFile(path, writable=True)."
            )
        if not self._is_open:
            self.open()

        try:
            capi.audio_file_set_property(self.object_id, property_id, data)
        except Exception as e:
            raise AudioFileError(f"Failed to set property: {e}") from e

    @property
    def metadata(self) -> dict[str, Any] | None:
        """Read the info dictionary metadata from the audio file.

        Returns a dict with string keys and string/number/bytes values,
        or None if the file format does not support metadata.

        Any error raised while reading the info dictionary is deliberately
        swallowed and reported as ``None``.
        """
        self._ensure_not_disposed()
        if not self._is_open:
            self.open()

        try:
            return capi.audio_file_read_info_dictionary(self.object_id)
        except Exception:
            return None

    def set_metadata(self, tags: dict[str, Any]) -> None:
        """Write metadata tags to the audio file.

        The file must have been opened with writable=True.
        Keys should be strings. Values can be str, int, or float.

        Common keys: 'title', 'artist', 'album', 'genre', 'year',
        'track number', 'comments', 'approximate duration in seconds'.

        Raises:
            AudioFileError: If the instance is not writable or the write fails.
        """
        self._ensure_not_disposed()
        # Check writability before opening so a read-only instance is not
        # opened just to be rejected.
        if not self._writable:
            raise AudioFileError(
                "File not opened for writing. Use AudioFile(path, writable=True)."
            )
        if not self._is_open:
            self.open()

        try:
            capi.audio_file_write_info_dictionary(self.object_id, tags)
        except Exception as e:
            raise AudioFileError(f"Failed to write metadata: {e}") from e

    @property
    def duration(self) -> float:
        """Duration in seconds.

        Uses the estimated-duration property when it yields at least 8 bytes;
        otherwise falls back to packet count * frames per packet / sample rate.
        Returns 0.0 when neither source is usable or a property read fails.
        """
        self._ensure_not_disposed()
        if not self._is_open:
            self.open()

        try:
            duration_data = capi.audio_file_get_property(
                self.object_id, capi.get_audio_file_property_estimated_duration()
            )
            if len(duration_data) >= 8:
                # Duration is a Float64 (double)
                estimated: float = struct.unpack("<d", duration_data[:8])[0]
                return estimated

            # Fallback: calculate from packet count and sample rate
            packet_count_data = capi.audio_file_get_property(
                self.object_id,
                capi.get_audio_file_property_audio_data_packet_count(),
            )
            if len(packet_count_data) >= 8:
                packet_count = struct.unpack("<Q", packet_count_data[:8])[0]
                fmt = self.format
                if fmt.sample_rate > 0:
                    calculated: float = (
                        packet_count * fmt.frames_per_packet / fmt.sample_rate
                    )
                    return calculated
            return 0.0
        except Exception:
            # Deliberate best effort: if all methods fail, return 0.0
            return 0.0

    def __repr__(self) -> str:
        """Return ``AudioFile(<path>, open|closed)``."""
        status = "open" if self._is_open else "closed"
        return f"AudioFile({self._path}, {status})"

    def dispose(self) -> None:
        """Dispose of the audio file.

        Closes the handle if it is still open (errors from the close call are
        ignored) and then delegates to the base-class ``dispose()``. Does
        nothing if the object is already disposed.
        """
        if self.is_disposed:
            return
        if self._is_open:
            try:
                capi.audio_file_close(self.object_id)
            except Exception:
                pass  # Best effort cleanup
            finally:
                self._is_open = False
        super().dispose()

_path = str(path) instance-attribute

_format = None instance-attribute

_is_open = False instance-attribute

_writable = writable instance-attribute

format property

Get the audio format of the file

metadata property

Read the info dictionary metadata from the audio file.

Returns a dict with string keys and string/number/bytes values, or None if the file format does not support metadata.

duration property

Duration in seconds

__init__(path, *, writable=False)

Source code in src/coremusic/audio/core.py
def __init__(self, path: str | Path, *, writable: bool = False):
    """Record the file path and access mode; the file is not opened here.

    Args:
        path: Location of the audio file (stored as ``str(path)``).
        writable: Whether the file should later be opened read-write.
    """
    super().__init__()
    # Nothing is open yet and no format has been read.
    self._is_open = False
    self._format: AudioFormat | None = None
    self._writable = writable
    self._path = str(path)

open()

Open the audio file

Source code in src/coremusic/audio/core.py
def open(self) -> "AudioFile":
    """Open the audio file if it is not already open.

    Returns:
        This instance, so the call can be chained.

    Raises:
        AudioFileError: If the underlying open call fails; the original
            exception is attached as the cause.
    """
    self._ensure_not_disposed()
    if self._is_open:
        return self

    permissions = 3 if self._writable else 1  # READ_WRITE or READ
    try:
        file_id = capi.audio_file_open_url(self._path, permissions)
        self._set_object_id(file_id)
    except Exception as e:
        raise AudioFileError(f"Failed to open file {self._path}: {e}") from e
    self._is_open = True
    return self

close()

Close the audio file

Source code in src/coremusic/audio/core.py
def close(self) -> None:
    """Close the audio file and dispose this object.

    Does nothing when the file is not open.

    Raises:
        AudioFileError: If the underlying close call fails; the original
            exception is attached as the cause. The object is still marked
            closed and disposed in that case.
    """
    if not self._is_open:
        return
    try:
        capi.audio_file_close(self.object_id)
    except Exception as e:
        raise AudioFileError(f"Failed to close file: {e}") from e
    finally:
        # Clear the flag before dispose() so it does not close a second time.
        self._is_open = False
        self.dispose()

__enter__()

Source code in src/coremusic/audio/core.py
def __enter__(self) -> "AudioFile":
    """Open the file on entering a ``with`` block and return this instance."""
    self.open()
    return self

__exit__(exc_type, exc_val, exc_tb)

Source code in src/coremusic/audio/core.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Close the file on leaving a ``with`` block.

    The exception arguments are ignored; returning ``None`` means an
    exception raised inside the block is not suppressed.
    """
    self.close()

read_packets(start_packet, packet_count)

Read audio packets from the file.

Parameters:

Name Type Description Default
start_packet int

Starting packet index (must be non-negative)

required
packet_count int

Number of packets to read (must be positive)

required

Returns:

Type Description
tuple[bytes, int]

Tuple of (audio_data_bytes, packets_read)

Raises:

Type Description
ValueError

If start_packet < 0 or packet_count <= 0

AudioFileError

If reading fails

Example:

from coremusic.audio import AudioFile

# Read audio data in chunks
with AudioFile("audio.wav") as audio:
    chunk_size = 4096
    offset = 0

    while True:
        data, packets_read = audio.read_packets(offset, chunk_size)
        if packets_read == 0:
            break
        # Process data...
        offset += packets_read
Source code in src/coremusic/audio/core.py
def read_packets(self, start_packet: int, packet_count: int) -> tuple[bytes, int]:
    """Read audio packets from the file.

    Arguments are validated before the file is touched, so invalid values
    never trigger an implicit open.

    Args:
        start_packet: Starting packet index (must be non-negative)
        packet_count: Number of packets to read (must be positive)

    Returns:
        Tuple of (audio_data_bytes, packets_read)

    Raises:
        ValueError: If start_packet < 0 or packet_count <= 0
        AudioFileError: If reading fails; the original exception is attached
            as the cause.

    Example::

        from coremusic.audio import AudioFile

        # Read audio data in chunks
        with AudioFile("audio.wav") as audio:
            chunk_size = 4096
            offset = 0

            while True:
                data, packets_read = audio.read_packets(offset, chunk_size)
                if packets_read == 0:
                    break
                # Process data...
                offset += packets_read
    """
    if start_packet < 0:
        raise ValueError(f"start_packet must be non-negative, got {start_packet}")
    if packet_count <= 0:
        raise ValueError(f"packet_count must be positive, got {packet_count}")

    self._ensure_not_disposed()
    if not self._is_open:
        self.open()

    try:
        return capi.audio_file_read_packets(
            self.object_id, start_packet, packet_count
        )
    except Exception as e:
        raise AudioFileError(f"Failed to read packets: {e}") from e

read_as_numpy(start_packet=0, packet_count=None)

Read audio data from the file as a NumPy array.

Parameters:

Name Type Description Default
start_packet int

Starting packet index (default: 0)

0
packet_count int | None

Number of packets to read (default: all remaining packets)

None

Returns:

Type Description
'NDArray[Any]'

NumPy array with shape (frames, channels) for multi-channel audio, or (frames,) for mono audio. The dtype is determined by the audio format.

Raises:

Type Description
ImportError

If NumPy is not available

ValueError

If start_packet < 0 or packet_count <= 0

AudioFileError

If reading fails

Example:

with AudioFile("audio.wav") as audio:
    data = audio.read_as_numpy()
    print(f"Shape: {data.shape}, dtype: {data.dtype}")

# output: Shape: (44100, 2), dtype: int16
Source code in src/coremusic/audio/core.py
def read_as_numpy(
    self, start_packet: int = 0, packet_count: int | None = None
) -> "NDArray[Any]":
    """
    Read audio data from the file as a NumPy array.

    Args:
        start_packet: Starting packet index (default: 0)
        packet_count: Number of packets to read (default: all remaining packets)

    Returns:
        NumPy array with shape (frames, channels) for multi-channel audio,
        or (frames,) for mono audio. The dtype is determined by the audio format.

    Raises:
        ImportError: If NumPy is not available
        AudioFileError: If reading fails

    Example:

        with AudioFile("audio.wav") as audio:
            data = audio.read_as_numpy()
            print(f"Shape: {data.shape}, dtype: {data.dtype}")

        # output: Shape: (44100, 2), dtype: int16
    """
    if not NUMPY_AVAILABLE:
        raise ImportError(
            "NumPy is not available. Install numpy to use this feature."
        )

    if start_packet < 0:
        raise ValueError(f"start_packet must be non-negative, got {start_packet}")
    if packet_count is not None and packet_count <= 0:
        raise ValueError(f"packet_count must be positive, got {packet_count}")

    self._ensure_not_disposed()
    if not self._is_open:
        self.open()

    try:
        # Get format information
        format = self.format

        # If packet_count not specified, read all remaining packets
        if packet_count is None:
            # Get total packet count from file
            packet_count_data = capi.audio_file_get_property(
                self.object_id,
                capi.get_audio_file_property_audio_data_packet_count(),
            )
            if len(packet_count_data) >= 8:
                total_packets = struct.unpack("<Q", packet_count_data[:8])[0]
                packet_count = total_packets - start_packet
            else:
                raise AudioFileError("Cannot determine packet count")

        # Read the raw audio data
        data_bytes, actual_count = capi.audio_file_read_packets(
            self.object_id, start_packet, packet_count
        )

        # Get NumPy dtype from format
        dtype = format.to_numpy_dtype()

        # Convert bytes to NumPy array
        audio_data = np.frombuffer(data_bytes, dtype=dtype)

        # Reshape for multi-channel audio
        # Audio data is typically interleaved: L R L R L R ...
        if format.channels_per_frame > 1:
            # Calculate number of frames
            samples_per_frame = format.channels_per_frame
            num_frames = len(audio_data) // samples_per_frame

            # Reshape to (frames, channels)
            audio_data = audio_data[: num_frames * samples_per_frame].reshape(
                num_frames, samples_per_frame
            )

        return audio_data

    except Exception as e:
        if isinstance(e, (ImportError, AudioFileError)):
            raise
        raise AudioFileError(f"Failed to read as NumPy array: {e}")

get_property(property_id)

Get a property from the audio file

Source code in src/coremusic/audio/core.py
def get_property(self, property_id: int) -> bytes:
    """Get a property from the audio file.

    The file is opened first if it is not open yet.

    Args:
        property_id: Identifier of the property to read.

    Returns:
        The raw property bytes returned by the C API.

    Raises:
        AudioFileError: If the property cannot be read; the original
            exception is attached as the cause.
    """
    self._ensure_not_disposed()
    if not self._is_open:
        self.open()

    try:
        return capi.audio_file_get_property(self.object_id, property_id)
    except Exception as e:
        raise AudioFileError(f"Failed to get property: {e}") from e

set_property(property_id, data)

Set a property on the audio file.

The file must have been opened with writable=True.

Source code in src/coremusic/audio/core.py
def set_property(self, property_id: int, data: bytes) -> None:
    """Set a property on the audio file.

    The file must have been opened with writable=True.

    Args:
        property_id: Identifier of the property to write.
        data: Raw property bytes.

    Raises:
        AudioFileError: If the instance is not writable or the write fails.
    """
    self._ensure_not_disposed()
    # Enforce the documented writable requirement before opening or writing,
    # the same way set_metadata() does.
    if not self._writable:
        raise AudioFileError(
            "File not opened for writing. Use AudioFile(path, writable=True)."
        )
    if not self._is_open:
        self.open()

    try:
        capi.audio_file_set_property(self.object_id, property_id, data)
    except Exception as e:
        raise AudioFileError(f"Failed to set property: {e}") from e

set_metadata(tags)

Write metadata tags to the audio file.

The file must have been opened with writable=True. Keys should be strings. Values can be str, int, or float.

Common keys: 'title', 'artist', 'album', 'genre', 'year', 'track number', 'comments', 'approximate duration in seconds'.

Source code in src/coremusic/audio/core.py
def set_metadata(self, tags: dict[str, Any]) -> None:
    """Write metadata tags to the audio file.

    The file must have been opened with writable=True.
    Keys should be strings. Values can be str, int, or float.

    Common keys: 'title', 'artist', 'album', 'genre', 'year',
    'track number', 'comments', 'approximate duration in seconds'.

    Raises:
        AudioFileError: If the instance is not writable or the write fails.
    """
    self._ensure_not_disposed()
    # Check writability before opening so a read-only instance is not
    # opened just to be rejected.
    if not self._writable:
        raise AudioFileError(
            "File not opened for writing. Use AudioFile(path, writable=True)."
        )
    if not self._is_open:
        self.open()

    try:
        capi.audio_file_write_info_dictionary(self.object_id, tags)
    except Exception as e:
        raise AudioFileError(f"Failed to write metadata: {e}") from e

__repr__()

Source code in src/coremusic/audio/core.py
def __repr__(self) -> str:
    """Return ``AudioFile(<path>, open|closed)`` for debugging."""
    if self._is_open:
        status = "open"
    else:
        status = "closed"
    return f"AudioFile({self._path}, {status})"

dispose()

Dispose of the audio file

Source code in src/coremusic/audio/core.py
def dispose(self) -> None:
    """Release the file handle (if open) and dispose the base object.

    A second call on an already-disposed object is a no-op. Errors raised
    while closing the handle are ignored so that disposal always completes.
    """
    if self.is_disposed:
        return

    if self._is_open:
        try:
            capi.audio_file_close(self.object_id)
        except Exception:
            pass  # Best effort cleanup
        finally:
            self._is_open = False

    super().dispose()

AudioFormat Class

coremusic.audio.AudioFormat

Pythonic representation of AudioStreamBasicDescription

Source code in src/coremusic/audio/core.py
class AudioFormat:
    """Pythonic representation of AudioStreamBasicDescription.

    Holds the eight ASBD fields as plain attributes and offers helpers to
    parse raw ASBD bytes, build linear-PCM formats, and map a PCM format to
    a NumPy dtype.
    """

    # Core Audio AudioFormatFlags bit values (CoreAudioBaseTypes.h).
    _FLAG_IS_FLOAT = 1 << 0
    _FLAG_IS_BIG_ENDIAN = 1 << 1
    _FLAG_IS_SIGNED_INTEGER = 1 << 2
    _FLAG_IS_PACKED = 1 << 3

    def __init__(
        self,
        sample_rate: float,
        format_id: str,
        format_flags: int = 0,
        bytes_per_packet: int = 0,
        frames_per_packet: int = 0,
        bytes_per_frame: int = 0,
        channels_per_frame: int = 2,
        bits_per_channel: int = 16,
    ):
        """Store the ASBD fields verbatim; no validation is performed.

        Args:
            sample_rate: Sample rate in Hz.
            format_id: Four-character format code (e.g. ``"lpcm"``).
            format_flags: Bitmask of format flags.
            bytes_per_packet: Bytes in one packet.
            frames_per_packet: Frames in one packet.
            bytes_per_frame: Bytes in one frame.
            channels_per_frame: Channels in one frame.
            bits_per_channel: Bits in one sample of one channel.
        """
        self.sample_rate = sample_rate
        self.format_id = format_id
        self.format_flags = format_flags
        self.bytes_per_packet = bytes_per_packet
        self.frames_per_packet = frames_per_packet
        self.bytes_per_frame = bytes_per_frame
        self.channels_per_frame = channels_per_frame
        self.bits_per_channel = bits_per_channel

    @classmethod
    def from_asbd_bytes(cls, data: bytes) -> "AudioFormat":
        """Parse an AudioStreamBasicDescription (40 bytes) into an AudioFormat.

        Args:
            data: Raw ASBD bytes (at least 40 bytes)

        Returns:
            AudioFormat with parsed fields

        Raises:
            ValueError: If data is too short
        """
        if len(data) < 40:
            raise ValueError(f"ASBD data too short: {len(data)} bytes (need 40)")
        # Little-endian layout: one Float64 followed by eight UInt32 fields,
        # the last of which is reserved and discarded.
        (
            sample_rate,
            format_id_int,
            format_flags,
            bytes_per_packet,
            frames_per_packet,
            bytes_per_frame,
            channels_per_frame,
            bits_per_channel,
            _reserved,
        ) = struct.unpack("<dLLLLLLLL", data[:40])
        format_id = capi.int_to_fourchar(format_id_int)
        return cls(
            sample_rate=sample_rate,
            format_id=format_id,
            format_flags=format_flags,
            bytes_per_packet=bytes_per_packet,
            frames_per_packet=frames_per_packet,
            bytes_per_frame=bytes_per_frame,
            channels_per_frame=channels_per_frame,
            bits_per_channel=bits_per_channel,
        )

    @classmethod
    def pcm(
        cls,
        sample_rate: float = 44100.0,
        channels: int = 2,
        bits: int = 16,
        is_float: bool = False,
    ) -> "AudioFormat":
        """Create a PCM AudioFormat with correctly computed derived fields.

        Args:
            sample_rate: Sample rate in Hz (default: 44100.0)
            channels: Number of channels (default: 2)
            bits: Bits per sample (default: 16)
            is_float: If True, create float format; otherwise signed integer

        Returns:
            AudioFormat with all ASBD fields correctly computed
        """
        bytes_per_sample = bits // 8
        bytes_per_frame = bytes_per_sample * channels
        if is_float:
            flags = cls._FLAG_IS_FLOAT
        else:
            # Signed integer is bit 2 (4) and packed is bit 3 (8); bit 1 (2)
            # is the big-endian flag and must not be set here.
            flags = cls._FLAG_IS_SIGNED_INTEGER | cls._FLAG_IS_PACKED
        return cls(
            sample_rate=sample_rate,
            format_id="lpcm",
            format_flags=flags,
            # One frame per packet, so packet and frame sizes coincide.
            bytes_per_packet=bytes_per_frame,
            frames_per_packet=1,
            bytes_per_frame=bytes_per_frame,
            channels_per_frame=channels,
            bits_per_channel=bits,
        )

    @property
    def is_pcm(self) -> bool:
        """Check if this is a PCM format"""
        return self.format_id == "lpcm"

    @property
    def is_stereo(self) -> bool:
        """Check if this is stereo (2 channels)"""
        return self.channels_per_frame == 2

    @property
    def is_mono(self) -> bool:
        """Check if this is mono (1 channel)"""
        return self.channels_per_frame == 1

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary format for functional API.

        A string ``format_id`` is converted to its integer four-char code;
        any other value is passed through unchanged.
        """
        format_id_int = (
            capi.fourchar_to_int(self.format_id)
            if isinstance(self.format_id, str)
            else self.format_id
        )

        return {
            "sample_rate": self.sample_rate,
            "format_id": format_id_int,
            "format_flags": self.format_flags,
            "bytes_per_packet": self.bytes_per_packet,
            "frames_per_packet": self.frames_per_packet,
            "bytes_per_frame": self.bytes_per_frame,
            "channels_per_frame": self.channels_per_frame,
            "bits_per_channel": self.bits_per_channel,
        }

    def to_numpy_dtype(self) -> "np.dtype[Any]":
        """
        Convert audio format to NumPy dtype for audio data arrays.

        Returns:
            NumPy dtype object suitable for audio data representation

        Raises:
            ImportError: If NumPy is not available
            ValueError: If format cannot be converted to NumPy dtype
        """
        if not NUMPY_AVAILABLE:
            raise ImportError(
                "NumPy is not available. Install numpy to use this feature."
            )

        if not self.is_pcm:
            raise ValueError(
                f"Cannot convert non-PCM format '{self.format_id}' to NumPy dtype"
            )

        # NOTE(review): the big-endian flag is not reflected in the returned
        # dtype; callers get native byte order regardless.
        bits = self.bits_per_channel

        if self.format_flags & self._FLAG_IS_FLOAT:
            if bits == 32:
                return np.dtype(np.float32)
            if bits == 64:
                return np.dtype(np.float64)
            raise ValueError(f"Unsupported float bit depth: {bits}")

        # Integer formats
        if bits == 8:
            # Signedness comes from the signed-integer bit being SET.
            is_signed = bool(self.format_flags & self._FLAG_IS_SIGNED_INTEGER)
            return np.dtype(np.int8 if is_signed else np.uint8)
        if bits == 16:
            return np.dtype(np.int16)
        if bits in (24, 32):
            # 24-bit audio is typically padded to 32-bit
            return np.dtype(np.int32)
        raise ValueError(f"Unsupported integer bit depth: {bits}")

    def __repr__(self) -> str:
        """Return a short summary of rate, format id, channels and bit depth."""
        return (
            f"AudioFormat({self.sample_rate}Hz, {self.format_id}, "
            f"channels={self.channels_per_frame}, bits={self.bits_per_channel})"
        )

sample_rate = sample_rate instance-attribute

format_id = format_id instance-attribute

format_flags = format_flags instance-attribute

bytes_per_packet = bytes_per_packet instance-attribute

frames_per_packet = frames_per_packet instance-attribute

bytes_per_frame = bytes_per_frame instance-attribute

channels_per_frame = channels_per_frame instance-attribute

bits_per_channel = bits_per_channel instance-attribute

is_pcm property

Check if this is a PCM format

is_stereo property

Check if this is stereo (2 channels)

is_mono property

Check if this is mono (1 channel)

__init__(sample_rate, format_id, format_flags=0, bytes_per_packet=0, frames_per_packet=0, bytes_per_frame=0, channels_per_frame=2, bits_per_channel=16)

Source code in src/coremusic/audio/core.py
def __init__(
    self,
    sample_rate: float,
    format_id: str,
    format_flags: int = 0,
    bytes_per_packet: int = 0,
    frames_per_packet: int = 0,
    bytes_per_frame: int = 0,
    channels_per_frame: int = 2,
    bits_per_channel: int = 16,
):
    """Store the eight stream-description fields verbatim as attributes.

    No validation or derivation is performed; each argument is assigned to
    the attribute of the same name.
    """
    # Format identity
    self.sample_rate, self.format_id = sample_rate, format_id
    self.format_flags = format_flags
    # Packet / frame sizing
    self.bytes_per_packet, self.frames_per_packet = (
        bytes_per_packet,
        frames_per_packet,
    )
    self.bytes_per_frame = bytes_per_frame
    # Sample layout
    self.channels_per_frame, self.bits_per_channel = (
        channels_per_frame,
        bits_per_channel,
    )

from_asbd_bytes(data) classmethod

Parse an AudioStreamBasicDescription (40 bytes) into an AudioFormat.

Parameters:

Name Type Description Default
data bytes

Raw ASBD bytes (at least 40 bytes)

required

Returns:

Type Description
'AudioFormat'

AudioFormat with parsed fields

Raises:

Type Description
ValueError

If data is too short

Source code in src/coremusic/audio/core.py
@classmethod
def from_asbd_bytes(cls, data: bytes) -> "AudioFormat":
    """Build an AudioFormat from raw AudioStreamBasicDescription bytes.

    Only the first 40 bytes are read: a little-endian Float64 followed by
    eight little-endian UInt32 values, the last of which is discarded.

    Args:
        data: Raw ASBD bytes (at least 40 bytes)

    Returns:
        AudioFormat with parsed fields

    Raises:
        ValueError: If data is too short
    """
    size = len(data)
    if size < 40:
        raise ValueError(f"ASBD data too short: {size} bytes (need 40)")

    header = data[:40]
    rate, fourcc, flags, packet_bytes, packet_frames, frame_bytes, channels, bits, _ = (
        struct.unpack("<dLLLLLLLL", header)
    )
    return cls(
        sample_rate=rate,
        format_id=capi.int_to_fourchar(fourcc),
        format_flags=flags,
        bytes_per_packet=packet_bytes,
        frames_per_packet=packet_frames,
        bytes_per_frame=frame_bytes,
        channels_per_frame=channels,
        bits_per_channel=bits,
    )

pcm(sample_rate=44100.0, channels=2, bits=16, is_float=False) classmethod

Create a PCM AudioFormat with correctly computed derived fields.

Parameters:

Name Type Description Default
sample_rate float

Sample rate in Hz (default: 44100.0)

44100.0
channels int

Number of channels (default: 2)

2
bits int

Bits per sample (default: 16)

16
is_float bool

If True, create float format; otherwise signed integer

False

Returns:

Type Description
'AudioFormat'

AudioFormat with all ASBD fields correctly computed

Source code in src/coremusic/audio/core.py
@classmethod
def pcm(
    cls,
    sample_rate: float = 44100.0,
    channels: int = 2,
    bits: int = 16,
    is_float: bool = False,
) -> "AudioFormat":
    """Create a PCM AudioFormat with correctly computed derived fields.

    The format is interleaved linear PCM with one frame per packet, so
    ``bytes_per_packet`` equals ``bytes_per_frame``.

    Args:
        sample_rate: Sample rate in Hz (default: 44100.0)
        channels: Number of channels (default: 2)
        bits: Bits per sample (default: 16)
        is_float: If True, create float format; otherwise signed integer

    Returns:
        AudioFormat with all ASBD fields correctly computed
    """
    # CoreAudio linear-PCM flag bits. Bit 1 (value 2) is the big-endian flag,
    # so it must not be set here.
    flag_is_float = 1 << 0  # kAudioFormatFlagIsFloat
    flag_is_signed_integer = 1 << 2  # kAudioFormatFlagIsSignedInteger
    flag_is_packed = 1 << 3  # kAudioFormatFlagIsPacked

    bytes_per_sample = bits // 8
    bytes_per_frame = bytes_per_sample * channels
    if is_float:
        flags = flag_is_float
    else:
        flags = flag_is_signed_integer | flag_is_packed
    return cls(
        sample_rate=sample_rate,
        format_id="lpcm",
        format_flags=flags,
        bytes_per_packet=bytes_per_frame,
        frames_per_packet=1,
        bytes_per_frame=bytes_per_frame,
        channels_per_frame=channels,
        bits_per_channel=bits,
    )

to_dict()

Convert to dictionary format for functional API

Source code in src/coremusic/audio/core.py
def to_dict(self) -> dict[str, Any]:
    """Return the format fields as a plain dict for the functional API.

    A string ``format_id`` is converted to its integer form; any other
    value is passed through unchanged.
    """
    format_id = self.format_id
    if isinstance(format_id, str):
        format_id = capi.fourchar_to_int(format_id)

    as_dict: dict[str, Any] = {
        "sample_rate": self.sample_rate,
        "format_id": format_id,
    }
    # The remaining keys are copied from the attributes of the same name.
    for field in (
        "format_flags",
        "bytes_per_packet",
        "frames_per_packet",
        "bytes_per_frame",
        "channels_per_frame",
        "bits_per_channel",
    ):
        as_dict[field] = getattr(self, field)
    return as_dict

to_numpy_dtype()

Convert audio format to NumPy dtype for audio data arrays.

Returns:

Type Description
'np.dtype[Any]'

NumPy dtype object suitable for audio data representation

Raises:

Type Description
ImportError

If NumPy is not available

ValueError

If format cannot be converted to NumPy dtype

Source code in src/coremusic/audio/core.py
def to_numpy_dtype(self) -> "np.dtype[Any]":
    """
    Convert audio format to NumPy dtype for audio data arrays.

    Only PCM formats are supported. Float formats map to float32/float64.
    Integer formats map to int8/uint8 (8-bit, chosen by the signed-integer
    flag), int16 (16-bit) or int32 (24- and 32-bit).

    Returns:
        NumPy dtype object suitable for audio data representation

    Raises:
        ImportError: If NumPy is not available
        ValueError: If format cannot be converted to NumPy dtype
    """
    if not NUMPY_AVAILABLE:
        raise ImportError(
            "NumPy is not available. Install numpy to use this feature."
        )

    if not self.is_pcm:
        raise ValueError(
            f"Cannot convert non-PCM format '{self.format_id}' to NumPy dtype"
        )

    is_float = bool(self.format_flags & 1)  # kAudioFormatFlagIsFloat (1 << 0)
    # kAudioFormatFlagIsSignedInteger is bit 2 (value 4); value 2 is the
    # big-endian flag and says nothing about signedness.
    is_signed = bool(self.format_flags & 4)

    if is_float:
        if self.bits_per_channel == 32:
            return np.dtype(np.float32)
        if self.bits_per_channel == 64:
            return np.dtype(np.float64)
        raise ValueError(f"Unsupported float bit depth: {self.bits_per_channel}")

    # Integer formats
    if self.bits_per_channel == 8:
        return np.dtype(np.int8 if is_signed else np.uint8)
    if self.bits_per_channel == 16:
        return np.dtype(np.int16)
    if self.bits_per_channel in (24, 32):
        # 24-bit audio is typically padded to 32-bit
        return np.dtype(np.int32)
    raise ValueError(f"Unsupported integer bit depth: {self.bits_per_channel}")

__repr__()

Source code in src/coremusic/audio/core.py
def __repr__(self) -> str:
    """Return a short summary: sample rate, format ID, channels and bit depth."""
    summary = ", ".join(
        (
            f"{self.sample_rate}Hz",
            f"{self.format_id}",
            f"channels={self.channels_per_frame}",
            f"bits={self.bits_per_channel}",
        )
    )
    return f"AudioFormat({summary})"

AudioUnit Class

coremusic.audio.AudioUnit

Bases: CoreAudioObject

Audio unit for real-time audio processing

Source code in src/coremusic/audio/units.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
class AudioUnit(capi.CoreAudioObject):
    """Audio unit for real-time audio processing.

    Wraps the audio unit instance identified by ``object_id``. Used as a
    context manager, the unit is initialized on entry and uninitialized and
    disposed on exit.
    """

    # Property IDs used by the convenience accessors below.
    _PROP_SAMPLE_RATE = 2  # kAudioUnitProperty_SampleRate
    _PROP_PARAMETER_LIST = 3  # kAudioUnitProperty_ParameterList
    _PROP_CPU_LOAD = 6  # kAudioUnitProperty_CPULoad
    _PROP_LATENCY = 12  # kAudioUnitProperty_Latency
    _PROP_MAX_FRAMES_PER_SLICE = 14  # kAudioUnitProperty_MaximumFramesPerSlice

    def __init__(self, description: AudioComponentDescription):
        """Create an uninitialized wrapper for the given component description."""
        super().__init__()
        self._description = description
        self._is_initialized = False

    @classmethod
    def default_output(cls) -> "AudioUnit":
        """Create a default output AudioUnit

        Raises:
            AudioUnitError: If no matching component is found
        """
        desc = AudioComponentDescription(
            type="auou",  # kAudioUnitType_Output
            subtype="def ",  # kAudioUnitSubType_DefaultOutput
            manufacturer="appl",  # kAudioUnitManufacturer_Apple
        )
        component = AudioComponent.find_next(desc)
        if component is None:
            raise AudioUnitError("Default output AudioUnit not found")
        return component.create_instance()

    # ========================================================================
    # Internal helpers
    # ========================================================================

    @staticmethod
    def _scope_value(scope: str) -> int:
        """Translate a scope name ('input', 'output', 'global') to its constant.

        The lookup is case-insensitive.

        Raises:
            AudioUnitError: If the name is not one of the three known scopes
        """
        scope_map = {
            "input": capi.get_audio_unit_scope_input(),
            "output": capi.get_audio_unit_scope_output(),
            "global": capi.get_audio_unit_scope_global(),
        }
        scope_val = scope_map.get(scope.lower())
        if scope_val is None:
            raise AudioUnitError(f"Invalid scope: {scope}")
        return scope_val

    def _read_global_value(self, property_id: int, fmt: str, default: Any) -> Any:
        """Read one fixed-size value from a global-scope property.

        Returns ``default`` when the property cannot be read or when the
        returned data is shorter than ``fmt`` requires.
        """
        size = struct.calcsize(fmt)
        try:
            data = self.get_property(
                property_id, capi.get_audio_unit_scope_global(), 0
            )
            if len(data) >= size:
                return struct.unpack(fmt, data[:size])[0]
            return default
        except Exception:
            return default

    # ========================================================================
    # Lifecycle
    # ========================================================================

    def initialize(self) -> None:
        """Initialize the AudioUnit (no-op if already initialized)"""
        self._ensure_not_disposed()
        if not self._is_initialized:
            try:
                capi.audio_unit_initialize(self.object_id)
                self._is_initialized = True
            except Exception as e:
                raise AudioUnitError(f"Failed to initialize: {e}") from e

    def uninitialize(self) -> None:
        """Uninitialize the AudioUnit (no-op if not initialized)

        The initialized flag is cleared even when the underlying call fails.
        """
        if self._is_initialized:
            try:
                capi.audio_unit_uninitialize(self.object_id)
            except Exception as e:
                raise AudioUnitError(f"Failed to uninitialize: {e}") from e
            finally:
                self._is_initialized = False

    def start(self) -> None:
        """Start the AudioUnit output

        Raises:
            AudioUnitError: If the unit is not initialized or starting fails
        """
        self._ensure_not_disposed()
        if not self._is_initialized:
            raise AudioUnitError("AudioUnit not initialized")
        try:
            capi.audio_output_unit_start(self.object_id)
        except Exception as e:
            raise AudioUnitError(f"Failed to start: {e}") from e

    def stop(self) -> None:
        """Stop the AudioUnit output"""
        self._ensure_not_disposed()
        try:
            capi.audio_output_unit_stop(self.object_id)
        except Exception as e:
            raise AudioUnitError(f"Failed to stop: {e}") from e

    def get_property(self, property_id: int, scope: int, element: int) -> bytes:
        """Get a property from the AudioUnit

        Raises:
            AudioUnitError: If the underlying call fails
        """
        self._ensure_not_disposed()
        try:
            return capi.audio_unit_get_property(
                self.object_id, property_id, scope, element
            )
        except Exception as e:
            raise AudioUnitError(f"Failed to get property: {e}") from e

    def set_property(
        self, property_id: int, scope: int, element: int, data: bytes
    ) -> None:
        """Set a property on the AudioUnit

        Raises:
            AudioUnitError: If the underlying call fails
        """
        self._ensure_not_disposed()
        try:
            capi.audio_unit_set_property(
                self.object_id, property_id, scope, element, data
            )
        except Exception as e:
            raise AudioUnitError(f"Failed to set property: {e}") from e

    # ========================================================================
    # Advanced AudioUnit Features
    # ========================================================================

    def get_stream_format(self, scope: str = "output", element: int = 0) -> AudioFormat:
        """Get the stream format for a specific scope and element

        Args:
            scope: 'input', 'output', or 'global' (default: 'output')
            element: Element index (default: 0)

        Returns:
            AudioFormat object with the current stream format

        Raises:
            AudioUnitError: If scope is invalid or getting format fails

        Example::

            # Get the output format of an AudioUnit
            with AudioUnit(component) as unit:
                format = unit.get_stream_format('output', 0)
                print(f"Sample rate: {format.sample_rate}")
                print(f"Channels: {format.channels_per_frame}")
                print(f"Bits: {format.bits_per_channel}")
        """
        self._ensure_not_disposed()
        scope_val = self._scope_value(scope)

        try:
            asbd_data = self.get_property(
                capi.get_audio_unit_property_stream_format(), scope_val, element
            )
            return AudioFormat.from_asbd_bytes(asbd_data)
        except ValueError as e:
            raise AudioUnitError(f"Invalid ASBD data: {e}") from e
        except Exception as e:
            raise AudioUnitError(f"Failed to get stream format: {e}") from e

    def set_stream_format(
        self, format: AudioFormat, scope: str = "output", element: int = 0
    ) -> None:
        """Set the stream format for a specific scope and element

        Args:
            format: AudioFormat object with desired format
            scope: 'input', 'output', or 'global' (default: 'output')
            element: Element index (must be non-negative, default: 0)

        Raises:
            TypeError: If format is not an AudioFormat
            ValueError: If element is negative
            AudioUnitError: If scope is invalid or setting format fails

        Example::

            from coremusic.audio import AudioFormat, AudioUnit

            # Create a stereo 44.1kHz 16-bit PCM format
            format = AudioFormat(
                sample_rate=44100.0,
                format_id='lpcm',
                channels_per_frame=2,
                bits_per_channel=16
            )

            # Set the input format on an effect unit
            with AudioUnit(effect_component) as effect:
                effect.set_stream_format(format, 'input', 0)
        """
        if not isinstance(format, AudioFormat):
            raise TypeError(f"format must be AudioFormat, got {type(format).__name__}")
        if element < 0:
            raise ValueError(f"element must be non-negative, got {element}")

        self._ensure_not_disposed()
        scope_val = self._scope_value(scope)

        try:
            # Convert format_id to integer
            format_id_int = (
                capi.fourchar_to_int(format.format_id)
                if isinstance(format.format_id, str)
                else format.format_id
            )

            # Pack AudioStreamBasicDescription
            asbd_data = struct.pack(
                "<dLLLLLLLL",
                format.sample_rate,
                format_id_int,
                format.format_flags,
                format.bytes_per_packet,
                format.frames_per_packet,
                format.bytes_per_frame,
                format.channels_per_frame,
                format.bits_per_channel,
                0,  # reserved
            )

            self.set_property(
                capi.get_audio_unit_property_stream_format(),
                scope_val,
                element,
                asbd_data,
            )
        except Exception as e:
            raise AudioUnitError(f"Failed to set stream format: {e}") from e

    @property
    def sample_rate(self) -> float:
        """Get the sample rate (kAudioUnitProperty_SampleRate on global scope)

        Falls back to the output stream format's sample rate when the
        property cannot be read, and to 0.0 when that fails as well.
        """
        self._ensure_not_disposed()
        try:
            data = self.get_property(
                self._PROP_SAMPLE_RATE, capi.get_audio_unit_scope_global(), 0
            )
            if len(data) >= 8:
                result: float = struct.unpack("<d", data[:8])[0]
                return result
            return 0.0
        except Exception:
            # Fallback to stream format sample rate
            try:
                return self.get_stream_format("output", 0).sample_rate
            except Exception:
                return 0.0

    @sample_rate.setter
    def sample_rate(self, rate: float) -> None:
        """Set the sample rate (kAudioUnitProperty_SampleRate)

        Raises:
            AudioUnitError: If every attempt to set the rate fails
        """
        self._ensure_not_disposed()
        data = struct.pack("<d", rate)

        # Try input scope first (for output units, input scope is configurable
        # before init), then output scope (may work after initialization),
        # then global scope as fallback. The first success wins.
        for get_scope in (
            capi.get_audio_unit_scope_input,
            capi.get_audio_unit_scope_output,
            capi.get_audio_unit_scope_global,
        ):
            try:
                self.set_property(self._PROP_SAMPLE_RATE, get_scope(), 0, data)
                return
            except Exception:
                continue

        # Last resort: try to set via stream format on input scope
        try:
            format_data = self.get_stream_format("input", 0)
            format_data.sample_rate = rate
            self.set_stream_format(format_data, "input", 0)
        except Exception as e:
            raise AudioUnitError(f"Failed to set sample rate: {e}") from e

    @property
    def latency(self) -> float:
        """Get the latency in seconds (kAudioUnitProperty_Latency)

        Returns 0.0 when the property cannot be read.
        """
        self._ensure_not_disposed()
        result: float = self._read_global_value(self._PROP_LATENCY, "<d", 0.0)
        return result

    @property
    def cpu_load(self) -> float:
        """Get the CPU load as a fraction (0.0 to 1.0) (kAudioUnitProperty_CPULoad)

        Returns 0.0 when the property cannot be read.
        """
        self._ensure_not_disposed()
        result: float = self._read_global_value(self._PROP_CPU_LOAD, "<f", 0.0)
        return result

    @property
    def max_frames_per_slice(self) -> int:
        """Get the maximum frames per slice (kAudioUnitProperty_MaximumFramesPerSlice)

        Returns 0 when the property cannot be read.
        """
        self._ensure_not_disposed()
        result: int = self._read_global_value(
            self._PROP_MAX_FRAMES_PER_SLICE, "<L", 0
        )
        return result

    @max_frames_per_slice.setter
    def max_frames_per_slice(self, frames: int) -> None:
        """Set the maximum frames per slice (kAudioUnitProperty_MaximumFramesPerSlice)"""
        self._ensure_not_disposed()
        try:
            data = struct.pack("<L", frames)
            self.set_property(
                self._PROP_MAX_FRAMES_PER_SLICE,
                capi.get_audio_unit_scope_global(),
                0,
                data,
            )
        except Exception as e:
            raise AudioUnitError(f"Failed to set max frames per slice: {e}") from e

    def get_parameter_list(self, scope: str = "global") -> list[int]:
        """Get list of available parameter IDs (kAudioUnitProperty_ParameterList)

        Args:
            scope: 'input', 'output', or 'global' (default: 'global')

        Returns:
            List of parameter IDs; empty if the property cannot be read

        Raises:
            AudioUnitError: If scope is invalid
        """
        self._ensure_not_disposed()
        scope_val = self._scope_value(scope)

        try:
            data = self.get_property(self._PROP_PARAMETER_LIST, scope_val, 0)
            # Data is an array of UInt32 parameter IDs
            param_count = len(data) // 4
            if param_count > 0:
                return list(struct.unpack(f"<{param_count}L", data[: param_count * 4]))
            return []
        except Exception:
            return []

    def render(self, num_frames: int, timestamp: int | None = None) -> bytes:
        """Render audio frames (for offline processing)

        Args:
            num_frames: Number of frames to render
            timestamp: Optional timestamp (default: None uses current time)

        Returns:
            Rendered audio data as bytes

        Raises:
            NotImplementedError: Always; direct rendering is not implemented

        Note: This is a simplified render method for offline processing.
        For real-time audio, use render callbacks with the audio player infrastructure.
        """
        # This would require implementing AudioUnitRender which needs more infrastructure
        raise NotImplementedError(
            "Direct rendering not yet implemented. "
            "Use the audio player infrastructure with render callbacks for real-time audio."
        )

    def __repr__(self) -> str:
        if self.is_disposed:
            return f"AudioUnit({self._description.subtype!r}, disposed)"
        status = "initialized" if self._is_initialized else "uninitialized"
        return f"AudioUnit({self._description.subtype!r}, {status})"

    def __enter__(self) -> "AudioUnit":
        self.initialize()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        try:
            self.uninitialize()
        finally:
            # Release the instance even when uninitialize() raised, so a
            # failed teardown does not leak it.
            self.dispose()

    @property
    def is_initialized(self) -> bool:
        """Whether initialize() has succeeded and the unit is still initialized"""
        return self._is_initialized

    def dispose(self) -> None:
        """Dispose of the AudioUnit

        Uninitializes the unit if needed and disposes the component
        instance. Both steps are best effort: failures are ignored.
        """
        if not self.is_disposed:
            if self._is_initialized:
                try:
                    capi.audio_unit_uninitialize(self.object_id)
                except Exception:
                    pass  # Best effort cleanup
                finally:
                    self._is_initialized = False

            if self.object_id != 0:
                try:
                    capi.audio_component_instance_dispose(self.object_id)
                except Exception:
                    pass  # Best effort cleanup

            super().dispose()

_description = description instance-attribute

_is_initialized = False instance-attribute

sample_rate property writable

Get the sample rate (kAudioUnitProperty_SampleRate on global scope)

latency property

Get the latency in seconds (kAudioUnitProperty_Latency)

cpu_load property

Get the CPU load as a fraction (0.0 to 1.0) (kAudioUnitProperty_CPULoad)

max_frames_per_slice property writable

Get the maximum frames per slice (kAudioUnitProperty_MaximumFramesPerSlice)

is_initialized property

__init__(description)

Source code in src/coremusic/audio/units.py
def __init__(self, description: AudioComponentDescription):
    """Store the component description; the unit starts out uninitialized."""
    super().__init__()
    self._is_initialized = False
    self._description = description

default_output() classmethod

Create a default output AudioUnit

Source code in src/coremusic/audio/units.py
@classmethod
def default_output(cls) -> "AudioUnit":
    """Look up the default output component and return an instance of it.

    Raises:
        AudioUnitError: If no matching component is found
    """
    component = AudioComponent.find_next(
        AudioComponentDescription(
            type="auou",  # kAudioUnitType_Output
            subtype="def ",  # kAudioUnitSubType_DefaultOutput
            manufacturer="appl",  # kAudioUnitManufacturer_Apple
        )
    )
    if component is not None:
        return component.create_instance()
    raise AudioUnitError("Default output AudioUnit not found")

initialize()

Initialize the AudioUnit

Source code in src/coremusic/audio/units.py
def initialize(self) -> None:
    """Initialize the AudioUnit; does nothing if it is already initialized.

    Raises:
        AudioUnitError: If the underlying initialize call fails
    """
    self._ensure_not_disposed()
    if self._is_initialized:
        return
    try:
        capi.audio_unit_initialize(self.object_id)
        self._is_initialized = True
    except Exception as exc:
        raise AudioUnitError(f"Failed to initialize: {exc}")

uninitialize()

Uninitialize the AudioUnit

Source code in src/coremusic/audio/units.py
def uninitialize(self) -> None:
    """Uninitialize the AudioUnit; does nothing if it is not initialized.

    The initialized flag is cleared whether or not the underlying call
    succeeds.

    Raises:
        AudioUnitError: If the underlying uninitialize call fails
    """
    if not self._is_initialized:
        return
    try:
        capi.audio_unit_uninitialize(self.object_id)
    except Exception as exc:
        raise AudioUnitError(f"Failed to uninitialize: {exc}")
    finally:
        self._is_initialized = False

start()

Start the AudioUnit output

Source code in src/coremusic/audio/units.py
def start(self) -> None:
    """Start output on an initialized AudioUnit.

    Raises:
        AudioUnitError: If the unit is not initialized or starting fails
    """
    self._ensure_not_disposed()
    initialized = self._is_initialized
    if not initialized:
        raise AudioUnitError("AudioUnit not initialized")
    try:
        capi.audio_output_unit_start(self.object_id)
    except Exception as exc:
        raise AudioUnitError(f"Failed to start: {exc}")

stop()

Stop the AudioUnit output

Source code in src/coremusic/audio/units.py
def stop(self) -> None:
    """Stop output on the AudioUnit.

    Raises:
        AudioUnitError: If the underlying stop call fails
    """
    self._ensure_not_disposed()
    try:
        capi.audio_output_unit_stop(self.object_id)
    except Exception as exc:
        raise AudioUnitError(f"Failed to stop: {exc}")
    return None

get_property(property_id, scope, element)

Get a property from the AudioUnit

Source code in src/coremusic/audio/units.py
def get_property(self, property_id: int, scope: int, element: int) -> bytes:
    """Read a raw property value from the AudioUnit.

    Raises:
        AudioUnitError: If the underlying call fails
    """
    self._ensure_not_disposed()
    try:
        raw = capi.audio_unit_get_property(
            self.object_id, property_id, scope, element
        )
    except Exception as exc:
        raise AudioUnitError(f"Failed to get property: {exc}")
    return raw

set_property(property_id, scope, element, data)

Set a property on the AudioUnit

Source code in src/coremusic/audio/units.py
def set_property(
    self, property_id: int, scope: int, element: int, data: bytes
) -> None:
    """Write a raw property value to the AudioUnit.

    Raises:
        AudioUnitError: If the underlying call fails
    """
    self._ensure_not_disposed()
    call_args = (self.object_id, property_id, scope, element, data)
    try:
        capi.audio_unit_set_property(*call_args)
    except Exception as exc:
        raise AudioUnitError(f"Failed to set property: {exc}")

get_stream_format(scope='output', element=0)

Get the stream format for a specific scope and element

Parameters:

Name Type Description Default
scope str

'input', 'output', or 'global' (default: 'output')

'output'
element int

Element index (default: 0)

0

Returns:

Type Description
AudioFormat

AudioFormat object with the current stream format

Raises:

Type Description
AudioUnitError

If scope is invalid or getting format fails

Example::

# Get the output format of an AudioUnit
with AudioUnit(component) as unit:
    format = unit.get_stream_format('output', 0)
    print(f"Sample rate: {format.sample_rate}")
    print(f"Channels: {format.channels_per_frame}")
    print(f"Bits: {format.bits_per_channel}")
Source code in src/coremusic/audio/units.py
def get_stream_format(self, scope: str = "output", element: int = 0) -> AudioFormat:
    """Read the stream format of one scope/element as an AudioFormat

    Args:
        scope: 'input', 'output', or 'global' (default: 'output');
            matched case-insensitively
        element: Element index (default: 0)

    Returns:
        AudioFormat parsed from the unit's stream-format property

    Raises:
        AudioUnitError: If scope is invalid or getting format fails

    Example::

        # Get the output format of an AudioUnit
        with AudioUnit(component) as unit:
            format = unit.get_stream_format('output', 0)
            print(f"Sample rate: {format.sample_rate}")
            print(f"Channels: {format.channels_per_frame}")
            print(f"Bits: {format.bits_per_channel}")
    """
    self._ensure_not_disposed()

    # Resolve the scope name to its numeric constant
    known_scopes = dict(
        input=capi.get_audio_unit_scope_input(),
        output=capi.get_audio_unit_scope_output(),
        **{"global": capi.get_audio_unit_scope_global()},
    )
    resolved = known_scopes.get(scope.lower())
    if resolved is None:
        raise AudioUnitError(f"Invalid scope: {scope}")

    try:
        property_id = capi.get_audio_unit_property_stream_format()
        raw_asbd = self.get_property(property_id, resolved, element)
        return AudioFormat.from_asbd_bytes(raw_asbd)
    except ValueError as exc:
        raise AudioUnitError(f"Invalid ASBD data: {exc}")
    except Exception as exc:
        raise AudioUnitError(f"Failed to get stream format: {exc}")

set_stream_format(format, scope='output', element=0)

Set the stream format for a specific scope and element

Parameters:

Name Type Description Default
format AudioFormat

AudioFormat object with desired format

required
scope str

'input', 'output', or 'global' (default: 'output')

'output'
element int

Element index (must be non-negative, default: 0)

0

Raises:

Type Description
TypeError

If format is not an AudioFormat

ValueError

If element is negative

AudioUnitError

If scope is invalid or setting the format fails

Example::

from coremusic.audio import AudioFormat, AudioUnit

# Create a stereo 44.1kHz 16-bit PCM format
format = AudioFormat(
    sample_rate=44100.0,
    format_id='lpcm',
    channels_per_frame=2,
    bits_per_channel=16
)

# Set the input format on an effect unit
with AudioUnit(effect_component) as effect:
    effect.set_stream_format(format, 'input', 0)
Source code in src/coremusic/audio/units.py
def set_stream_format(
    self, format: AudioFormat, scope: str = "output", element: int = 0
) -> None:
    """Set the stream format for a specific scope and element

    The format is packed as a 40-byte AudioStreamBasicDescription
    (little-endian double plus eight unsigned 32-bit fields and a zero
    reserved word) and written to the unit's stream-format property.

    Args:
        format: AudioFormat object with desired format
        scope: 'input', 'output', or 'global' (default: 'output');
            matched case-insensitively
        element: Element index (must be non-negative, default: 0)

    Raises:
        TypeError: If format is not an AudioFormat
        ValueError: If element is negative
        AudioUnitError: If scope is invalid or setting format fails

    Example::

        from coremusic.audio import AudioFormat, AudioUnit

        # Create a stereo 44.1kHz 16-bit PCM format
        format = AudioFormat(
            sample_rate=44100.0,
            format_id='lpcm',
            channels_per_frame=2,
            bits_per_channel=16
        )

        # Set the input format on an effect unit
        with AudioUnit(effect_component) as effect:
            effect.set_stream_format(format, 'input', 0)
    """
    if not isinstance(format, AudioFormat):
        raise TypeError(f"format must be AudioFormat, got {type(format).__name__}")
    if element < 0:
        raise ValueError(f"element must be non-negative, got {element}")

    self._ensure_not_disposed()

    # Map scope name to constant
    scope_map = {
        "input": capi.get_audio_unit_scope_input(),
        "output": capi.get_audio_unit_scope_output(),
        "global": capi.get_audio_unit_scope_global(),
    }
    scope_val = scope_map.get(scope.lower())
    if scope_val is None:
        raise AudioUnitError(f"Invalid scope: {scope}")

    try:
        # Convert format_id to integer
        format_id_int = (
            capi.fourchar_to_int(format.format_id)
            if isinstance(format.format_id, str)
            else format.format_id
        )

        # Pack AudioStreamBasicDescription
        asbd_data = struct.pack(
            "<dLLLLLLLL",
            format.sample_rate,
            format_id_int,
            format.format_flags,
            format.bytes_per_packet,
            format.frames_per_packet,
            format.bytes_per_frame,
            format.channels_per_frame,
            format.bits_per_channel,
            0,  # reserved
        )

        self.set_property(
            capi.get_audio_unit_property_stream_format(),
            scope_val,
            element,
            asbd_data,
        )
    except Exception as e:
        # Chain the cause so the original traceback is kept.
        raise AudioUnitError(f"Failed to set stream format: {e}") from e

get_parameter_list(scope='global')

Get list of available parameter IDs (kAudioUnitProperty_ParameterList)

Parameters:

Name Type Description Default
scope str

'input', 'output', or 'global' (default: 'global')

'global'

Returns:

Type Description
list[int]

List of parameter IDs

Source code in src/coremusic/audio/units.py
def get_parameter_list(self, scope: str = "global") -> list[int]:
    """Return the parameter IDs the unit reports (kAudioUnitProperty_ParameterList)

    Args:
        scope: 'input', 'output', or 'global' (default: 'global');
            matched case-insensitively

    Returns:
        List of parameter IDs; empty when the property cannot be read or
        holds fewer than four bytes

    Raises:
        AudioUnitError: If scope is invalid
    """
    self._ensure_not_disposed()

    known_scopes = {
        "input": capi.get_audio_unit_scope_input(),
        "output": capi.get_audio_unit_scope_output(),
        "global": capi.get_audio_unit_scope_global(),
    }
    resolved = known_scopes.get(scope.lower())
    if resolved is None:
        raise AudioUnitError(f"Invalid scope: {scope}")

    try:
        # kAudioUnitProperty_ParameterList = 3
        raw = self.get_property(3, resolved, 0)
        # The payload is read as consecutive UInt32 IDs; trailing bytes that
        # do not fill a whole word are ignored.
        count = len(raw) // 4
        if count <= 0:
            return []
        ids = struct.unpack(f"<{count}L", raw[: count * 4])
        return list(ids)
    except Exception:
        return []

render(num_frames, timestamp=None)

Render audio frames (for offline processing)

Parameters:

Name Type Description Default
num_frames int

Number of frames to render

required
timestamp int | None

Optional timestamp (default: None uses current time)

None

Returns:

Type Description
bytes

Rendered audio data as bytes

Note: This is a simplified render method for offline processing. For real-time audio, use render callbacks with the audio player infrastructure.

Source code in src/coremusic/audio/units.py
def render(self, num_frames: int, timestamp: int | None = None) -> bytes:
    """Render audio frames (for offline processing)

    Args:
        num_frames: Number of frames to render
        timestamp: Optional timestamp (default: None uses current time)

    Returns:
        Rendered audio data as bytes

    Note: This is a simplified render method for offline processing.
    For real-time audio, use render callbacks with the audio player infrastructure.
    """
    # This would require implementing AudioUnitRender which needs more infrastructure
    raise NotImplementedError(
        "Direct rendering not yet implemented. "
        "Use the audio player infrastructure with render callbacks for real-time audio."
    )

__repr__()

Source code in src/coremusic/audio/units.py
def __repr__(self) -> str:
    """Return a debug string with the unit's subtype and lifecycle state."""
    if self.is_disposed:
        state = "disposed"
    elif self._is_initialized:
        state = "initialized"
    else:
        state = "uninitialized"
    return f"AudioUnit({self._description.subtype!r}, {state})"

__enter__()

Source code in src/coremusic/audio/units.py
def __enter__(self) -> "AudioUnit":
    """Initialize the unit when a ``with`` block is entered.

    Returns:
        This same AudioUnit, so it can be bound with ``as``.
    """
    unit = self
    unit.initialize()
    return unit

__exit__(exc_type, exc_val, exc_tb)

Source code in src/coremusic/audio/units.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Uninitialize and dispose the unit when a ``with`` block exits.

    Args:
        exc_type: Exception type raised inside the block, if any (unused)
        exc_val: Exception instance raised inside the block, if any (unused)
        exc_tb: Traceback of that exception, if any (unused)

    Returns:
        None, so an exception raised inside the block is not suppressed.
    """
    try:
        self.uninitialize()
    finally:
        # Release the underlying instance even when uninitialize() raises;
        # otherwise a failed uninitialize would skip disposal entirely.
        self.dispose()

dispose()

Dispose of the AudioUnit

Source code in src/coremusic/audio/units.py
def dispose(self) -> None:
    """Release the AudioUnit; calling it again after disposal is a no-op.

    An initialized unit is uninitialized first, then the component
    instance is disposed when its object ID is non-zero, and finally the
    base-class disposal runs. Failures in the two native calls are
    ignored so that cleanup always proceeds.
    """
    if self.is_disposed:
        return

    if self._is_initialized:
        try:
            capi.audio_unit_uninitialize(self.object_id)
        except Exception:
            pass  # Best effort cleanup
        finally:
            # The flag is cleared whether or not the native call succeeded.
            self._is_initialized = False

    if self.object_id != 0:
        try:
            capi.audio_component_instance_dispose(self.object_id)
        except Exception:
            pass  # Best effort cleanup

    super().dispose()

AudioQueue Class

coremusic.audio.AudioQueue

Bases: CoreAudioObject

Audio queue for buffered playback and recording

Source code in src/coremusic/audio/core.py
class AudioQueue(capi.CoreAudioObject):
    """Audio queue for buffered playback and recording.

    Wraps a queue handle obtained through ``capi`` and keeps a list of the
    ``AudioBuffer`` objects allocated from it.
    """

    def __init__(self, audio_format: AudioFormat):
        """Remember the stream format and start with no tracked buffers."""
        super().__init__()
        self._buffers: list[AudioBuffer] = []
        self._format = audio_format

    @classmethod
    def new_output(cls, audio_format: AudioFormat) -> "AudioQueue":
        """Create a new output audio queue for ``audio_format``.

        Raises:
            AudioQueueError: If the queue cannot be created
        """
        instance = cls(audio_format)
        try:
            handle = capi.audio_queue_new_output(audio_format.to_dict())
            instance._set_object_id(handle)
        except Exception as exc:
            raise AudioQueueError(f"Failed to create output queue: {exc}")
        else:
            return instance

    def allocate_buffer(self, buffer_size: int) -> AudioBuffer:
        """Allocate a buffer of ``buffer_size`` on this queue and track it.

        Raises:
            AudioQueueError: If allocation or wrapping of the buffer fails
        """
        self._ensure_not_disposed()
        try:
            handle = capi.audio_queue_allocate_buffer(self.object_id, buffer_size)
            allocated = AudioBuffer(self.object_id, buffer_size)
            allocated._set_object_id(handle)
            self._buffers.append(allocated)
            return allocated
        except Exception as exc:
            raise AudioQueueError(f"Failed to allocate buffer: {exc}")

    def enqueue_buffer(self, buffer: AudioBuffer) -> None:
        """Enqueue ``buffer`` on this queue.

        Raises:
            AudioQueueError: If the buffer cannot be enqueued
        """
        self._ensure_not_disposed()
        try:
            capi.audio_queue_enqueue_buffer(self.object_id, buffer.object_id)
        except Exception as exc:
            raise AudioQueueError(f"Failed to enqueue buffer: {exc}")

    def start(self) -> None:
        """Start the audio queue.

        Raises:
            AudioQueueError: If the queue cannot be started
        """
        self._ensure_not_disposed()
        try:
            capi.audio_queue_start(self.object_id)
        except Exception as exc:
            raise AudioQueueError(f"Failed to start queue: {exc}")

    def stop(self, immediate: bool = True) -> None:
        """Stop the audio queue, forwarding the ``immediate`` flag.

        Raises:
            AudioQueueError: If the queue cannot be stopped
        """
        self._ensure_not_disposed()
        try:
            capi.audio_queue_stop(self.object_id, immediate)
        except Exception as exc:
            raise AudioQueueError(f"Failed to stop queue: {exc}")

    def __repr__(self) -> str:
        """Summarize the format and tracked buffer count, or disposal."""
        if not self.is_disposed:
            return f"AudioQueue({self._format.sample_rate}Hz, {self._format.channels_per_frame}ch, buffers={len(self._buffers)})"
        return "AudioQueue(disposed)"

    def dispose(self, immediate: bool = True) -> None:
        """Dispose of the audio queue; a no-op once already disposed.

        Raises:
            AudioQueueError: If the native dispose call fails. Buffer
                references are still cleared and base disposal still runs.
        """
        if self.is_disposed:
            return
        try:
            capi.audio_queue_dispose(self.object_id, immediate)
        except Exception as exc:
            raise AudioQueueError(f"Failed to dispose queue: {exc}")
        finally:
            # Runs on success and on failure alike.
            self._buffers.clear()
            super().dispose()

_format = audio_format instance-attribute

_buffers = [] instance-attribute

__init__(audio_format)

Source code in src/coremusic/audio/core.py
def __init__(self, audio_format: AudioFormat):
    """Remember the stream format and start with no tracked buffers.

    Args:
        audio_format: Format this queue is associated with
    """
    super().__init__()
    self._buffers: list[AudioBuffer] = []
    self._format = audio_format

new_output(audio_format) classmethod

Create a new output audio queue

Source code in src/coremusic/audio/core.py
@classmethod
def new_output(cls, audio_format: AudioFormat) -> "AudioQueue":
    """Create a new output audio queue for ``audio_format``.

    Args:
        audio_format: Format passed (as a dict) to the native queue factory

    Returns:
        A queue instance bound to the newly created native queue

    Raises:
        AudioQueueError: If the queue cannot be created
    """
    instance = cls(audio_format)
    try:
        handle = capi.audio_queue_new_output(audio_format.to_dict())
        instance._set_object_id(handle)
    except Exception as exc:
        raise AudioQueueError(f"Failed to create output queue: {exc}")
    else:
        return instance

allocate_buffer(buffer_size)

Allocate an audio buffer

Source code in src/coremusic/audio/core.py
def allocate_buffer(self, buffer_size: int) -> AudioBuffer:
    """Allocate a buffer of ``buffer_size`` on this queue and track it.

    Args:
        buffer_size: Size forwarded to the native allocation call

    Returns:
        The new AudioBuffer, which is also appended to the queue's
        internal buffer list

    Raises:
        AudioQueueError: If allocation or wrapping of the buffer fails
    """
    self._ensure_not_disposed()
    try:
        handle = capi.audio_queue_allocate_buffer(self.object_id, buffer_size)
        allocated = AudioBuffer(self.object_id, buffer_size)
        allocated._set_object_id(handle)
        self._buffers.append(allocated)
        return allocated
    except Exception as exc:
        raise AudioQueueError(f"Failed to allocate buffer: {exc}")

enqueue_buffer(buffer)

Enqueue an audio buffer

Source code in src/coremusic/audio/core.py
def enqueue_buffer(self, buffer: AudioBuffer) -> None:
    """Enqueue ``buffer`` on this queue.

    Args:
        buffer: Buffer whose object ID is handed to the native enqueue call

    Raises:
        AudioQueueError: If the buffer cannot be enqueued
    """
    self._ensure_not_disposed()
    queue_id, buffer_id = self.object_id, buffer.object_id
    try:
        capi.audio_queue_enqueue_buffer(queue_id, buffer_id)
    except Exception as exc:
        raise AudioQueueError(f"Failed to enqueue buffer: {exc}")

start()

Start the audio queue

Source code in src/coremusic/audio/core.py
def start(self) -> None:
    """Start the audio queue.

    Raises:
        AudioQueueError: If the native start call fails
    """
    self._ensure_not_disposed()
    try:
        capi.audio_queue_start(self.object_id)
    except Exception as exc:
        # Surface every failure as the package's queue error type.
        raise AudioQueueError(f"Failed to start queue: {exc}")

stop(immediate=True)

Stop the audio queue

Source code in src/coremusic/audio/core.py
def stop(self, immediate: bool = True) -> None:
    """Stop the audio queue.

    Args:
        immediate: Flag forwarded unchanged to the native stop call
            (default: True)

    Raises:
        AudioQueueError: If the native stop call fails
    """
    self._ensure_not_disposed()
    try:
        capi.audio_queue_stop(self.object_id, immediate)
    except Exception as exc:
        # Surface every failure as the package's queue error type.
        raise AudioQueueError(f"Failed to stop queue: {exc}")

__repr__()

Source code in src/coremusic/audio/core.py
def __repr__(self) -> str:
    """Summarize the format and tracked buffer count, or report disposal."""
    if not self.is_disposed:
        return f"AudioQueue({self._format.sample_rate}Hz, {self._format.channels_per_frame}ch, buffers={len(self._buffers)})"
    return "AudioQueue(disposed)"

dispose(immediate=True)

Dispose of the audio queue

Source code in src/coremusic/audio/core.py
def dispose(self, immediate: bool = True) -> None:
    """Dispose of the audio queue; a no-op once already disposed.

    Args:
        immediate: Flag forwarded unchanged to the native dispose call
            (default: True)

    Raises:
        AudioQueueError: If the native dispose call fails. Buffer
            references are still cleared and base disposal still runs.
    """
    if self.is_disposed:
        return
    try:
        capi.audio_queue_dispose(self.object_id, immediate)
    except Exception as exc:
        raise AudioQueueError(f"Failed to dispose queue: {exc}")
    finally:
        # Runs on success and on failure alike.
        self._buffers.clear()
        super().dispose()

AudioConverter Class

coremusic.audio.AudioConverter

Bases: CoreAudioObject

Audio format converter for sample rate and format conversion

Provides high-level interface for converting between audio formats, sample rates, bit depths, and channel configurations.

Source code in src/coremusic/audio/core.py
class AudioConverter(capi.CoreAudioObject):
    """Audio format converter for sample rate and format conversion

    Provides high-level interface for converting between audio formats,
    sample rates, bit depths, and channel configurations.
    """

    def __init__(self, source_format: AudioFormat, dest_format: AudioFormat):
        """Create an AudioConverter

        Args:
            source_format: Source audio format
            dest_format: Destination audio format

        Raises:
            AudioConverterError: If converter creation fails
        """
        super().__init__()
        self._source_format = source_format
        self._dest_format = dest_format

        try:
            converter_id = capi.audio_converter_new(
                source_format.to_dict(), dest_format.to_dict()
            )
            self._set_object_id(converter_id)
        except Exception as exc:
            raise AudioConverterError(f"Failed to create converter: {exc}") from exc

    @property
    def source_format(self) -> AudioFormat:
        """Get source audio format"""
        return self._source_format

    @property
    def dest_format(self) -> AudioFormat:
        """Get destination audio format"""
        return self._dest_format

    def convert(self, audio_data: bytes) -> bytes:
        """Convert audio data from source to destination format

        This method uses the simple buffer-based API (AudioConverterConvertBuffer)
        which only supports conversions where the input/output sizes are predictable.
        For complex conversions (sample rate, bit depth), use convert_with_callback().

        Args:
            audio_data: Input audio data in source format (raw PCM samples)

        Returns:
            Converted audio data in destination format

        Raises:
            AudioConverterError: If conversion fails

        Example:

            # Convert stereo to mono (simple format conversion)
            source = AudioFormat(44100.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)
            dest = AudioFormat(44100.0, 'lpcm', channels_per_frame=1, bits_per_channel=16)

            with AudioConverter(source, dest) as converter:
                stereo_data = b'\\x00\\x00\\xff\\xff' * 1024  # Raw 16-bit stereo samples
                mono_data = converter.convert(stereo_data)
        """
        self._ensure_not_disposed()
        try:
            return capi.audio_converter_convert_buffer(self.object_id, audio_data)
        except Exception as exc:
            raise AudioConverterError(f"Failed to convert audio: {exc}") from exc

    def convert_with_callback(
        self,
        input_data: bytes,
        input_packet_count: int,
        output_packet_count: int | None = None,
    ) -> bytes:
        """Convert audio using callback-based API for complex conversions

        This method supports all types of conversions including:
        - Sample rate changes (e.g., 44.1kHz -> 48kHz)
        - Bit depth changes (e.g., 16-bit -> 24-bit)
        - Channel count changes (stereo <-> mono)
        - Combinations of the above

        Args:
            input_data: Input audio data as bytes
            input_packet_count: Number of packets in input data
            output_packet_count: Expected output packets. When None, it is
                estimated from the sample-rate ratio plus 10% headroom and
                is never less than one packet.

        Returns:
            Converted audio data as bytes

        Raises:
            ValueError: If a packet count is not positive or input_data is empty
            TypeError: If input_data is not bytes or bytearray
            AudioConverterError: If conversion fails

        Example:

            # Convert 44.1kHz to 48kHz
            source_format = AudioFormat(44100.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)
            dest_format = AudioFormat(48000.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)

            with AudioConverter(source_format, dest_format) as converter:
                # Read input data
                with AudioFile("input_44100.wav") as af:
                    input_data, packet_count = af.read_packets(0, 999999999)

                # Convert
                output_data = converter.convert_with_callback(input_data, packet_count)

                # Write output
                with ExtendedAudioFile.create("output_48000.wav", 'WAVE', dest_format) as out:
                    num_frames = len(output_data) // dest_format.bytes_per_frame
                    out.write(num_frames, output_data)
        """
        if input_packet_count <= 0:
            raise ValueError(
                f"input_packet_count must be positive, got {input_packet_count}"
            )
        if output_packet_count is not None and output_packet_count <= 0:
            raise ValueError(
                f"output_packet_count must be positive, got {output_packet_count}"
            )
        if not isinstance(input_data, (bytes, bytearray)):
            raise TypeError(
                f"input_data must be bytes or bytearray, got {type(input_data).__name__}"
            )
        if len(input_data) == 0:
            raise ValueError("input_data cannot be empty")

        self._ensure_not_disposed()

        # Auto-calculate output packet count if not provided
        if output_packet_count is None:
            # Estimate based on sample rate ratio, with 10% extra
            rate_ratio = self._dest_format.sample_rate / self._source_format.sample_rate
            # int() truncation can yield 0 for tiny inputs that are being
            # downsampled; clamp to one packet so the estimate satisfies the
            # same "positive" rule enforced on explicit values above.
            output_packet_count = max(
                1, int(input_packet_count * rate_ratio * 1.1)
            )

        try:
            output_data, _actual_packets = capi.audio_converter_fill_complex_buffer(
                self.object_id,
                input_data,
                input_packet_count,
                output_packet_count,
                self._source_format.to_dict(),
            )
        except Exception as exc:
            raise AudioConverterError(f"Failed to convert audio: {exc}") from exc
        return output_data

    def get_property(self, property_id: int) -> bytes:
        """Get a property from the converter

        Args:
            property_id: Property ID

        Returns:
            Property data as bytes

        Raises:
            AudioConverterError: If getting property fails
        """
        self._ensure_not_disposed()
        try:
            return capi.audio_converter_get_property(self.object_id, property_id)
        except Exception as exc:
            raise AudioConverterError(f"Failed to get property: {exc}") from exc

    def set_property(self, property_id: int, data: bytes) -> None:
        """Set a property on the converter

        Args:
            property_id: Property ID (from capi.get_audio_converter_property_*())
            data: Property data as bytes (use struct.pack for binary encoding)

        Raises:
            AudioConverterError: If setting property fails

        Example:

            import struct
            import coremusic as cm

            converter = AudioConverter(source_fmt, dest_fmt)

            # Set bitrate to 128 kbps (requires UInt32)
            bitrate_prop = cm.capi.get_audio_converter_property_bit_rate()
            converter.set_property(bitrate_prop, struct.pack('<I', 128000))

            # Set quality (requires UInt32, 0=lowest, 127=highest)
            quality_prop = cm.capi.get_audio_converter_property_quality()
            converter.set_property(quality_prop, struct.pack('<I', 127))
        """
        self._ensure_not_disposed()
        try:
            capi.audio_converter_set_property(self.object_id, property_id, data)
        except Exception as exc:
            raise AudioConverterError(f"Failed to set property: {exc}") from exc

    def reset(self) -> None:
        """Reset the converter to its initial state

        Raises:
            AudioConverterError: If the reset call fails
        """
        self._ensure_not_disposed()
        try:
            capi.audio_converter_reset(self.object_id)
        except Exception as exc:
            raise AudioConverterError(f"Failed to reset converter: {exc}") from exc

    def dispose(self) -> None:
        """Dispose of the audio converter (no-op when already disposed)"""
        if not self.is_disposed:
            try:
                capi.audio_converter_dispose(self.object_id)
            except Exception:
                pass  # Best effort cleanup
            finally:
                super().dispose()

    def __enter__(self) -> "AudioConverter":
        """Enter context manager"""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and dispose"""
        self.dispose()

    def __repr__(self) -> str:
        return f"AudioConverter({self._source_format} -> {self._dest_format})"

_source_format = source_format instance-attribute

_dest_format = dest_format instance-attribute

source_format property

Get source audio format

dest_format property

Get destination audio format

__init__(source_format, dest_format)

Create an AudioConverter

Parameters:

Name Type Description Default
source_format AudioFormat

Source audio format

required
dest_format AudioFormat

Destination audio format

required

Raises:

Type Description
AudioConverterError

If converter creation fails

Source code in src/coremusic/audio/core.py
def __init__(self, source_format: AudioFormat, dest_format: AudioFormat):
    """Build a converter between two audio formats.

    Both formats are stored on the instance, then the native converter is
    created from their dict representations.

    Args:
        source_format: Source audio format
        dest_format: Destination audio format

    Raises:
        AudioConverterError: If converter creation fails
    """
    super().__init__()
    self._dest_format = dest_format
    self._source_format = source_format

    try:
        handle = capi.audio_converter_new(
            source_format.to_dict(), dest_format.to_dict()
        )
        self._set_object_id(handle)
    except Exception as exc:
        raise AudioConverterError(f"Failed to create converter: {exc}")

convert(audio_data)

Convert audio data from source to destination format

This method uses the simple buffer-based API (AudioConverterConvertBuffer) which only supports conversions where the input/output sizes are predictable. For complex conversions (sample rate, bit depth), use convert_with_callback().

Parameters:

Name Type Description Default
audio_data bytes

Input audio data in source format (raw PCM samples)

required

Returns:

Type Description
bytes

Converted audio data in destination format

Raises:

Type Description
AudioConverterError

If conversion fails

Example:

# Convert stereo to mono (simple format conversion)
source = AudioFormat(44100.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)
dest = AudioFormat(44100.0, 'lpcm', channels_per_frame=1, bits_per_channel=16)

with AudioConverter(source, dest) as converter:
    stereo_data = b'\x00\x00\xff\xff' * 1024  # Raw 16-bit stereo samples
    mono_data = converter.convert(stereo_data)
Source code in src/coremusic/audio/core.py
def convert(self, audio_data: bytes) -> bytes:
    """Run ``audio_data`` through the converter's simple buffer API.

    The buffer-based API (AudioConverterConvertBuffer) only supports
    conversions where the input/output sizes are predictable. For complex
    conversions (sample rate, bit depth), use convert_with_callback().

    Args:
        audio_data: Input audio data in source format (raw PCM samples)

    Returns:
        Converted audio data in destination format

    Raises:
        AudioConverterError: If conversion fails

    Example:

        # Convert stereo to mono (simple format conversion)
        source = AudioFormat(44100.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)
        dest = AudioFormat(44100.0, 'lpcm', channels_per_frame=1, bits_per_channel=16)

        with AudioConverter(source, dest) as converter:
            stereo_data = b'\\x00\\x00\\xff\\xff' * 1024  # Raw 16-bit stereo samples
            mono_data = converter.convert(stereo_data)
    """
    self._ensure_not_disposed()
    try:
        converted = capi.audio_converter_convert_buffer(self.object_id, audio_data)
    except Exception as exc:
        raise AudioConverterError(f"Failed to convert audio: {exc}")
    return converted

convert_with_callback(input_data, input_packet_count, output_packet_count=None)

Convert audio using callback-based API for complex conversions

This method supports all types of conversions, including sample rate changes (e.g., 44.1kHz -> 48kHz), bit depth changes (e.g., 16-bit -> 24-bit), channel count changes (stereo <-> mono), and combinations of the above.

Parameters:

Name Type Description Default
input_data bytes

Input audio data as bytes

required
input_packet_count int

Number of packets in input data

required
output_packet_count int | None

Expected output packets (auto-calculated if None)

None

Returns:

Type Description
bytes

Converted audio data as bytes

Raises:

Type Description
AudioConverterError

If conversion fails

Example:

# Convert 44.1kHz to 48kHz
source_format = AudioFormat(44100.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)
dest_format = AudioFormat(48000.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)

with AudioConverter(source_format, dest_format) as converter:
    # Read input data
    with AudioFile("input_44100.wav") as af:
        input_data, packet_count = af.read_packets(0, 999999999)

    # Convert
    output_data = converter.convert_with_callback(input_data, packet_count)

    # Write output
    with ExtendedAudioFile.create("output_48000.wav", 'WAVE', dest_format) as out:
        num_frames = len(output_data) // dest_format.bytes_per_frame
        out.write(num_frames, output_data)
Source code in src/coremusic/audio/core.py
def convert_with_callback(
    self,
    input_data: bytes,
    input_packet_count: int,
    output_packet_count: int | None = None,
) -> bytes:
    """Convert audio using callback-based API for complex conversions

    This method supports all types of conversions including:
    - Sample rate changes (e.g., 44.1kHz -> 48kHz)
    - Bit depth changes (e.g., 16-bit -> 24-bit)
    - Channel count changes (stereo <-> mono)
    - Combinations of the above

    Args:
        input_data: Input audio data as bytes
        input_packet_count: Number of packets in input data
        output_packet_count: Expected output packets (auto-calculated if None)

    Returns:
        Converted audio data as bytes

    Raises:
        AudioConverterError: If conversion fails

    Example:

        # Convert 44.1kHz to 48kHz
        source_format = AudioFormat(44100.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)
        dest_format = AudioFormat(48000.0, 'lpcm', channels_per_frame=2, bits_per_channel=16)

        with AudioConverter(source_format, dest_format) as converter:
            # Read input data
            with AudioFile("input_44100.wav") as af:
                input_data, packet_count = af.read_packets(0, 999999999)

            # Convert
            output_data = converter.convert_with_callback(input_data, packet_count)

            # Write output
            with ExtendedAudioFile.create("output_48000.wav", 'WAVE', dest_format) as out:
                num_frames = len(output_data) // dest_format.bytes_per_frame
                out.write(num_frames, output_data)
    """
    if input_packet_count <= 0:
        raise ValueError(
            f"input_packet_count must be positive, got {input_packet_count}"
        )
    if output_packet_count is not None and output_packet_count <= 0:
        raise ValueError(
            f"output_packet_count must be positive, got {output_packet_count}"
        )
    if not isinstance(input_data, (bytes, bytearray)):
        raise TypeError(
            f"input_data must be bytes or bytearray, got {type(input_data).__name__}"
        )
    if len(input_data) == 0:
        raise ValueError("input_data cannot be empty")

    self._ensure_not_disposed()

    # Auto-calculate output packet count if not provided
    if output_packet_count is None:
        # Estimate based on sample rate ratio
        rate_ratio = self._dest_format.sample_rate / self._source_format.sample_rate
        output_packet_count = int(
            input_packet_count * rate_ratio * 1.1
        )  # 10% extra

    try:
        output_data, actual_packets = capi.audio_converter_fill_complex_buffer(
            self.object_id,
            input_data,
            input_packet_count,
            output_packet_count,
            self._source_format.to_dict(),
        )
        return output_data
    except Exception as e:
        raise AudioConverterError(f"Failed to convert audio: {e}")

get_property(property_id)

Get a property from the converter

Parameters:

Name Type Description Default
property_id int

Property ID

required

Returns:

Type Description
bytes

Property data as bytes

Raises:

Type Description
AudioConverterError

If getting property fails

Source code in src/coremusic/audio/core.py
def get_property(self, property_id: int) -> bytes:
    """Read one converter property as raw bytes.

    Args:
        property_id: Property ID

    Returns:
        Property data as bytes

    Raises:
        AudioConverterError: If getting property fails
    """
    self._ensure_not_disposed()
    try:
        value = capi.audio_converter_get_property(self.object_id, property_id)
    except Exception as exc:
        raise AudioConverterError(f"Failed to get property: {exc}")
    return value

set_property(property_id, data)

Set a property on the converter

Parameters:

Name Type Description Default
property_id int

Property ID (from capi.get_audio_converter_property_*())

required
data bytes

Property data as bytes (use struct.pack for binary encoding)

required

Raises:

Type Description
AudioConverterError

If setting property fails

Example:

import struct
import coremusic as cm

converter = AudioConverter(source_fmt, dest_fmt)

# Set bitrate to 128 kbps (requires UInt32)
bitrate_prop = cm.capi.get_audio_converter_property_bit_rate()
converter.set_property(bitrate_prop, struct.pack('<I', 128000))

# Set quality (requires UInt32, 0=lowest, 127=highest)
quality_prop = cm.capi.get_audio_converter_property_quality()
converter.set_property(quality_prop, struct.pack('<I', 127))
Source code in src/coremusic/audio/core.py
def set_property(self, property_id: int, data: bytes) -> None:
    """Write raw bytes to one converter property.

    Args:
        property_id: Property ID (from capi.get_audio_converter_property_*())
        data: Property data as bytes (use struct.pack for binary encoding)

    Raises:
        AudioConverterError: If setting property fails

    Example:

        import struct
        import coremusic as cm

        converter = AudioConverter(source_fmt, dest_fmt)

        # Set bitrate to 128 kbps (requires UInt32)
        bitrate_prop = cm.capi.get_audio_converter_property_bit_rate()
        converter.set_property(bitrate_prop, struct.pack('<I', 128000))

        # Set quality (requires UInt32, 0=lowest, 127=highest)
        quality_prop = cm.capi.get_audio_converter_property_quality()
        converter.set_property(quality_prop, struct.pack('<I', 127))
    """
    self._ensure_not_disposed()
    converter_id = self.object_id
    try:
        capi.audio_converter_set_property(converter_id, property_id, data)
    except Exception as exc:
        # Surface every failure as the package's converter error type.
        raise AudioConverterError(f"Failed to set property: {exc}")

reset()

Reset the converter to its initial state

Source code in src/coremusic/audio/core.py
def reset(self) -> None:
    """Reset the converter to its initial state.

    Raises:
        AudioConverterError: If the native reset call fails
    """
    self._ensure_not_disposed()
    converter_id = self.object_id
    try:
        capi.audio_converter_reset(converter_id)
    except Exception as exc:
        # Surface every failure as the package's converter error type.
        raise AudioConverterError(f"Failed to reset converter: {exc}")

dispose()

Dispose of the audio converter

Source code in src/coremusic/audio/core.py
def dispose(self) -> None:
    """Dispose of the audio converter; a no-op once already disposed.

    A failure in the native dispose call is ignored, and base-class
    disposal runs regardless.
    """
    if self.is_disposed:
        return
    try:
        capi.audio_converter_dispose(self.object_id)
    except Exception:
        pass  # Best effort cleanup
    finally:
        super().dispose()

__enter__()

Enter context manager

Source code in src/coremusic/audio/core.py
def __enter__(self) -> "AudioConverter":
    """Enter the context manager.

    Returns:
        This same converter; no extra setup happens on entry.
    """
    converter = self
    return converter

__exit__(exc_type, exc_val, exc_tb)

Exit context manager and dispose

Source code in src/coremusic/audio/core.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit the context manager by disposing of the converter.

    The exception arguments are ignored and None is returned, so an
    exception raised inside the ``with`` block is not suppressed.
    """
    converter = self
    converter.dispose()

__repr__()

Source code in src/coremusic/audio/core.py
def __repr__(self) -> str:
    """Return a debug string showing source and destination formats."""
    source, dest = self._source_format, self._dest_format
    return f"AudioConverter({source} -> {dest})"

MIDIClient Class

coremusic.midi.MIDIClient

Bases: CoreAudioObject

MIDI client for managing MIDI operations

Source code in src/coremusic/midi/core.py
class MIDIClient(capi.CoreAudioObject):
    """MIDI client for managing MIDI operations.

    Creates a client through ``capi`` and keeps a list of the input and
    output ports created from it so they can be disposed with the client.
    """

    def __init__(self, name: str):
        """Create the MIDI client called ``name``.

        Raises:
            MIDIError: If the client cannot be created
        """
        super().__init__()
        self._name = name
        self._ports: list[MIDIPort] = []
        try:
            handle = capi.midi_client_create(name)
            self._set_object_id(handle)
        except Exception as exc:
            raise MIDIError(f"Failed to create MIDI client: {exc}")

    @property
    def name(self) -> str:
        """Name the client was created with."""
        return self._name

    def __repr__(self) -> str:
        """Show the client name plus either its port count or disposal."""
        if not self.is_disposed:
            return f"MIDIClient({self._name!r}, ports={len(self._ports)})"
        return f"MIDIClient({self._name!r}, disposed)"

    def create_input_port(self, name: str) -> MIDIInputPort:
        """Create a MIDI input port owned by this client.

        Raises:
            MIDIError: If the port cannot be created
        """
        self._ensure_not_disposed()
        try:
            handle = capi.midi_input_port_create(self.object_id, name)
            created = MIDIInputPort(name)
            created._set_object_id(handle)
            created._client = self
            self._ports.append(created)
            return created
        except Exception as exc:
            raise MIDIError(f"Failed to create input port: {exc}")

    def create_output_port(self, name: str) -> MIDIOutputPort:
        """Create a MIDI output port owned by this client.

        Raises:
            MIDIError: If the port cannot be created
        """
        self._ensure_not_disposed()
        try:
            handle = capi.midi_output_port_create(self.object_id, name)
            created = MIDIOutputPort(name)
            created._set_object_id(handle)
            created._client = self
            self._ports.append(created)
            return created
        except Exception as exc:
            raise MIDIError(f"Failed to create output port: {exc}")

    def dispose(self) -> None:
        """Dispose of the MIDI client and all its ports.

        Does nothing when the client is already disposed. Port and client
        disposal failures are ignored (best effort).
        """
        if self.is_disposed:
            return

        # Walk a snapshot so the port list may change while ports dispose.
        for tracked in list(self._ports):
            if tracked.is_disposed:
                continue
            try:
                tracked.dispose()
            except Exception:
                pass  # Best effort cleanup

        try:
            capi.midi_client_dispose(self.object_id)
        except Exception:
            # Best effort disposal - some MIDI operations may fail in test environments
            pass
        finally:
            # Clear port references and call base dispose
            self._ports.clear()
            super().dispose()

_name = name instance-attribute

_ports = [] instance-attribute

name property

__init__(name)

Source code in src/coremusic/midi/core.py
def __init__(self, name: str):
    """Create the MIDI client called ``name``.

    Args:
        name: Client name, stored on the instance and passed to the
            native client factory

    Raises:
        MIDIError: If the client cannot be created
    """
    super().__init__()
    self._ports: list[MIDIPort] = []
    self._name = name
    try:
        handle = capi.midi_client_create(name)
        self._set_object_id(handle)
    except Exception as exc:
        raise MIDIError(f"Failed to create MIDI client: {exc}")

__repr__()

Source code in src/coremusic/midi/core.py
def __repr__(self) -> str:
    """Return a debug string with the client name and its state.

    A disposed client is shown as ``disposed``; otherwise the number of
    tracked ports is shown.
    """
    # The port list is only consulted when the client is still alive.
    detail = "disposed" if self.is_disposed else f"ports={len(self._ports)}"
    return f"MIDIClient({self._name!r}, {detail})"

create_input_port(name)

Create a MIDI input port

Source code in src/coremusic/midi/core.py
def create_input_port(self, name: str) -> MIDIInputPort:
    """Create a MIDI input port owned by this client.

    Args:
        name: Name given to the new input port.

    Returns:
        The new ``MIDIInputPort``, already appended to this client's
        port list.

    Raises:
        MIDIError: If creating or registering the port fails. The
            original exception is attached as ``__cause__``.
    """
    self._ensure_not_disposed()
    try:
        port_id = capi.midi_input_port_create(self.object_id, name)
        port = MIDIInputPort(name)
        port._set_object_id(port_id)
        # Back-reference from the port to the client that created it.
        port._client = self
        self._ports.append(port)
        return port
    except Exception as e:
        # Chain explicitly so the low-level failure stays visible as the cause.
        raise MIDIError(f"Failed to create input port: {e}") from e

create_output_port(name)

Create a MIDI output port

Source code in src/coremusic/midi/core.py
def create_output_port(self, name: str) -> MIDIOutputPort:
    """Create a MIDI output port owned by this client.

    Args:
        name: Name given to the new output port.

    Returns:
        The new ``MIDIOutputPort``, already appended to this client's
        port list.

    Raises:
        MIDIError: If creating or registering the port fails. The
            original exception is attached as ``__cause__``.
    """
    self._ensure_not_disposed()
    try:
        port_id = capi.midi_output_port_create(self.object_id, name)
        port = MIDIOutputPort(name)
        port._set_object_id(port_id)
        # Back-reference from the port to the client that created it.
        port._client = self
        self._ports.append(port)
        return port
    except Exception as e:
        # Chain explicitly so the low-level failure stays visible as the cause.
        raise MIDIError(f"Failed to create output port: {e}") from e

dispose()

Dispose of the MIDI client and all its ports

Source code in src/coremusic/midi/core.py
def dispose(self) -> None:
    """Tear down every port of this client, then the client itself.

    Calling this on an already disposed client does nothing. Failures
    while disposing a port or the native client are deliberately ignored
    (best-effort cleanup); the port list is always cleared and the base
    class ``dispose()`` always runs.
    """
    if self.is_disposed:
        return

    # Iterate over a snapshot: the list may change while ports go away.
    for child in list(self._ports):
        if child.is_disposed:
            continue
        try:
            child.dispose()
        except Exception:
            pass  # Best effort cleanup

    try:
        capi.midi_client_dispose(self.object_id)
    except Exception:
        # Best effort disposal - some MIDI operations may fail in test environments
        pass
    finally:
        # Drop port references before handing over to the base class.
        self._ports.clear()
        super().dispose()

AudioClock Class

coremusic.audio.AudioClock

Bases: CoreAudioObject

High-level CoreAudioClock for audio/MIDI synchronization and timing

AudioClock provides synchronization services for audio and MIDI applications, supporting multiple time formats and playback control.

Supported time formats:

  • Host time (mach_absolute_time)
  • Audio samples
  • Musical beats
  • Seconds
  • SMPTE timecode

Example::

# Create and control a clock
with AudioClock() as clock:
    clock.play_rate = 1.0
    clock.start()

    # Get current time in different formats
    seconds = clock.get_time_seconds()
    beats = clock.get_time_beats()

    print(f"Position: {seconds:.2f}s ({beats:.2f} beats)")

    clock.stop()

Example with playback-rate (speed) control::

clock = AudioClock()
clock.play_rate = 0.5  # Half speed
clock.start()
# ... use clock for synchronization
clock.stop()
clock.dispose()
Source code in src/coremusic/audio/clock.py
class AudioClock(capi.CoreAudioObject):
    """High-level CoreAudioClock for audio/MIDI synchronization and timing

    AudioClock provides synchronization services for audio and MIDI applications,
    supporting multiple time formats and playback control.

    Supported time formats:

    - Host time (mach_absolute_time)
    - Audio samples
    - Musical beats
    - Seconds
    - SMPTE timecode

    The underlying clock object is created lazily: ``start()``, the
    ``play_rate`` property and the time getters call ``create()`` on first
    use, and entering the context manager creates it immediately.

    Example::

        # Create and control a clock
        with AudioClock() as clock:
            clock.play_rate = 1.0
            clock.start()

            # Get current time in different formats
            seconds = clock.get_time_seconds()
            beats = clock.get_time_beats()

            print(f"Position: {seconds:.2f}s ({beats:.2f} beats)")

            clock.stop()

    Example with playback-rate (speed) control::

        clock = AudioClock()
        clock.play_rate = 0.5  # Half speed
        clock.start()
        # ... use clock for synchronization
        clock.stop()
        clock.dispose()
    """

    def __init__(self) -> None:
        """Initialize a new CoreAudioClock wrapper (no native clock yet)"""
        super().__init__()
        self._is_created = False
        self._is_running = False

    def create(self) -> "AudioClock":
        """Create the underlying clock object

        Does nothing when the clock has already been created.

        Returns:
            Self for method chaining

        Raises:
            RuntimeError: If clock creation fails
        """
        if not self._is_created:
            try:
                clock_id = capi.ca_clock_new()
                self._set_object_id(clock_id)
                self._is_created = True
            except Exception as e:
                raise RuntimeError(f"Failed to create clock: {e}") from e
        return self

    def start(self) -> None:
        """Start the clock advancing on its timeline

        The underlying clock is created first if that has not happened yet.

        Raises:
            RuntimeError: If creating the clock or starting it fails
        """
        self._ensure_not_disposed()
        if not self._is_created:
            self.create()

        try:
            capi.ca_clock_start(self.object_id)
            self._is_running = True
        except Exception as e:
            raise RuntimeError(f"Failed to start clock: {e}") from e

    def stop(self) -> None:
        """Stop the clock

        Does nothing when the clock is not marked as running.

        Raises:
            RuntimeError: If stop fails
        """
        if self._is_running:
            try:
                capi.ca_clock_stop(self.object_id)
                self._is_running = False
            except Exception as e:
                raise RuntimeError(f"Failed to stop clock: {e}") from e

    @property
    def is_running(self) -> bool:
        """Check if the clock is currently running"""
        return self._is_running

    @property
    def play_rate(self) -> float:
        """Get or set the playback rate (1.0 = normal speed)"""
        self._ensure_not_disposed()
        if not self._is_created:
            self.create()

        try:
            return capi.ca_clock_get_play_rate(self.object_id)
        except Exception as e:
            raise RuntimeError(f"Failed to get play rate: {e}") from e

    @play_rate.setter
    def play_rate(self, rate: float) -> None:
        """Set the playback rate"""
        self._ensure_not_disposed()
        if not self._is_created:
            self.create()

        try:
            capi.ca_clock_set_play_rate(self.object_id, rate)
        except Exception as e:
            raise RuntimeError(f"Failed to set play rate: {e}") from e

    def get_current_time(self, time_format: int) -> dict[str, Any]:
        """Get current time in specified format

        Args:
            time_format: Time format constant from ClockTimeFormat

        Returns:
            Dictionary with 'format' and 'value' keys

        Raises:
            RuntimeError: If creating the clock or getting the time fails
        """
        self._ensure_not_disposed()
        if not self._is_created:
            self.create()

        try:
            return capi.ca_clock_get_current_time(self.object_id, time_format)
        except Exception as e:
            raise RuntimeError(f"Failed to get current time: {e}") from e

    def get_time_seconds(self) -> float:
        """Get current time in seconds

        Returns:
            Current time in seconds (0.0 if the reading has no 'value')
        """
        time_info = self.get_current_time(ClockTimeFormat.SECONDS)
        return float(time_info.get("value", 0.0))

    def get_time_beats(self) -> float:
        """Get current time in musical beats

        Returns:
            Current time in beats (0.0 if the reading has no 'value')
        """
        time_info = self.get_current_time(ClockTimeFormat.BEATS)
        return float(time_info.get("value", 0.0))

    def get_time_samples(self) -> float:
        """Get current time in audio samples

        Returns:
            Current time in samples (0.0 if the reading has no 'value')
        """
        time_info = self.get_current_time(ClockTimeFormat.SAMPLES)
        return float(time_info.get("value", 0.0))

    def get_time_host(self) -> int:
        """Get current time as host time

        Returns:
            Current host time (mach_absolute_time), 0 if the reading has
            no 'value'
        """
        time_info = self.get_current_time(ClockTimeFormat.HOST_TIME)
        return int(time_info.get("value", 0))

    def get_smpte_time(self) -> dict[str, int]:
        """Get current time as SMPTE timecode

        Returns:
            Dictionary with SMPTE time components:
            - hours, minutes, seconds, frames
            - subframes, subframe_divisor
            - type, flags

            An empty dictionary is returned when the reading's 'value' is
            not a dictionary.
        """
        time_info = self.get_current_time(ClockTimeFormat.SMPTE_TIME)
        value = time_info.get("value", {})
        if isinstance(value, dict):
            return value
        return {}

    def dispose(self) -> None:
        """Dispose the clock and free resources

        Does nothing if the clock is already disposed or was never created.
        Errors from stopping or disposing the native clock are ignored
        (best-effort cleanup); the state flags are always reset and the base
        class ``dispose()`` always runs.
        """
        if self.is_disposed or not self._is_created:
            return

        try:
            if self._is_running:
                try:
                    self.stop()
                except Exception:
                    # A failed stop must not skip the native dispose below,
                    # otherwise the underlying clock would be leaked.
                    pass
            capi.ca_clock_dispose(self.object_id)
        except Exception:
            pass  # Best effort cleanup
        finally:
            self._is_created = False
            self._is_running = False
            super().dispose()

    def __enter__(self) -> "AudioClock":
        """Enter context manager, creating the clock if needed"""
        self.create()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and dispose"""
        self.dispose()

    def __repr__(self) -> str:
        """Return a debug string listing the clock's state flags"""
        if self.is_disposed:
            return "AudioClock(disposed)"

        status = []
        if self._is_created:
            status.append("created")
        if self._is_running:
            status.append("running")
            try:
                status.append(f"rate={self.play_rate:.2f}")
            except Exception:
                pass  # repr() must not fail because the rate is unreadable

        status_str = ", ".join(status) if status else "not created"
        return f"AudioClock({status_str})"

_is_created = False instance-attribute

_is_running = False instance-attribute

is_running property

Check if the clock is currently running

play_rate property writable

Get or set the playback rate (1.0 = normal speed)

__init__()

Initialize a new CoreAudioClock

Source code in src/coremusic/audio/clock.py
def __init__(self) -> None:
    """Set up the wrapper; the native clock is not created here."""
    super().__init__()
    # Both state flags start cleared: nothing created, nothing running.
    self._is_created = self._is_running = False

create()

Create the underlying clock object

Returns:

Type Description
'AudioClock'

Self for method chaining

Raises:

Type Description
RuntimeError

If clock creation fails

Source code in src/coremusic/audio/clock.py
def create(self) -> "AudioClock":
    """Create the underlying clock object

    Does nothing when the clock has already been created.

    Returns:
        Self for method chaining

    Raises:
        RuntimeError: If clock creation fails. The original exception is
            attached as ``__cause__``.
    """
    if not self._is_created:
        try:
            clock_id = capi.ca_clock_new()
            self._set_object_id(clock_id)
            self._is_created = True
        except Exception as e:
            # Chain explicitly so the low-level failure stays visible as the cause.
            raise RuntimeError(f"Failed to create clock: {e}") from e
    return self

start()

Start the clock advancing on its timeline

Raises:

Type Description
RuntimeError

If creating the underlying clock or starting it fails

Source code in src/coremusic/audio/clock.py
def start(self) -> None:
    """Start the clock advancing on its timeline

    The underlying clock is created first if that has not happened yet.

    Raises:
        RuntimeError: If creating the clock or starting it fails. For a
            start failure the original exception is attached as
            ``__cause__``.
    """
    self._ensure_not_disposed()
    if not self._is_created:
        self.create()

    try:
        capi.ca_clock_start(self.object_id)
        # Only marked running once the native call has succeeded.
        self._is_running = True
    except Exception as e:
        # Chain explicitly so the low-level failure stays visible as the cause.
        raise RuntimeError(f"Failed to start clock: {e}") from e

stop()

Stop the clock

Raises:

Type Description
RuntimeError

If stop fails

Source code in src/coremusic/audio/clock.py
def stop(self) -> None:
    """Stop the clock

    Does nothing when the clock is not marked as running.

    Raises:
        RuntimeError: If stop fails. The original exception is attached
            as ``__cause__`` and the clock stays marked as running.
    """
    if self._is_running:
        try:
            capi.ca_clock_stop(self.object_id)
            self._is_running = False
        except Exception as e:
            # Chain explicitly so the low-level failure stays visible as the cause.
            raise RuntimeError(f"Failed to stop clock: {e}") from e

get_current_time(time_format)

Get current time in specified format

Parameters:

Name Type Description Default
time_format int

Time format constant from ClockTimeFormat

required

Returns:

Type Description
dict[str, Any]

Dictionary with 'format' and 'value' keys

Raises:

Type Description
RuntimeError

If getting time fails

Source code in src/coremusic/audio/clock.py
def get_current_time(self, time_format: int) -> dict[str, Any]:
    """Get current time in specified format

    The underlying clock is created first if that has not happened yet.

    Args:
        time_format: Time format constant from ClockTimeFormat

    Returns:
        Dictionary with 'format' and 'value' keys

    Raises:
        RuntimeError: If creating the clock or getting the time fails. For
            a read failure the original exception is attached as
            ``__cause__``.
    """
    self._ensure_not_disposed()
    if not self._is_created:
        self.create()

    try:
        return capi.ca_clock_get_current_time(self.object_id, time_format)
    except Exception as e:
        # Chain explicitly so the low-level failure stays visible as the cause.
        raise RuntimeError(f"Failed to get current time: {e}") from e

get_time_seconds()

Get current time in seconds

Returns:

Type Description
float

Current time in seconds

Source code in src/coremusic/audio/clock.py
def get_time_seconds(self) -> float:
    """Return the clock's current position in seconds.

    Returns:
        The ``"value"`` entry of the seconds-format reading converted to
        ``float``, or ``0.0`` when the reading has no such entry.
    """
    reading = self.get_current_time(ClockTimeFormat.SECONDS)
    seconds = reading.get("value", 0.0)
    return float(seconds)

get_time_beats()

Get current time in musical beats

Returns:

Type Description
float

Current time in beats

Source code in src/coremusic/audio/clock.py
def get_time_beats(self) -> float:
    """Return the clock's current position in musical beats.

    Returns:
        The ``"value"`` entry of the beats-format reading converted to
        ``float``, or ``0.0`` when the reading has no such entry.
    """
    reading = self.get_current_time(ClockTimeFormat.BEATS)
    beats = reading.get("value", 0.0)
    return float(beats)

get_time_samples()

Get current time in audio samples

Returns:

Type Description
float

Current time in samples

Source code in src/coremusic/audio/clock.py
def get_time_samples(self) -> float:
    """Return the clock's current position in audio samples.

    Returns:
        The ``"value"`` entry of the samples-format reading converted to
        ``float``, or ``0.0`` when the reading has no such entry.
    """
    reading = self.get_current_time(ClockTimeFormat.SAMPLES)
    samples = reading.get("value", 0.0)
    return float(samples)

get_time_host()

Get current time as host time

Returns:

Type Description
int

Current host time (mach_absolute_time)

Source code in src/coremusic/audio/clock.py
def get_time_host(self) -> int:
    """Return the clock's current position as host time.

    Returns:
        The ``"value"`` entry of the host-time reading converted to
        ``int`` (mach_absolute_time), or ``0`` when the reading has no
        such entry.
    """
    reading = self.get_current_time(ClockTimeFormat.HOST_TIME)
    host_ticks = reading.get("value", 0)
    return int(host_ticks)

get_smpte_time()

Get current time as SMPTE timecode

Returns:

Type Description
dict[str, int]

Dictionary with SMPTE time components:

  • hours, minutes, seconds, frames
  • subframes, subframe_divisor
  • type, flags
Source code in src/coremusic/audio/clock.py
def get_smpte_time(self) -> dict[str, int]:
    """Return the clock's current position as SMPTE timecode.

    Returns:
        Dictionary with SMPTE time components:
        - hours, minutes, seconds, frames
        - subframes, subframe_divisor
        - type, flags

        An empty dictionary is returned when the reading's ``"value"``
        entry is missing or is not a dictionary.
    """
    reading = self.get_current_time(ClockTimeFormat.SMPTE_TIME)
    timecode = reading.get("value", {})
    return timecode if isinstance(timecode, dict) else {}

dispose()

Dispose the clock and free resources

Source code in src/coremusic/audio/clock.py
def dispose(self) -> None:
    """Dispose the clock and free resources

    Does nothing if the clock is already disposed or was never created.
    Errors from stopping or disposing the native clock are ignored
    (best-effort cleanup); the state flags are always reset and the base
    class ``dispose()`` always runs.
    """
    if self.is_disposed or not self._is_created:
        return

    try:
        if self._is_running:
            try:
                self.stop()
            except Exception:
                # A failed stop must not skip the native dispose below,
                # otherwise the underlying clock would be leaked.
                pass
        capi.ca_clock_dispose(self.object_id)
    except Exception:
        pass  # Best effort cleanup
    finally:
        self._is_created = False
        self._is_running = False
        super().dispose()

__enter__()

Enter context manager

Source code in src/coremusic/audio/clock.py
def __enter__(self) -> "AudioClock":
    """Create the clock on entering a ``with`` block and return it."""
    self.create()  # no-op if the clock already exists
    return self

__exit__(exc_type, exc_val, exc_tb)

Exit context manager and dispose

Source code in src/coremusic/audio/clock.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Dispose the clock when the ``with`` block ends.

    The exception details are ignored and ``None`` is returned, so an
    exception raised inside the block keeps propagating.
    """
    self.dispose()

__repr__()

Source code in src/coremusic/audio/clock.py
def __repr__(self) -> str:
    """Return a debug string listing the clock's state flags.

    A disposed clock is shown as ``disposed``. Otherwise the string lists
    ``created`` and/or ``running`` (plus the play rate when it can be
    read), or ``not created`` when no flag is set.
    """
    if self.is_disposed:
        return "AudioClock(disposed)"

    parts = []
    if self._is_created:
        parts.append("created")
    if self._is_running:
        parts.append("running")
        try:
            parts.append(f"rate={self.play_rate:.2f}")
        except Exception:
            pass  # leave the rate out if it cannot be read or formatted

    return f"AudioClock({', '.join(parts) or 'not created'})"

ClockTimeFormat

coremusic.audio.ClockTimeFormat

Time format constants for CoreAudioClock

Source code in src/coremusic/audio/clock.py
class ClockTimeFormat:
    """Time format constants for CoreAudioClock

    Each constant is fetched from the C API once, when this class body is
    executed. Pass one of them as ``time_format`` to
    ``AudioClock.get_current_time`` to select the format of the reading.
    """

    # Used by AudioClock.get_time_host()
    HOST_TIME = capi.get_ca_clock_time_format_host_time()
    # Used by AudioClock.get_time_samples()
    SAMPLES = capi.get_ca_clock_time_format_samples()
    # Used by AudioClock.get_time_beats()
    BEATS = capi.get_ca_clock_time_format_beats()
    # Used by AudioClock.get_time_seconds()
    SECONDS = capi.get_ca_clock_time_format_seconds()
    # Used by AudioClock.get_smpte_time()
    SMPTE_TIME = capi.get_ca_clock_time_format_smpte_time()

HOST_TIME = capi.get_ca_clock_time_format_host_time() class-attribute instance-attribute

SAMPLES = capi.get_ca_clock_time_format_samples() class-attribute instance-attribute

BEATS = capi.get_ca_clock_time_format_beats() class-attribute instance-attribute

SECONDS = capi.get_ca_clock_time_format_seconds() class-attribute instance-attribute

SMPTE_TIME = capi.get_ca_clock_time_format_smpte_time() class-attribute instance-attribute

AudioEffectsChain Class

coremusic.audio.AudioEffectsChain

High-level audio effects chain using AUGraph.

Provides a simplified API for creating and managing chains of AudioUnit effects for real-time audio processing.

Example
from coremusic.audio.utilities import AudioEffectsChain

# Create an effects chain
chain = AudioEffectsChain()

# Add effects (example effect types)
reverb_node = chain.add_effect('aufx', 'rvb2', 'appl')  # Reverb
eq_node = chain.add_effect('aufx', 'eqal', 'appl')      # EQ

# Connect to output
output_node = chain.add_output()
chain.connect(reverb_node, eq_node)
chain.connect(eq_node, output_node)

# Initialize and start
chain.initialize()
chain.start()

# ... process audio ...

# Cleanup
chain.stop()
chain.dispose()
Source code in src/coremusic/audio/utilities.py
class AudioEffectsChain:
    """High-level audio effects chain using AUGraph.

    Provides a simplified API for creating and managing chains of AudioUnit
    effects for real-time audio processing.

    Example:
        ```python
        from coremusic.audio.utilities import AudioEffectsChain

        # Create an effects chain
        chain = AudioEffectsChain()

        # Add effects (example effect types)
        reverb_node = chain.add_effect('aumu', 'rvb2', 'appl')  # Reverb
        eq_node = chain.add_effect('aufx', 'eqal', 'appl')      # EQ

        # Connect to output
        output_node = chain.add_output()
        chain.connect(reverb_node, eq_node)
        chain.connect(eq_node, output_node)

        # Initialize and start
        chain.initialize()
        chain.start()

        # ... process audio ...

        # Cleanup
        chain.stop()
        chain.dispose()
        ```
    """

    def __init__(self) -> None:
        """Create a new audio effects chain"""
        from .graph import AUGraph

        self._graph = AUGraph()
        self._nodes: dict[int, Any] = {}  # Map of node_id -> description

    @property
    def graph(self) -> Any:
        """Get the underlying AUGraph"""
        return self._graph

    def add_effect(
        self,
        effect_type: str,
        effect_subtype: str,
        manufacturer: str = "appl",
        flags: int = 0,
    ) -> int:
        """Add an audio effect to the chain.

        Args:
            effect_type: AudioUnit type (4-char code or string)
            effect_subtype: AudioUnit subtype (4-char code or string)
            manufacturer: Manufacturer code (default: 'appl' for Apple)
            flags: Component flags (default: 0)

        Returns:
            Node ID for this effect

        Example:
            ```python
            # Add a reverb effect
            reverb = chain.add_effect('aumu', 'rvb2', 'appl')

            # Add an EQ effect
            eq = chain.add_effect('aufx', 'eqal', 'appl')

            # Add a dynamics processor
            dynamics = chain.add_effect('aufx', 'dcmp', 'appl')
            ```
        """
        from .units import AudioComponentDescription

        desc = AudioComponentDescription(
            type=effect_type,
            subtype=effect_subtype,
            manufacturer=manufacturer,
            flags=flags,
            flags_mask=0,
        )

        node_id = self._graph.add_node(desc)
        self._nodes[node_id] = desc
        return node_id

    def add_effect_by_name(self, name: str) -> int | None:
        """Add an audio effect to the chain by name.

        This searches for an AudioUnit matching the given name and adds it
        to the chain.

        Args:
            name: AudioUnit name (e.g., 'AUDelay', 'Reverb', 'AUGraphicEQ')

        Returns:
            Node ID for this effect, or None if not found

        Example:
            ```python
            # Add effects by name instead of FourCC codes
            delay = chain.add_effect_by_name('AUDelay')
            reverb = chain.add_effect_by_name('Reverb')
            eq = chain.add_effect_by_name('AUGraphicEQ')
            ```
        """
        component = find_audio_unit_by_name(name)
        if component is None:
            return None

        # Extract FourCC codes from the component
        desc = component._description
        return self.add_effect(desc.type, desc.subtype, desc.manufacturer)

    def add_output(
        self, output_type: str = "auou", output_subtype: str = "def "
    ) -> int:
        """Add an output node to the chain.

        Args:
            output_type: Output type (default: 'auou' for AudioUnit Output)
            output_subtype: Output subtype (default: 'def ' for default output)

        Returns:
            Node ID for the output

        Example:
            ```python
            # Add default output
            output = chain.add_output()

            # Add system output
            output = chain.add_output('auou', 'sys ')
            ```
        """
        return self.add_effect(output_type, output_subtype, "appl")

    def connect(
        self, source_node: int, dest_node: int, source_bus: int = 0, dest_bus: int = 0
    ) -> None:
        """Connect two nodes in the effects chain.

        Args:
            source_node: Source node ID
            dest_node: Destination node ID
            source_bus: Source output bus (default: 0)
            dest_bus: Destination input bus (default: 0)

        Example:
            ```python
            # Connect reverb to EQ
            chain.connect(reverb_node, eq_node)

            # Connect EQ to output
            chain.connect(eq_node, output_node)
            ```
        """
        self._graph.connect(source_node, source_bus, dest_node, dest_bus)

    def disconnect(self, dest_node: int, dest_bus: int = 0) -> None:
        """Disconnect a node input.

        Args:
            dest_node: Destination node ID
            dest_bus: Destination input bus (default: 0)
        """
        self._graph.disconnect(dest_node, dest_bus)

    def remove_node(self, node_id: int) -> None:
        """Remove a node from the chain.

        Args:
            node_id: Node ID to remove
        """
        if node_id in self._nodes:
            del self._nodes[node_id]
        self._graph.remove_node(node_id)

    def open(self) -> "AudioEffectsChain":
        """Open the graph (opens all AudioUnits).

        Returns:
            Self for method chaining
        """
        self._graph.open()
        return self

    def initialize(self) -> "AudioEffectsChain":
        """Initialize the graph (prepares for rendering).

        Returns:
            Self for method chaining
        """
        self._graph.initialize()
        return self

    def start(self) -> None:
        """Start audio rendering"""
        self._graph.start()

    def stop(self) -> None:
        """Stop audio rendering"""
        self._graph.stop()

    def dispose(self) -> None:
        """Dispose of the graph and all resources"""
        try:
            if self._graph.is_running:
                self.stop()
        except Exception:
            pass
        self._graph.dispose()

    def __enter__(self) -> "AudioEffectsChain":
        """Context manager entry"""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit"""
        self.dispose()

    @property
    def is_open(self) -> bool:
        """Check if the graph is open"""
        return self._graph.is_open

    @property
    def is_initialized(self) -> bool:
        """Check if the graph is initialized"""
        return self._graph.is_initialized

    @property
    def is_running(self) -> bool:
        """Check if the graph is running"""
        return self._graph.is_running

    @property
    def node_count(self) -> int:
        """Get the number of nodes in the chain"""
        return len(self._nodes)

_graph = AUGraph() instance-attribute

_nodes = {} instance-attribute

graph property

Get the underlying AUGraph

is_open property

Check if the graph is open

is_initialized property

Check if the graph is initialized

is_running property

Check if the graph is running

node_count property

Get the number of nodes in the chain

__init__()

Create a new audio effects chain

Source code in src/coremusic/audio/utilities.py
def __init__(self) -> None:
    """Set up an empty chain backed by a freshly constructed AUGraph."""
    # Imported here rather than at module level, as in the original code.
    from .graph import AUGraph

    self._graph = AUGraph()
    # Bookkeeping of nodes added through this chain: node_id -> description.
    self._nodes: dict[int, Any] = {}

add_effect(effect_type, effect_subtype, manufacturer='appl', flags=0)

Add an audio effect to the chain.

Parameters:

Name Type Description Default
effect_type str

AudioUnit type (4-char code or string)

required
effect_subtype str

AudioUnit subtype (4-char code or string)

required
manufacturer str

Manufacturer code (default: 'appl' for Apple)

'appl'
flags int

Component flags (default: 0)

0

Returns:

Type Description
int

Node ID for this effect

Example
# Add a reverb effect
reverb = chain.add_effect('aufx', 'rvb2', 'appl')

# Add an EQ effect
eq = chain.add_effect('aufx', 'eqal', 'appl')

# Add a dynamics processor
dynamics = chain.add_effect('aufx', 'dcmp', 'appl')
Source code in src/coremusic/audio/utilities.py
def add_effect(
    self,
    effect_type: str,
    effect_subtype: str,
    manufacturer: str = "appl",
    flags: int = 0,
) -> int:
    """Append an AudioUnit node described by its component codes.

    A component description is built from the arguments (with a flags
    mask of 0), added to the underlying graph, and recorded in this
    chain's node map under the returned node ID.

    Args:
        effect_type: AudioUnit type (4-char code or string)
        effect_subtype: AudioUnit subtype (4-char code or string)
        manufacturer: Manufacturer code (default: 'appl' for Apple)
        flags: Component flags (default: 0)

    Returns:
        Node ID assigned by the graph to the new node

    Example:
        ```python
        # Add a reverb effect
        reverb = chain.add_effect('aufx', 'rvb2', 'appl')

        # Add a dynamics processor
        dynamics = chain.add_effect('aufx', 'dcmp', 'appl')
        ```
    """
    from .units import AudioComponentDescription

    description = AudioComponentDescription(
        type=effect_type,
        subtype=effect_subtype,
        manufacturer=manufacturer,
        flags=flags,
        flags_mask=0,
    )
    new_node = self._graph.add_node(description)
    self._nodes[new_node] = description
    return new_node

add_effect_by_name(name)

Add an audio effect to the chain by name.

This searches for an AudioUnit matching the given name and adds it to the chain.

Parameters:

Name Type Description Default
name str

AudioUnit name (e.g., 'AUDelay', 'Reverb', 'AUGraphicEQ')

required

Returns:

Type Description
int | None

Node ID for this effect, or None if not found

Example
# Add effects by name instead of FourCC codes
delay = chain.add_effect_by_name('AUDelay')
reverb = chain.add_effect_by_name('Reverb')
eq = chain.add_effect_by_name('AUGraphicEQ')
Source code in src/coremusic/audio/utilities.py
def add_effect_by_name(self, name: str) -> int | None:
    """Look up an AudioUnit by name and add it to the chain.

    Args:
        name: AudioUnit name (e.g., 'AUDelay', 'Reverb', 'AUGraphicEQ')

    Returns:
        Node ID of the added effect, or None when no AudioUnit with that
        name is found

    Example:
        ```python
        # Add effects by name instead of FourCC codes
        delay = chain.add_effect_by_name('AUDelay')
        eq = chain.add_effect_by_name('AUGraphicEQ')
        ```
    """
    match = find_audio_unit_by_name(name)
    if match is not None:
        # Re-use the component's own type/subtype/manufacturer codes.
        found = match._description
        return self.add_effect(found.type, found.subtype, found.manufacturer)
    return None

add_output(output_type='auou', output_subtype='def ')

Add an output node to the chain.

Parameters:

Name Type Description Default
output_type str

Output type (default: 'auou' for AudioUnit Output)

'auou'
output_subtype str

Output subtype (default: 'def ' for default output)

'def '

Returns:

Type Description
int

Node ID for the output

Example
# Add default output
output = chain.add_output()

# Add system output
output = chain.add_output('auou', 'sys ')
Source code in src/coremusic/audio/utilities.py
def add_output(
    self, output_type: str = "auou", output_subtype: str = "def "
) -> int:
    """Append an output node to the chain.

    The node is added through ``add_effect`` with the manufacturer fixed
    to ``"appl"``.

    Args:
        output_type: Output type (default: 'auou' for AudioUnit Output)
        output_subtype: Output subtype (default: 'def ' for default output)

    Returns:
        Node ID for the output

    Example:
        ```python
        # Add default output
        output = chain.add_output()

        # Add system output
        output = chain.add_output('auou', 'sys ')
        ```
    """
    output_node = self.add_effect(output_type, output_subtype, "appl")
    return output_node

connect(source_node, dest_node, source_bus=0, dest_bus=0)

Connect two nodes in the effects chain.

Parameters:

Name Type Description Default
source_node int

Source node ID

required
dest_node int

Destination node ID

required
source_bus int

Source output bus (default: 0)

0
dest_bus int

Destination input bus (default: 0)

0
Example
# Connect reverb to EQ
chain.connect(reverb_node, eq_node)

# Connect EQ to output
chain.connect(eq_node, output_node)
Source code in src/coremusic/audio/utilities.py
def connect(
    self, source_node: int, dest_node: int, source_bus: int = 0, dest_bus: int = 0
) -> None:
    """Wire the output of one node into the input of another.

    Args:
        source_node: Source node ID
        dest_node: Destination node ID
        source_bus: Source output bus (default: 0)
        dest_bus: Destination input bus (default: 0)

    Example:
        ```python
        # Connect reverb to EQ, then EQ to output
        chain.connect(reverb_node, eq_node)
        chain.connect(eq_node, output_node)
        ```
    """
    # The graph expects (node, bus) pairs: source first, then destination.
    graph = self._graph
    graph.connect(source_node, source_bus, dest_node, dest_bus)

disconnect(dest_node, dest_bus=0)

Disconnect a node input.

Parameters:

Name Type Description Default
dest_node int

Destination node ID

required
dest_bus int

Destination input bus (default: 0)

0
Source code in src/coremusic/audio/utilities.py
def disconnect(self, dest_node: int, dest_bus: int = 0) -> None:
    """Break the connection feeding one input bus of a node.

    Args:
        dest_node: ID of the node whose input is disconnected
        dest_bus: Input bus on that node (default: 0)
    """
    graph = self._graph
    graph.disconnect(dest_node, dest_bus)

remove_node(node_id)

Remove a node from the chain.

Parameters:

Name Type Description Default
node_id int

Node ID to remove

required
Source code in src/coremusic/audio/utilities.py
def remove_node(self, node_id: int) -> None:
    """Drop a node from the chain's bookkeeping and from the graph.

    Args:
        node_id: ID of the node to remove
    """
    # Untracked IDs are tolerated here; the graph call still runs for them.
    self._nodes.pop(node_id, None)
    self._graph.remove_node(node_id)

open()

Open the graph (opens all AudioUnits).

Returns:

Type Description
'AudioEffectsChain'

Self for method chaining

Source code in src/coremusic/audio/utilities.py
def open(self) -> "AudioEffectsChain":
    """Open the underlying graph (opens all AudioUnits).

    Returns:
        This chain, so calls can be chained
    """
    graph = self._graph
    graph.open()
    return self

initialize()

Initialize the graph (prepares for rendering).

Returns:

Type Description
'AudioEffectsChain'

Self for method chaining

Source code in src/coremusic/audio/utilities.py
def initialize(self) -> "AudioEffectsChain":
    """Initialize the underlying graph (prepares for rendering).

    Returns:
        This chain, so calls can be chained
    """
    graph = self._graph
    graph.initialize()
    return self

start()

Start audio rendering

Source code in src/coremusic/audio/utilities.py
def start(self) -> None:
    """Begin audio rendering by starting the underlying graph."""
    graph = self._graph
    graph.start()

stop()

Stop audio rendering

Source code in src/coremusic/audio/utilities.py
def stop(self) -> None:
    """Halt audio rendering by stopping the underlying graph."""
    graph = self._graph
    graph.stop()

dispose()

Dispose of the graph and all resources

Source code in src/coremusic/audio/utilities.py
def dispose(self) -> None:
    """Dispose of the graph and all resources.

    If the graph reports that it is running, it is stopped first. Stopping
    is best-effort: an ``Exception`` raised while checking or stopping is
    logged at debug level and otherwise ignored. The graph is disposed in
    every case, including when a non-``Exception`` interruption (such as
    ``KeyboardInterrupt``) propagates out of the stop attempt.
    """
    import logging

    try:
        if self._graph.is_running:
            self.stop()
    except Exception:
        # Best-effort: a failed stop must not block disposal, but leave a
        # trace instead of discarding the error silently.
        logging.getLogger(__name__).debug(
            "Ignoring error while stopping graph during dispose", exc_info=True
        )
    finally:
        # Runs on every path so the graph is never leaked.
        self._graph.dispose()

__enter__()

Context manager entry

Source code in src/coremusic/audio/utilities.py
def __enter__(self) -> "AudioEffectsChain":
    """Enter the ``with`` block, handing back this chain unchanged.

    Nothing is opened or initialized here; the chain is returned as-is.
    """
    chain = self
    return chain

__exit__(exc_type, exc_val, exc_tb)

Context manager exit

Source code in src/coremusic/audio/utilities.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave the ``with`` block by disposing of the chain.

    The exception details are ignored, and returning ``None`` lets any
    in-flight exception propagate.
    """
    self.dispose()
    return None

Functional API

Low-level C-style functions are available through the coremusic.capi module for advanced use cases requiring direct access to CoreAudio frameworks.

Note

The object-oriented API is recommended for most use cases. The functional API in coremusic.capi provides low-level access when needed.

For direct access to low-level functions:

import coremusic.capi as capi

# Low-level audio file operations.
# The open call returns a raw file ID; nothing closes it automatically in
# this style, so pair every open with an explicit audio_file_close().
file_id = capi.audio_file_open_url("audio.wav")
# ... operations ...
capi.audio_file_close(file_id)

# Low-level clock operations.
# The clock ID is likewise released explicitly with ca_clock_dispose().
clock_id = capi.ca_clock_new()
capi.ca_clock_start(clock_id)
# ... operations ...
capi.ca_clock_dispose(clock_id)

Error Handling

coremusic provides exception classes for different CoreAudio subsystems:

coremusic.exceptions.CoreAudioError

Bases: Exception

Base exception for CoreAudio errors

Source code in src/coremusic/exceptions.py
class CoreAudioError(Exception):
    """Root of the CoreAudio exception hierarchy.

    Carries a numeric ``status_code`` alongside the usual exception message.
    """

    def __init__(self, message: str, status_code: int = 0):
        """Record the message on the base exception and keep the status code.

        Args:
            message: Human-readable error text
            status_code: Numeric status associated with the error (default: 0)
        """
        super().__init__(message)
        self.status_code = status_code

    @classmethod
    def from_os_status(cls, status: int, operation: str = "") -> "CoreAudioError":
        """Build an exception of this class from an OSStatus code.

        Args:
            status: OSStatus error code
            operation: Description of failed operation (e.g., "open audio file")

        Returns:
            Instance of ``cls`` whose message holds the error name, prefixed by
            the failed operation when given and followed by a suggestion when
            one is available; ``status_code`` is set to ``status``
        """
        description = os_status.os_status_to_string(status)
        hint = os_status.get_error_suggestion(status)

        # Prefix with the failed operation only when the caller named one.
        text = f"Failed to {operation}: {description}" if operation else description

        if hint:
            text = text + f". {hint}"

        return cls(text, status_code=status)

status_code = status_code instance-attribute

__init__(message, status_code=0)

Source code in src/coremusic/exceptions.py
def __init__(self, message: str, status_code: int = 0):
    """Initialize the error with a message and a numeric status code.

    Args:
        message: Error text, forwarded to the parent class initializer
        status_code: Value stored on ``self.status_code`` (default: 0)
    """
    super().__init__(message)
    self.status_code = status_code

from_os_status(status, operation='') classmethod

Create exception from OSStatus code with human-readable error message.

Parameters:

Name Type Description Default
status int

OSStatus error code

required
operation str

Description of failed operation (e.g., "open audio file")

''

Returns:

Type Description
'CoreAudioError'

CoreAudioError with formatted message including error name and suggestion

Source code in src/coremusic/exceptions.py
@classmethod
def from_os_status(cls, status: int, operation: str = "") -> "CoreAudioError":
    """Build an exception of this class from an OSStatus code.

    Args:
        status: OSStatus error code
        operation: Description of failed operation (e.g., "open audio file")

    Returns:
        Instance of ``cls`` whose message holds the error name, prefixed by
        the failed operation when given and followed by a suggestion when
        one is available; ``status_code`` is set to ``status``
    """
    description = os_status.os_status_to_string(status)
    hint = os_status.get_error_suggestion(status)

    # Prefix with the failed operation only when the caller named one.
    text = f"Failed to {operation}: {description}" if operation else description

    if hint:
        text = text + f". {hint}"

    return cls(text, status_code=status)

coremusic.exceptions.AudioFileError

Bases: CoreAudioError

Exception for AudioFile operations

Source code in src/coremusic/exceptions.py
class AudioFileError(CoreAudioError):
    """Exception for AudioFile operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch AudioFile failures separately.
    """

coremusic.exceptions.AudioUnitError

Bases: CoreAudioError

Exception for AudioUnit operations

Source code in src/coremusic/exceptions.py
class AudioUnitError(CoreAudioError):
    """Exception for AudioUnit operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch AudioUnit failures separately.
    """

coremusic.exceptions.AudioQueueError

Bases: CoreAudioError

Exception for AudioQueue operations

Source code in src/coremusic/exceptions.py
class AudioQueueError(CoreAudioError):
    """Exception for AudioQueue operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch AudioQueue failures separately.
    """

coremusic.exceptions.AudioConverterError

Bases: CoreAudioError

Exception for AudioConverter operations

Source code in src/coremusic/exceptions.py
class AudioConverterError(CoreAudioError):
    """Exception for AudioConverter operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch AudioConverter failures separately.
    """

coremusic.exceptions.MIDIError

Bases: CoreAudioError

Exception for MIDI operations

Source code in src/coremusic/exceptions.py
class MIDIError(CoreAudioError):
    """Exception for MIDI operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch MIDI failures separately.
    """

coremusic.exceptions.MusicPlayerError

Bases: CoreAudioError

Exception for MusicPlayer operations

Source code in src/coremusic/exceptions.py
class MusicPlayerError(CoreAudioError):
    """Exception for MusicPlayer operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch MusicPlayer failures separately.
    """

coremusic.exceptions.AudioDeviceError

Bases: CoreAudioError

Exception for AudioDevice operations

Source code in src/coremusic/exceptions.py
class AudioDeviceError(CoreAudioError):
    """Exception for AudioDevice operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch AudioDevice failures separately.
    """

coremusic.exceptions.AUGraphError

Bases: CoreAudioError

Exception for AUGraph operations

Source code in src/coremusic/exceptions.py
class AUGraphError(CoreAudioError):
    """Exception for AUGraph operations.

    Adds nothing beyond CoreAudioError; the distinct type lets callers
    catch AUGraph failures separately.
    """

Utility Functions

Utility functions are available through coremusic.capi for FourCC conversion and other low-level operations:

import coremusic.capi as capi

# Convert FourCC string to integer
# ('lpcm' is the four-character code used as the example here)
format_int = capi.fourchar_to_int('lpcm')

# Convert integer back to FourCC string
# (presumably round-trips to 'lpcm' -- confirm against capi)
format_str = capi.int_to_fourchar(format_int)