Skip to content

Verification API

Formal verification components built on the Z3 theorem prover.

Bounds Prover

multigen.frontend.verifiers.bounds_prover

Memory Safety and Bounds Verification.

This module provides formal verification of memory safety properties, including bounds checking, buffer overflow prevention, and pointer safety.

BoundsProver

Formal verification of memory bounds and safety properties.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
class BoundsProver:
    """Formal verification of memory bounds and safety properties.

    The prover extracts memory regions and accesses from an AST, turns each
    relevant access into a ``ProofProperty`` and submits it to a
    ``TheoremProver``. The individual results are aggregated into a
    ``MemorySafetyProof``.
    """

    def __init__(self, theorem_prover: Optional[TheoremProver] = None):
        """Initialize the bounds prover.

        Args:
            theorem_prover: Z3 theorem prover instance. A default
                ``TheoremProver`` is created when the argument is omitted
                (or otherwise falsy).
        """
        self.theorem_prover = theorem_prover or TheoremProver()
        self.z3_available = Z3_AVAILABLE

        # Track memory regions and their properties. Both containers are
        # replaced wholesale on every verify_memory_safety() call.
        self.memory_regions: dict[str, MemoryRegion] = {}
        self.memory_accesses: list[MemoryAccess] = []

    def verify_memory_safety(self, context: AnalysisContext) -> MemorySafetyProof:
        """Verify memory safety for a function.

        Args:
            context: Analysis context with AST and metadata

        Returns:
            MemorySafetyProof with verification results. ``is_safe`` is True
            only when every generated property was proved (vacuously True
            when no property was generated).
        """
        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments.
        start_time = time.perf_counter()

        # Extract memory operations from AST
        memory_extractor = MemoryOperationExtractor()
        memory_extractor.visit(context.ast_node)

        self.memory_regions = memory_extractor.memory_regions
        self.memory_accesses = memory_extractor.memory_accesses

        # Run the three checks in a fixed order: bounds, null pointer,
        # buffer overflow. The bounds check also sets access.is_safe.
        all_proof_results: list[ProofResult] = []
        all_proof_results.extend(self._verify_bounds_safety())
        all_proof_results.extend(self._verify_null_pointer_safety())
        all_proof_results.extend(self._verify_buffer_overflow_safety())

        # Determine overall safety status
        is_safe = all(result.is_verified for result in all_proof_results)

        # Only accesses explicitly marked False count as unsafe; accesses that
        # were never checked keep is_safe=None and are not reported.
        unsafe_accesses = [access for access in self.memory_accesses if access.is_safe is False]

        confidence = self._calculate_confidence(all_proof_results)
        recommendations = self._generate_recommendations(unsafe_accesses, all_proof_results)

        verification_time = time.perf_counter() - start_time

        return MemorySafetyProof(
            function_name=self._resolve_function_name(context),
            safety_type=MemorySafetyType.BOUNDS_VIOLATION,  # Primary type
            is_safe=is_safe,
            proof_results=all_proof_results,
            unsafe_accesses=unsafe_accesses,
            recommendations=recommendations,
            verification_time=verification_time,
            confidence=confidence,
        )

    @staticmethod
    def _resolve_function_name(context: AnalysisContext) -> str:
        """Pick the function name to report for ``context``.

        The first key of ``analysis_result.functions`` wins when available.
        Otherwise the AST node's ``name`` is used, including the case where
        an analysis result exists but lists no functions. ``"unknown"`` is
        the last resort.
        """
        analysis_result = context.analysis_result
        if analysis_result and hasattr(analysis_result, "functions") and analysis_result.functions:
            return next(iter(analysis_result.functions))
        if hasattr(context.ast_node, "name"):
            return context.ast_node.name
        return "unknown"

    def _verify_bounds_safety(self) -> list[ProofResult]:
        """Verify bounds checking for all read/write memory accesses.

        Side effect: each checked access gets ``is_safe`` set to whether its
        bounds property was proved.
        """
        results = []

        for access in self.memory_accesses:
            if access.access_type in ["read", "write"]:
                prop = self._create_bounds_property(access)
                result = self.theorem_prover.verify_property(prop)
                results.append(result)

                # Update access safety status
                access.is_safe = result.is_verified

        return results

    def _verify_null_pointer_safety(self) -> list[ProofResult]:
        """Verify null pointer dereference safety.

        Only regions whose base address is ``0`` or the string ``"null"``
        produce a property.
        """
        results = []

        for region in self.memory_regions.values():
            if region.base_address == 0 or region.base_address == "null":
                prop = self._create_null_pointer_property(region)
                results.append(self.theorem_prover.verify_property(prop))

        return results

    def _verify_buffer_overflow_safety(self) -> list[ProofResult]:
        """Verify buffer overflow prevention for every write access.

        Results are ordered by region (in order of first access) and, within
        a region, by access order.
        """
        results = []

        # Group accesses by memory region
        region_accesses: dict[str, list[MemoryAccess]] = {}
        for access in self.memory_accesses:
            region_accesses.setdefault(access.region.name, []).append(access)

        # Verify each region for buffer overflows
        for accesses in region_accesses.values():
            for access in accesses:
                if access.access_type == "write":
                    prop = self._create_buffer_overflow_property(access)
                    results.append(self.theorem_prover.verify_property(prop))

        return results

    def _create_bounds_property(self, access: MemoryAccess) -> ProofProperty:
        """Create a bounds checking property for a memory access.

        Without Z3 a placeholder property with the formula ``"mock_formula"``
        is returned. With Z3 the formula is ``0 <= offset < size``, using
        concrete integer values where the access/region provide them and
        symbolic integers otherwise.
        """
        region = access.region

        if not self.z3_available:
            return ProofProperty(
                name=f"bounds_check_{region.name}",
                property_type=PropertyType.BOUNDS_CHECKING,
                description=f"Access to {region.name} at offset {access.offset} is within bounds",
                z3_formula="mock_formula",
            )

        # Start symbolic, then substitute concrete values if known
        offset = z3.Int(f"offset_{region.name}")
        size = z3.Int(f"size_{region.name}")

        if isinstance(access.offset, int):
            offset = z3.IntVal(access.offset)
        if isinstance(region.size, int):
            size = z3.IntVal(region.size)

        # Bounds property: 0 <= offset < size
        bounds_formula = z3.And(offset >= 0, offset < size)

        return ProofProperty(
            name=f"bounds_check_{region.name}_{access.line_number}",
            property_type=PropertyType.BOUNDS_CHECKING,
            description=f"Access to {region.name} at offset {access.offset} is within bounds",
            z3_formula=bounds_formula,
            context={"access": access, "region": region},
        )

    def _create_null_pointer_property(self, region: MemoryRegion) -> ProofProperty:
        """Create a null pointer safety property (``base != 0``) for a region.

        Without Z3 a placeholder property with the formula ``"mock_formula"``
        is returned.
        """
        if not self.z3_available:
            return ProofProperty(
                name=f"null_check_{region.name}",
                property_type=PropertyType.NULL_POINTER_SAFETY,
                description=f"Region {region.name} is not null",
                z3_formula="mock_formula",
            )

        # Create Z3 variable for base address
        base_addr = z3.Int(f"base_{region.name}")

        # Null safety: base_addr != 0
        null_safety_formula = base_addr != 0

        return ProofProperty(
            name=f"null_check_{region.name}",
            property_type=PropertyType.NULL_POINTER_SAFETY,
            description=f"Region {region.name} is not null",
            z3_formula=null_safety_formula,
            context={"region": region},
        )

    def _create_buffer_overflow_property(self, access: MemoryAccess) -> ProofProperty:
        """Create a buffer overflow prevention property for a write access.

        Without Z3 a placeholder property with the formula ``"mock_formula"``
        is returned. With Z3 the formula is
        ``offset + write_size <= buffer_size``.
        """
        region = access.region

        if not self.z3_available:
            return ProofProperty(
                name=f"overflow_check_{region.name}",
                property_type=PropertyType.MEMORY_SAFETY,
                description=f"Write to {region.name} does not cause buffer overflow",
                z3_formula="mock_formula",
            )

        # Start symbolic, then substitute concrete values if known
        offset = z3.Int(f"offset_{region.name}")
        write_size = z3.IntVal(access.size)
        buffer_size = z3.Int(f"size_{region.name}")

        if isinstance(access.offset, int):
            offset = z3.IntVal(access.offset)
        if isinstance(region.size, int):
            buffer_size = z3.IntVal(region.size)

        # Overflow prevention: offset + write_size <= buffer_size
        overflow_formula = offset + write_size <= buffer_size

        return ProofProperty(
            name=f"overflow_check_{region.name}_{access.line_number}",
            property_type=PropertyType.MEMORY_SAFETY,
            description=f"Write to {region.name} does not cause buffer overflow",
            z3_formula=overflow_formula,
            context={"access": access, "region": region},
        )

    def _calculate_confidence(self, proof_results: list[ProofResult]) -> float:
        """Calculate confidence based on proof results.

        Proved properties count fully, unknown ones count half, everything
        else counts zero. Returns 0.0 for an empty result list.
        """
        if not proof_results:
            return 0.0

        proved_count = sum(1 for result in proof_results if result.status == ProofStatus.PROVED)
        unknown_count = sum(1 for result in proof_results if result.status == ProofStatus.UNKNOWN)
        total_count = len(proof_results)

        # Full confidence for proved properties, partial for unknown
        confidence = (proved_count + unknown_count * 0.5) / total_count
        return min(confidence, 1.0)

    def _generate_recommendations(
        self, unsafe_accesses: list[MemoryAccess], proof_results: list[ProofResult]
    ) -> list[str]:
        """Generate recommendations for improving memory safety.

        Args:
            unsafe_accesses: Accesses whose bounds check was not proved.
            proof_results: All proof results of the verification run.

        Returns:
            Human-readable recommendations; a single "passed" message when
            nothing needs attention.
        """
        recommendations = []

        if unsafe_accesses:
            recommendations.append("Add bounds checking before array/buffer accesses")
            recommendations.append("Consider using safer data structures (e.g., std::vector)")

        if any(r.status == ProofStatus.DISPROVED for r in proof_results):
            recommendations.append("Review memory access patterns in failed verification points")

        if any(r.status == ProofStatus.UNKNOWN for r in proof_results):
            recommendations.append("Add explicit bounds information to help verification")
            recommendations.append("Consider simplifying complex indexing expressions")

        if not recommendations:
            recommendations.append("Memory safety verification passed - code appears safe")

        return recommendations

__init__(theorem_prover=None)

Initialize the bounds prover.

Parameters:

Name Type Description Default
theorem_prover Optional[TheoremProver]

Z3 theorem prover instance

None
Source code in src/multigen/frontend/verifiers/bounds_prover.py
def __init__(self, theorem_prover: Optional[TheoremProver] = None):
    """Set up an empty bounds prover.

    Args:
        theorem_prover: Prover used to check the generated properties. When
            omitted (or falsy), a default ``TheoremProver`` is created.
    """
    # Bookkeeping for the regions and accesses found in the analysed code;
    # both start out empty.
    self.memory_regions: dict[str, MemoryRegion] = {}
    self.memory_accesses: list[MemoryAccess] = []

    self.z3_available = Z3_AVAILABLE
    self.theorem_prover = theorem_prover if theorem_prover else TheoremProver()

verify_memory_safety(context)

Verify memory safety for a function.

Parameters:

Name Type Description Default
context AnalysisContext

Analysis context with AST and metadata

required

Returns:

Type Description
MemorySafetyProof

MemorySafetyProof with verification results

Source code in src/multigen/frontend/verifiers/bounds_prover.py
def verify_memory_safety(self, context: AnalysisContext) -> MemorySafetyProof:
    """Verify memory safety for a function.

    Args:
        context: Analysis context with AST and metadata

    Returns:
        MemorySafetyProof with verification results. ``is_safe`` is True only
        when every generated property was proved (vacuously True when no
        property was generated).
    """
    # perf_counter() is monotonic, so the measured duration cannot be
    # distorted by system clock adjustments.
    start_time = time.perf_counter()

    # Extract memory operations from AST
    memory_extractor = MemoryOperationExtractor()
    memory_extractor.visit(context.ast_node)

    self.memory_regions = memory_extractor.memory_regions
    self.memory_accesses = memory_extractor.memory_accesses

    # Run the three checks in a fixed order: bounds, null pointer, buffer
    # overflow. The bounds check also sets access.is_safe.
    all_proof_results = []
    all_proof_results.extend(self._verify_bounds_safety())
    all_proof_results.extend(self._verify_null_pointer_safety())
    all_proof_results.extend(self._verify_buffer_overflow_safety())

    # Determine overall safety status
    is_safe = all(result.is_verified for result in all_proof_results)

    # Only accesses explicitly marked False count as unsafe; accesses that
    # were never checked keep is_safe=None and are not reported.
    unsafe_accesses = [access for access in self.memory_accesses if access.is_safe is False]

    confidence = self._calculate_confidence(all_proof_results)
    recommendations = self._generate_recommendations(unsafe_accesses, all_proof_results)

    verification_time = time.perf_counter() - start_time

    # Prefer the first analysed function; otherwise fall back to the AST
    # node's own name. The fallback also applies when an analysis result
    # exists but lists no functions.
    function_name = "unknown"
    analysis_result = context.analysis_result
    if analysis_result and hasattr(analysis_result, "functions") and analysis_result.functions:
        function_name = next(iter(analysis_result.functions))
    elif hasattr(context.ast_node, "name"):
        function_name = context.ast_node.name

    return MemorySafetyProof(
        function_name=function_name,
        safety_type=MemorySafetyType.BOUNDS_VIOLATION,  # Primary type
        is_safe=is_safe,
        proof_results=all_proof_results,
        unsafe_accesses=unsafe_accesses,
        recommendations=recommendations,
        verification_time=verification_time,
        confidence=confidence,
    )

MemoryAccess dataclass

Represents a memory access operation.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
@dataclass
class MemoryAccess:
    """Represents a memory access operation.

    One instance describes a single subscript-style access to a
    ``MemoryRegion``. ``is_safe`` starts as ``None`` and is filled in by the
    bounds verification step.
    """

    # Region that is being accessed.
    region: MemoryRegion
    access_type: str  # "read", "write", "address"
    # An int for a literal integer index; otherwise a string such as a
    # variable name or a placeholder like "complex_index".
    offset: Union[str, int]
    # Extent of the access; the extractor in this module always uses 1
    # (single element access).
    size: int
    # Source line of the access (taken from the AST node's lineno).
    line_number: int
    # None = not checked yet; True/False = outcome of the bounds proof.
    is_safe: Optional[bool] = None

MemoryOperationExtractor

Bases: NodeVisitor

Extract memory operations and regions from Python AST.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
class MemoryOperationExtractor(ast.NodeVisitor):
    """Extract memory operations and regions from Python AST.

    After ``visit()``:

    * ``memory_regions`` maps a variable name to the ``MemoryRegion``
      inferred for it (from list literals, ``list(...)`` calls, or created
      on demand for subscripted names with no known declaration).
    * ``memory_accesses`` lists one ``MemoryAccess`` per subscript on a
      plain name, in visiting order.
    """

    def __init__(self) -> None:
        self.memory_regions: dict[str, MemoryRegion] = {}
        self.memory_accesses: list[MemoryAccess] = []
        self.current_function: Optional[str] = None

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Track function context."""
        self.current_function = node.name
        self.generic_visit(node)

    def visit_Assign(self, node: ast.Assign) -> None:
        """Extract memory region declarations.

        Only assignments to a single plain name are considered. A list
        literal yields an initialized region sized by its element count;
        a ``list(<arg>, ...)`` call yields an uninitialized region whose
        size is derived from the first argument.
        """
        if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name):
            var_name = node.targets[0].id

            # Check for list/array initialization
            if isinstance(node.value, ast.List):
                size = len(node.value.elts)
                element_type = self._infer_element_type(node.value.elts)

                region = MemoryRegion(
                    name=var_name,
                    base_address=f"&{var_name}",
                    size=size,
                    element_type=element_type,
                    initialization_status="initialized",
                )
                self.memory_regions[var_name] = region

            # Check for explicit array allocation patterns
            elif isinstance(node.value, ast.Call):
                if isinstance(node.value.func, ast.Name) and node.value.func.id == "list":
                    # list(size) or similar
                    if node.value.args:
                        list_size = self._extract_size_info(node.value.args[0])
                        region = MemoryRegion(
                            name=var_name,
                            base_address=f"&{var_name}",
                            size=list_size,
                            element_type="unknown",
                            initialization_status="uninitialized",
                        )
                        self.memory_regions[var_name] = region

        self.generic_visit(node)

    def visit_Subscript(self, node: ast.Subscript) -> None:
        """Extract memory access operations.

        Every subscript on a plain name is recorded as a single-element
        access. Names without a known region get a placeholder region of
        size ``"unknown"``.
        """
        if isinstance(node.value, ast.Name):
            array_name = node.value.id

            # Ensure we have a memory region for this array
            if array_name not in self.memory_regions:
                # Create unknown region
                self.memory_regions[array_name] = MemoryRegion(
                    name=array_name, base_address=f"&{array_name}", size="unknown", element_type="unknown"
                )

            region = self.memory_regions[array_name]

            # Extract index information
            offset = self._extract_offset_info(node.slice)

            # Determine access type from context
            access_type = "read"  # Default
            # Note: In a more complete implementation, we'd analyze the context
            # to determine if this is a read or write access

            access = MemoryAccess(
                region=region,
                access_type=access_type,
                offset=offset,
                size=1,  # Assume single element access
                line_number=node.lineno,
            )
            self.memory_accesses.append(access)

        self.generic_visit(node)

    def _infer_element_type(self, elements: list[ast.expr]) -> str:
        """Infer a C-like element type name from the first list element.

        Returns one of ``"bool"``, ``"int"``, ``"double"``, ``"char*"`` or
        ``"unknown"`` (empty list, non-constant or unsupported constant).
        """
        if not elements:
            return "unknown"

        first_elem = elements[0]
        if isinstance(first_elem, ast.Constant):
            value = first_elem.value
            # bool is a subclass of int, so it must be tested first;
            # otherwise True/False would be classified as "int".
            if isinstance(value, bool):
                return "bool"
            if isinstance(value, int):
                return "int"
            if isinstance(value, float):
                return "double"
            if isinstance(value, str):
                return "char*"

        return "unknown"

    @staticmethod
    def _describe_node(node: ast.expr, fallback: str) -> Union[str, int]:
        """Reduce an expression to a constant value, a name, or ``fallback``.

        int/str constants are returned as-is, ``None`` becomes ``"unknown"``,
        other constants are stringified, plain names yield their identifier,
        and anything else yields ``fallback``.
        """
        if isinstance(node, ast.Constant):
            value = node.value
            if isinstance(value, (int, str)):
                return value
            return str(value) if value is not None else "unknown"
        if isinstance(node, ast.Name):
            return node.id
        return fallback

    def _extract_size_info(self, size_node: ast.expr) -> Union[str, int]:
        """Extract size information from AST node.

        Expressions that are neither a constant nor a plain name are
        reported as ``"complex_expression"``.
        """
        return self._describe_node(size_node, "complex_expression")

    def _extract_offset_info(self, slice_node: ast.expr) -> Union[str, int]:
        """Extract offset information from slice node.

        Expressions that are neither a constant nor a plain name are
        reported as ``"complex_index"``.
        """
        return self._describe_node(slice_node, "complex_index")

visit_Assign(node)

Extract memory region declarations.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
def visit_Assign(self, node: ast.Assign) -> None:
    """Register a memory region for list-like assignments to a single name.

    A list literal produces an initialized region sized by its element
    count. A ``list(<arg>, ...)`` call produces an uninitialized region
    whose size comes from the first argument. Any other assignment leaves
    ``memory_regions`` untouched. Child nodes are always visited afterwards.
    """
    region = None
    targets = node.targets

    if len(targets) == 1 and isinstance(targets[0], ast.Name):
        var_name = targets[0].id
        value = node.value

        if isinstance(value, ast.List):
            # Literal list: element count and type are known statically.
            region = MemoryRegion(
                name=var_name,
                base_address=f"&{var_name}",
                size=len(value.elts),
                element_type=self._infer_element_type(value.elts),
                initialization_status="initialized",
            )
        elif (
            isinstance(value, ast.Call)
            and isinstance(value.func, ast.Name)
            and value.func.id == "list"
            and value.args
        ):
            # list(size) or similar: only the first argument is inspected.
            region = MemoryRegion(
                name=var_name,
                base_address=f"&{var_name}",
                size=self._extract_size_info(value.args[0]),
                element_type="unknown",
                initialization_status="uninitialized",
            )

    if region is not None:
        self.memory_regions[var_name] = region

    self.generic_visit(node)

visit_FunctionDef(node)

Track function context.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
    """Remember the name of the function being entered, then descend into it."""
    entered = node.name
    # Set the name before descending so it is already in place while the
    # function's children are visited.
    self.current_function = entered
    self.generic_visit(node)

visit_Subscript(node)

Extract memory access operations.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
def visit_Subscript(self, node: ast.Subscript) -> None:
    """Record a single-element access for every subscript on a plain name.

    If the subscripted name has no registered region yet, a placeholder
    region with size ``"unknown"`` is created first. Child nodes are always
    visited afterwards.
    """
    target = node.value

    if isinstance(target, ast.Name):
        name = target.id
        regions = self.memory_regions

        # Names that were never declared as a list still get a region, just
        # without any size or type knowledge.
        if name not in regions:
            regions[name] = MemoryRegion(
                name=name, base_address=f"&{name}", size="unknown", element_type="unknown"
            )

        # Every access is classified as a read; the surrounding context
        # (load vs. store) is not analysed here.
        self.memory_accesses.append(
            MemoryAccess(
                region=regions[name],
                access_type="read",
                offset=self._extract_offset_info(node.slice),
                size=1,  # single element access assumed
                line_number=node.lineno,
            )
        )

    self.generic_visit(node)

MemoryRegion dataclass

Represents a memory region with bounds and properties.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
@dataclass
class MemoryRegion:
    """Represents a memory region with bounds and properties.

    Regions are keyed by variable name in the extractor and carry the
    size/type knowledge that the bounds, null-pointer and overflow
    properties are built from.
    """

    # Name of the variable the region belongs to.
    name: str
    # The extractor in this module uses the string "&<name>"; the null
    # pointer check looks for the values 0 or "null".
    base_address: Union[str, int]
    # An int when the element count is known (e.g. a list literal);
    # otherwise a string such as "unknown" or a symbolic size.
    size: Union[str, int]
    # C-like type name, e.g. "int", "double", "char*", "bool" or "unknown".
    element_type: str
    is_heap: bool = False
    is_stack: bool = True
    is_null_terminated: bool = False
    initialization_status: str = "unknown"  # "initialized", "uninitialized", "unknown"

MemorySafetyProof dataclass

Result of memory safety verification.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
@dataclass
class MemorySafetyProof:
    """Aggregated outcome of one memory safety verification run."""

    function_name: str
    safety_type: MemorySafetyType
    is_safe: bool
    proof_results: list[ProofResult]
    unsafe_accesses: list[MemoryAccess]
    recommendations: list[str]
    verification_time: float
    confidence: float  # 0.0 to 1.0

    @property
    def summary(self) -> str:
        """One-line, human-readable verdict for this proof."""
        if self.is_safe:
            status = "SAFE"
        else:
            status = "UNSAFE"
        return f"{self.function_name}: {self.safety_type.value} - {status} (confidence: {self.confidence:.2f})"

summary property

Human-readable summary of the proof.

MemorySafetyType

Bases: Enum

Types of memory safety properties.

Source code in src/multigen/frontend/verifiers/bounds_prover.py
class MemorySafetyType(Enum):
    """Types of memory safety properties.

    Each member's value is the lowercase form of its name; the value is
    what ``MemorySafetyProof.summary`` prints.
    """

    BUFFER_OVERFLOW = "buffer_overflow"
    # Reported as the primary type by BoundsProver.verify_memory_safety.
    BOUNDS_VIOLATION = "bounds_violation"
    NULL_POINTER_DEREFERENCE = "null_pointer_dereference"
    USE_AFTER_FREE = "use_after_free"
    DOUBLE_FREE = "double_free"
    UNINITIALIZED_READ = "uninitialized_read"
    STACK_OVERFLOW = "stack_overflow"
    HEAP_CORRUPTION = "heap_corruption"

Theorem Prover

multigen.frontend.verifiers.theorem_prover

Z3 Theorem Prover Integration for Formal Verification.

This module provides integration with the Z3 theorem prover for formal verification of code properties, memory safety, and algorithm correctness.

ProofProperty dataclass

A property to be verified.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
@dataclass
class ProofProperty:
    """A property to be verified.

    Bundles the formula with the metadata needed to report on it.
    """

    # Identifier of the property, e.g. "bounds_check_<region>_<line>".
    name: str
    # Category of the property.
    property_type: PropertyType
    # Human-readable statement of what is being checked.
    description: str
    # Z3 formula; the bounds prover passes the placeholder string
    # "mock_formula" when Z3 is not available.
    z3_formula: Any  # Z3 formula
    # Optional extra data; the bounds prover stores the related
    # "access" and/or "region" objects here.
    context: dict[str, Any] | None = None

ProofResult dataclass

Result of a formal verification attempt.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
@dataclass
class ProofResult:
    """Outcome of attempting to verify one ``ProofProperty``."""

    proof_property: ProofProperty  # Renamed from 'property' to avoid conflict with @property decorator
    status: ProofStatus
    proof_time: float
    counterexample: dict[str, Any] | None = None
    error_message: str | None = None
    z3_model: Any | None = None
    verification_steps: list[str] | None = None

    def __post_init__(self) -> None:
        """Replace a missing ``verification_steps`` with a fresh empty list."""
        if self.verification_steps is not None:
            return
        self.verification_steps = []

    @property
    def is_verified(self) -> bool:
        """True only when the status is ``PROVED``."""
        proved = self.status == ProofStatus.PROVED
        return proved

    @property
    def is_safe(self) -> bool:
        """True when the status is ``PROVED`` or ``UNKNOWN``."""
        acceptable = [ProofStatus.PROVED, ProofStatus.UNKNOWN]
        return self.status in acceptable

is_safe property

Returns True if no counterexample was found (proved or unknown).

is_verified property

Returns True if the property was successfully proved.

__post_init__()

Initialize verification_steps list if not provided.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def __post_init__(self) -> None:
    """Replace a missing ``verification_steps`` with a fresh empty list."""
    # A caller-supplied list (even an empty one) is kept as-is.
    if self.verification_steps is not None:
        return
    self.verification_steps = []

ProofStatus

Bases: Enum

Status of a formal proof attempt.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
class ProofStatus(Enum):
    """Status of a formal proof attempt.

    ``ProofResult.is_verified`` is True only for ``PROVED``;
    ``ProofResult.is_safe`` is True for ``PROVED`` and ``UNKNOWN``.
    """

    PROVED = "proved"
    DISPROVED = "disproved"
    UNKNOWN = "unknown"
    TIMEOUT = "timeout"
    # Also the status TheoremProver.verify_property returns when Z3 is
    # not available.
    ERROR = "error"

PropertyType

Bases: Enum

Types of properties that can be verified.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
class PropertyType(Enum):
    """Types of properties that can be verified.

    Each member's value is the lowercase form of its name.
    """

    # Used by the bounds prover for buffer overflow (write) checks.
    MEMORY_SAFETY = "memory_safety"
    # Used by the bounds prover for "0 <= offset < size" checks.
    BOUNDS_CHECKING = "bounds_checking"
    # Used by the bounds prover for "base address != 0" checks.
    NULL_POINTER_SAFETY = "null_pointer_safety"
    OVERFLOW_SAFETY = "overflow_safety"
    ALGORITHM_CORRECTNESS = "algorithm_correctness"
    PRECONDITION_SATISFACTION = "precondition_satisfaction"
    POSTCONDITION_SATISFACTION = "postcondition_satisfaction"
    LOOP_INVARIANT = "loop_invariant"
    TERMINATION = "termination"
    FUNCTIONAL_CORRECTNESS = "functional_correctness"
    PARTIAL_CORRECTNESS = "partial_correctness"
    TOTAL_CORRECTNESS = "total_correctness"

SafetyPropertyExtractor

Bases: NodeVisitor

AST visitor to extract safety properties from Python code.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
class SafetyPropertyExtractor(ast.NodeVisitor):
    """Collect array accesses, arithmetic operations and ``range`` loops from an AST.

    Findings are appended to plain lists of dicts in visiting order:

    * ``array_accesses``: ``{"array", "index", "line"}`` per subscript on a name.
    * ``arithmetic_ops``: ``{"op", "operands", "line"}`` per ``+``/``-``/``*``
      with at least one plain-name operand.
    * ``loop_bounds``: ``{"type", "args", "line"}`` per ``for ... in range(...)``
      that passes at least one argument.
    * ``function_calls``: initialized empty; no visitor in this class fills it.
    """

    # Operator node type -> short name stored in ``arithmetic_ops``.
    _ARITHMETIC_OP_NAMES = {ast.Add: "add", ast.Sub: "sub", ast.Mult: "mul"}

    def __init__(self) -> None:
        self.array_accesses: list[dict[str, Any]] = []
        self.arithmetic_ops: list[dict[str, Any]] = []
        self.function_calls: list[dict[str, Any]] = []
        self.loop_bounds: list[dict[str, Any]] = []

    @staticmethod
    def _index_label(index_node: ast.expr) -> str:
        """Describe a subscript index as a name, a stringified constant, or a placeholder."""
        if isinstance(index_node, ast.Name):
            return index_node.id
        if isinstance(index_node, ast.Constant):
            return str(index_node.value)
        return "complex_index"

    def visit_Subscript(self, node: ast.Subscript) -> None:
        """Record subscripts whose container is a plain name."""
        container = node.value
        if isinstance(container, ast.Name):
            self.array_accesses.append(
                {"array": container.id, "index": self._index_label(node.slice), "line": node.lineno}
            )

        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp) -> None:
        """Record add/sub/mul operations that involve at least one plain name."""
        op_name = self._ARITHMETIC_OP_NAMES.get(type(node.op))
        if op_name is not None:
            # Left operand first, then right; non-name operands are dropped.
            named_operands = [side.id for side in (node.left, node.right) if isinstance(side, ast.Name)]
            if named_operands:
                self.arithmetic_ops.append({"op": op_name, "operands": named_operands, "line": node.lineno})

        self.generic_visit(node)

    def visit_For(self, node: ast.For) -> None:
        """Record ``for`` loops that iterate over ``range(...)`` with arguments."""
        iterable = node.iter
        iterates_range = (
            isinstance(iterable, ast.Call)
            and isinstance(iterable.func, ast.Name)
            and iterable.func.id == "range"
        )
        if iterates_range and iterable.args:
            self.loop_bounds.append({"type": "range", "args": len(iterable.args), "line": node.lineno})

        self.generic_visit(node)

visit_BinOp(node)

Extract arithmetic operations for overflow checking.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def visit_BinOp(self, node: ast.BinOp) -> None:
    """Record add/sub/mul operations that involve at least one plain name.

    Each recorded entry holds the short operator name, the plain-name
    operands (left before right) and the source line. Child nodes are
    always visited afterwards.
    """
    op_names = {ast.Add: "add", ast.Sub: "sub", ast.Mult: "mul"}
    op_name = op_names.get(type(node.op))

    if op_name is not None:
        # Operands that are not plain names (constants, nested expressions)
        # are dropped.
        named_operands = [side.id for side in (node.left, node.right) if isinstance(side, ast.Name)]
        if named_operands:
            self.arithmetic_ops.append({"op": op_name, "operands": named_operands, "line": node.lineno})

    self.generic_visit(node)

visit_For(node)

Extract loop information for termination analysis.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def visit_For(self, node: ast.For) -> None:
    """Record ``for`` loops that iterate over ``range(...)`` with arguments.

    Only the number of ``range`` arguments and the loop's line are stored.
    Child nodes are always visited afterwards.
    """
    iterable = node.iter
    iterates_range = (
        isinstance(iterable, ast.Call)
        and isinstance(iterable.func, ast.Name)
        and iterable.func.id == "range"
    )

    # A bare range() call carries no bound information and is skipped.
    if iterates_range and iterable.args:
        self.loop_bounds.append({"type": "range", "args": len(iterable.args), "line": node.lineno})

    self.generic_visit(node)

visit_Subscript(node)

Extract array access patterns.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def visit_Subscript(self, node: ast.Subscript) -> None:
    """Record subscript accesses whose container is a plain name.

    The index is stored as the variable name, the ``str()`` of a constant,
    or the marker ``"complex_index"`` for any other expression. Child nodes
    are always visited afterwards.
    """
    container = node.value
    if isinstance(container, ast.Name):
        subscript = node.slice
        index_name = (
            subscript.id
            if isinstance(subscript, ast.Name)
            else str(subscript.value)
            if isinstance(subscript, ast.Constant)
            else "complex_index"
        )
        self.array_accesses.append({"array": container.id, "index": index_name, "line": node.lineno})

    self.generic_visit(node)

TheoremProver

Z3-based theorem prover for formal verification.

Source code in src/multigen/frontend/verifiers/theorem_prover.py
class TheoremProver:
    """Z3-based theorem prover for formal verification.

    Properties are proved by refutation: the negation of the property's
    formula is handed to Z3, and the property counts as proved when no
    counterexample exists. When Z3 is not installed, property factories
    return mock properties and verification reports ``ProofStatus.ERROR``.
    """

    def __init__(self, timeout: int = 30000):  # 30 seconds default
        """Initialize the theorem prover.

        Args:
            timeout: Timeout in milliseconds for Z3 solver
        """
        self.timeout = timeout
        self.z3_available = Z3_AVAILABLE
        self.solver = None

        if self.z3_available:
            self.solver = z3.Solver()
            self.solver.set("timeout", timeout)

        # Property templates for common verification patterns
        self.property_templates = {
            PropertyType.BOUNDS_CHECKING: self._create_bounds_check_template,
            PropertyType.NULL_POINTER_SAFETY: self._create_null_pointer_template,
            PropertyType.OVERFLOW_SAFETY: self._create_overflow_template,
            PropertyType.LOOP_INVARIANT: self._create_loop_invariant_template,
        }

    def verify_property(self, prop: ProofProperty) -> ProofResult:
        """Verify a single property using Z3.

        Args:
            prop: Property to verify

        Returns:
            ProofResult with verification outcome: ``PROVED`` when the negated
            formula is unsatisfiable, ``DISPROVED`` (with counterexample) when
            it is satisfiable, ``UNKNOWN`` otherwise, and ``ERROR`` when Z3 is
            unavailable or raises.
        """
        if not self.z3_available:
            return ProofResult(
                proof_property=prop,
                status=ProofStatus.ERROR,
                proof_time=0.0,
                error_message="Z3 not available. Install with: pip install z3-solver",
            )

        start_time = time.time()

        try:
            # Create fresh solver for this property so assertions never leak
            # from one property into the next
            solver = z3.Solver()
            solver.set("timeout", self.timeout)

            # Add the negation of the property (looking for counterexample)
            negated_formula = z3.Not(prop.z3_formula)
            solver.add(negated_formula)

            # Check satisfiability
            result = solver.check()
            proof_time = time.time() - start_time

            if result == z3.sat:
                # Found counterexample - property is false
                model_result = solver.model()
                counterexample = self._extract_counterexample(model_result) if model_result else {}
                return ProofResult(
                    proof_property=prop,
                    status=ProofStatus.DISPROVED,
                    proof_time=proof_time,
                    counterexample=counterexample,
                    z3_model=model_result,
                )
            elif result == z3.unsat:
                # No counterexample - property is true
                return ProofResult(
                    proof_property=prop,
                    status=ProofStatus.PROVED,
                    proof_time=proof_time,
                    verification_steps=[f"Z3 proved property in {proof_time:.3f}s"],
                )
            else:
                # Unknown result (timeout or undecidable)
                return ProofResult(
                    proof_property=prop,
                    status=ProofStatus.UNKNOWN,
                    proof_time=proof_time,
                    error_message="Z3 could not determine satisfiability",
                )

        except Exception as e:
            # Any failure while building or solving (e.g. a placeholder string
            # formula) is reported as an ERROR result rather than propagated
            return ProofResult(
                proof_property=prop, status=ProofStatus.ERROR, proof_time=time.time() - start_time, error_message=str(e)
            )

    def verify_multiple_properties(self, properties: list[ProofProperty]) -> list[ProofResult]:
        """Verify multiple properties, stopping at the first disproved one.

        Args:
            properties: List of properties to verify

        Returns:
            List of ProofResults in input order. The list may be shorter than
            ``properties`` because verification stops after the first
            ``DISPROVED`` result (which is included).
        """
        results = []

        for prop in properties:
            result = self.verify_property(prop)
            results.append(result)

            # Early termination if we find a safety violation
            if result.status == ProofStatus.DISPROVED:
                break

        return results

    def create_bounds_check_property(
        self, array_name: str, index_expr: str, array_size: int | str, context: dict[str, Any] | None = None
    ) -> ProofProperty:
        """Create a bounds checking property.

        Args:
            array_name: Name of the array
            index_expr: Index expression (used in the description only)
            array_size: Size of the array; a string yields a symbolic size
            context: Additional context for the property

        Returns:
            ProofProperty for bounds checking
        """
        if not self.z3_available:
            # Return mock property if Z3 not available
            return ProofProperty(
                name=f"bounds_check_{array_name}",
                property_type=PropertyType.BOUNDS_CHECKING,
                description=f"Index {index_expr} is within bounds of {array_name}[{array_size}]",
                z3_formula="mock_formula",
            )

        # Create Z3 variables
        index = z3.Int(f"index_{array_name}")
        size = z3.Int(f"size_{array_name}") if isinstance(array_size, str) else z3.IntVal(array_size)

        # Bounds check: 0 <= index < size
        bounds_check = z3.And(index >= 0, index < size)

        return ProofProperty(
            name=f"bounds_check_{array_name}",
            property_type=PropertyType.BOUNDS_CHECKING,
            description=f"Index {index_expr} is within bounds of {array_name}[{array_size}]",
            z3_formula=bounds_check,
            context=context,
        )

    def create_overflow_safety_property(
        self, operation: str, operands: list[str], result_type: str = "int", context: dict[str, Any] | None = None
    ) -> ProofProperty:
        """Create an overflow safety property.

        Args:
            operation: Type of operation ("add", "mul" or "sub")
            operands: List of operand names; exactly two are required
            result_type: Result type for overflow bounds ("int", "long" or
                "short"; anything else falls back to "int")
            context: Additional context

        Returns:
            ProofProperty for overflow safety

        Raises:
            ValueError: If Z3 is available and the operation is not one of
                the supported ones or does not have exactly two operands.
        """
        if not self.z3_available:
            return ProofProperty(
                name=f"overflow_safety_{operation}",
                property_type=PropertyType.OVERFLOW_SAFETY,
                description=f"Operation {operation} on {operands} does not overflow",
                z3_formula="mock_formula",
            )

        # Create Z3 variables for operands
        z3_operands = [z3.Int(name) for name in operands]

        # Define bounds for different types
        type_bounds = {
            "int": (-(2**31), 2**31 - 1),
            "long": (-(2**63), 2**63 - 1),
            "short": (-(2**15), 2**15 - 1),
        }

        min_val, max_val = type_bounds.get(result_type, type_bounds["int"])

        # Create operation formula
        if operation == "add" and len(z3_operands) == 2:
            result = z3_operands[0] + z3_operands[1]
        elif operation == "mul" and len(z3_operands) == 2:
            result = z3_operands[0] * z3_operands[1]
        elif operation == "sub" and len(z3_operands) == 2:
            result = z3_operands[0] - z3_operands[1]
        else:
            raise ValueError(f"Unsupported operation: {operation}")

        # Overflow safety: result within type bounds
        overflow_safety = z3.And(result >= min_val, result <= max_val)

        return ProofProperty(
            name=f"overflow_safety_{operation}",
            property_type=PropertyType.OVERFLOW_SAFETY,
            description=f"Operation {operation} on {operands} does not overflow",
            z3_formula=overflow_safety,
            context=context,
        )

    def analyze_function_safety(self, context: AnalysisContext) -> list[ProofResult]:
        """Analyze a function for various safety properties.

        Args:
            context: Analysis context with AST and metadata

        Returns:
            List of verification results for safety properties
        """
        properties = []

        # Extract potential safety properties from AST
        safety_visitor = SafetyPropertyExtractor()
        safety_visitor.visit(context.ast_node)

        # Create properties for array accesses
        for array_access in safety_visitor.array_accesses:
            prop = self.create_bounds_check_property(
                array_access["array"], array_access["index"], array_access.get("size", "unknown")
            )
            properties.append(prop)

        # Create properties for arithmetic operations
        for operation in safety_visitor.arithmetic_ops:
            try:
                prop = self.create_overflow_safety_property(operation["op"], operation["operands"])
            except ValueError:
                # The extractor records an operation as soon as ONE operand is
                # a plain name (e.g. ``i + 1``), but the property builder only
                # models operations over exactly two named operands and raises
                # otherwise. Skip those instead of aborting the whole analysis.
                continue
            properties.append(prop)

        # Verify all properties
        return self.verify_multiple_properties(properties)

    def _extract_counterexample(self, model: Any) -> dict[str, Any]:
        """Convert a Z3 model into a ``{declaration: value}`` dict of strings."""
        if not model:
            return {}

        return {str(decl): str(model[decl]) for decl in model.decls()}

    def _create_bounds_check_template(self, **kwargs: Any) -> ProofProperty:
        """Template for bounds checking properties."""
        return self.create_bounds_check_property(**kwargs)

    def _create_null_pointer_template(self, **kwargs: Any) -> ProofProperty:
        """Template for null pointer safety properties."""
        # Implementation for null pointer checks
        return ProofProperty(
            name="null_pointer_safety",
            property_type=PropertyType.NULL_POINTER_SAFETY,
            description="Null pointer safety check",
            z3_formula="true",  # Placeholder formula
            context=kwargs,
        )

    def _create_overflow_template(self, **kwargs: Any) -> ProofProperty:
        """Template for overflow safety properties."""
        return self.create_overflow_safety_property(**kwargs)

    def _create_loop_invariant_template(self, **kwargs: Any) -> ProofProperty:
        """Template for loop invariant properties."""
        # Implementation for loop invariants
        return ProofProperty(
            name="loop_invariant",
            property_type=PropertyType.LOOP_INVARIANT,
            description="Loop invariant property",
            z3_formula="true",  # Placeholder formula
            context=kwargs,
        )

__init__(timeout=30000)

Initialize the theorem prover.

Parameters:

Name Type Description Default
timeout int

Timeout in milliseconds for Z3 solver

30000
Source code in src/multigen/frontend/verifiers/theorem_prover.py
def __init__(self, timeout: int = 30000):  # 30 seconds default
    """Set up solver state and the property-template registry.

    Args:
        timeout: Timeout in milliseconds for Z3 solver
    """
    self.timeout = timeout
    self.z3_available = Z3_AVAILABLE

    # A solver instance is only created when Z3 could be imported
    self.solver = None
    if self.z3_available:
        shared_solver = z3.Solver()
        self.solver = shared_solver
        shared_solver.set("timeout", timeout)

    # Map each property type to the factory that builds it
    templates = {}
    templates[PropertyType.BOUNDS_CHECKING] = self._create_bounds_check_template
    templates[PropertyType.NULL_POINTER_SAFETY] = self._create_null_pointer_template
    templates[PropertyType.OVERFLOW_SAFETY] = self._create_overflow_template
    templates[PropertyType.LOOP_INVARIANT] = self._create_loop_invariant_template
    self.property_templates = templates

analyze_function_safety(context)

Analyze a function for various safety properties.

Parameters:

Name Type Description Default
context AnalysisContext

Analysis context with AST and metadata

required

Returns:

Type Description
list[ProofResult]

List of verification results for safety properties

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def analyze_function_safety(self, context: AnalysisContext) -> list[ProofResult]:
    """Analyze a function for various safety properties.

    Bounds-check properties are created for every extracted array access and
    overflow properties for every extracted arithmetic operation that the
    property builder supports; all of them are then verified.

    Args:
        context: Analysis context with AST and metadata

    Returns:
        List of verification results for safety properties
    """
    properties = []

    # Extract potential safety properties from AST
    safety_visitor = SafetyPropertyExtractor()
    safety_visitor.visit(context.ast_node)

    # Create properties for array accesses
    for array_access in safety_visitor.array_accesses:
        prop = self.create_bounds_check_property(
            array_access["array"], array_access["index"], array_access.get("size", "unknown")
        )
        properties.append(prop)

    # Create properties for arithmetic operations
    for operation in safety_visitor.arithmetic_ops:
        try:
            prop = self.create_overflow_safety_property(operation["op"], operation["operands"])
        except ValueError:
            # The extractor records an operation as soon as ONE operand is a
            # plain name (e.g. ``i + 1``), but the property builder only
            # models operations over exactly two named operands and raises
            # otherwise. Skip those instead of aborting the whole analysis.
            continue
        properties.append(prop)

    # Verify all properties
    return self.verify_multiple_properties(properties)

create_bounds_check_property(array_name, index_expr, array_size, context=None)

Create a bounds checking property.

Parameters:

Name Type Description Default
array_name str

Name of the array

required
index_expr str

Index expression

required
array_size int | str

Size of the array

required
context dict[str, Any] | None

Additional context for the property

None

Returns:

Type Description
ProofProperty

ProofProperty for bounds checking

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def create_bounds_check_property(
    self, array_name: str, index_expr: str, array_size: int | str, context: dict[str, Any] | None = None
) -> ProofProperty:
    """Build the property ``0 <= index < size`` for one array.

    Args:
        array_name: Name of the array
        index_expr: Index expression (appears in the description only)
        array_size: Size of the array; a string produces a symbolic size
            variable, anything else a concrete Z3 integer value
        context: Additional context for the property

    Returns:
        ProofProperty for bounds checking (a mock one when Z3 is missing)
    """
    prop_name = f"bounds_check_{array_name}"
    summary = f"Index {index_expr} is within bounds of {array_name}[{array_size}]"

    if not self.z3_available:
        # Without Z3 only a placeholder formula can be provided
        return ProofProperty(
            name=prop_name,
            property_type=PropertyType.BOUNDS_CHECKING,
            description=summary,
            z3_formula="mock_formula",
        )

    # Symbolic index; size is symbolic or concrete depending on its type
    index = z3.Int(f"index_{array_name}")
    if isinstance(array_size, str):
        size = z3.Int(f"size_{array_name}")
    else:
        size = z3.IntVal(array_size)

    return ProofProperty(
        name=prop_name,
        property_type=PropertyType.BOUNDS_CHECKING,
        description=summary,
        z3_formula=z3.And(index >= 0, index < size),
        context=context,
    )

create_overflow_safety_property(operation, operands, result_type='int', context=None)

Create an overflow safety property.

Parameters:

Name Type Description Default
operation str

Type of operation (add, mul, sub, etc.)

required
operands list[str]

List of operand names

required
result_type str

Result type for overflow bounds

'int'
context dict[str, Any] | None

Additional context

None

Returns:

Type Description
ProofProperty

ProofProperty for overflow safety

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def create_overflow_safety_property(
    self, operation: str, operands: list[str], result_type: str = "int", context: dict[str, Any] | None = None
) -> ProofProperty:
    """Build the property that a binary operation stays within type bounds.

    Args:
        operation: Type of operation ("add", "mul" or "sub")
        operands: List of operand names; exactly two are required
        result_type: Result type for overflow bounds ("int", "long" or
            "short"; anything else falls back to "int")
        context: Additional context

    Returns:
        ProofProperty for overflow safety (a mock one when Z3 is missing)

    Raises:
        ValueError: If Z3 is available and the operation is unsupported or
            does not have exactly two operands.
    """
    prop_name = f"overflow_safety_{operation}"
    summary = f"Operation {operation} on {operands} does not overflow"

    if not self.z3_available:
        return ProofProperty(
            name=prop_name,
            property_type=PropertyType.OVERFLOW_SAFETY,
            description=summary,
            z3_formula="mock_formula",
        )

    # One symbolic integer per operand name
    symbols = [z3.Int(operand) for operand in operands]

    # Inclusive signed ranges per result type
    signed_ranges = {
        "int": (-(2**31), 2**31 - 1),
        "long": (-(2**63), 2**63 - 1),
        "short": (-(2**15), 2**15 - 1),
    }
    lower, upper = signed_ranges.get(result_type, signed_ranges["int"])

    if len(symbols) != 2 or operation not in ("add", "mul", "sub"):
        raise ValueError(f"Unsupported operation: {operation}")

    left, right = symbols
    if operation == "add":
        result = left + right
    elif operation == "mul":
        result = left * right
    else:
        result = left - right

    return ProofProperty(
        name=prop_name,
        property_type=PropertyType.OVERFLOW_SAFETY,
        description=summary,
        z3_formula=z3.And(result >= lower, result <= upper),
        context=context,
    )

verify_multiple_properties(properties)

Verify multiple properties efficiently.

Parameters:

Name Type Description Default
properties list[ProofProperty]

List of properties to verify

required

Returns:

Type Description
list[ProofResult]

List of ProofResults

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def verify_multiple_properties(self, properties: list[ProofProperty]) -> list[ProofResult]:
    """Verify properties in order, stopping after the first disproved one.

    Args:
        properties: List of properties to verify

    Returns:
        List of ProofResults in input order; it ends with the first
        ``DISPROVED`` result when there is one, so it can be shorter than
        ``properties``.
    """
    outcomes = []

    for candidate in properties:
        outcomes.append(self.verify_property(candidate))

        # A safety violation makes the remaining checks unnecessary
        if outcomes[-1].status == ProofStatus.DISPROVED:
            break

    return outcomes

verify_property(prop)

Verify a single property using Z3.

Parameters:

Name Type Description Default
prop ProofProperty

Property to verify

required

Returns:

Type Description
ProofResult

ProofResult with verification outcome

Source code in src/multigen/frontend/verifiers/theorem_prover.py
def verify_property(self, prop: ProofProperty) -> ProofResult:
    """Try to prove one property by refuting its negation with Z3.

    Args:
        prop: Property to verify

    Returns:
        ProofResult whose status is ``PROVED`` (negation unsatisfiable),
        ``DISPROVED`` (negation satisfiable, counterexample attached),
        ``UNKNOWN`` (solver gave no answer) or ``ERROR`` (Z3 missing or an
        exception was raised).
    """
    if not self.z3_available:
        return ProofResult(
            proof_property=prop,
            status=ProofStatus.ERROR,
            proof_time=0.0,
            error_message="Z3 not available. Install with: pip install z3-solver",
        )

    started = time.time()

    try:
        # A dedicated solver per property keeps assertions isolated
        checker = z3.Solver()
        checker.set("timeout", self.timeout)

        # Searching for a model of NOT(property) is searching for a counterexample
        checker.add(z3.Not(prop.z3_formula))

        verdict = checker.check()
        proof_time = time.time() - started

        if verdict == z3.unsat:
            # No counterexample exists, so the property holds
            return ProofResult(
                proof_property=prop,
                status=ProofStatus.PROVED,
                proof_time=proof_time,
                verification_steps=[f"Z3 proved property in {proof_time:.3f}s"],
            )

        if verdict == z3.sat:
            # The model is a concrete counterexample
            witness = checker.model()
            return ProofResult(
                proof_property=prop,
                status=ProofStatus.DISPROVED,
                proof_time=proof_time,
                counterexample=self._extract_counterexample(witness) if witness else {},
                z3_model=witness,
            )

        # Neither sat nor unsat (timeout or undecidable)
        return ProofResult(
            proof_property=prop,
            status=ProofStatus.UNKNOWN,
            proof_time=proof_time,
            error_message="Z3 could not determine satisfiability",
        )

    except Exception as exc:
        return ProofResult(
            proof_property=prop, status=ProofStatus.ERROR, proof_time=time.time() - started, error_message=str(exc)
        )

Correctness Prover

multigen.frontend.verifiers.correctness_prover

Algorithm Correctness Verification.

This module provides formal verification of algorithm correctness using preconditions, postconditions, loop invariants, and functional specifications.

AlgorithmProof dataclass

Result of algorithm correctness verification.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
@dataclass
class AlgorithmProof:
    """Outcome of verifying the correctness of one algorithm."""

    # Name of the verified function
    function_name: str
    # Specification the function was checked against
    specification: FormalSpecification
    # Kind of correctness that was established or refuted
    correctness_type: CorrectnessPropertyType
    # Overall verdict
    is_correct: bool
    # Individual proof results that make up the verdict
    proof_results: list[ProofResult]
    # Names of the properties that failed
    failed_properties: list[str]
    # Per-loop analysis data, keyed by an integer
    loop_analysis: dict[int, dict[str, Any]]
    verification_time: float
    confidence: float

    @property
    def summary(self) -> str:
        """One-line, human-readable verdict including the confidence."""
        if self.is_correct:
            status = "CORRECT"
        else:
            status = "INCORRECT"
        return f"{self.function_name}: {self.correctness_type.value} - {status} (confidence: {self.confidence:.2f})"

summary property

Human-readable summary of the proof.

AlgorithmStructureExtractor

Bases: NodeVisitor

Extract algorithm structure for correctness verification.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
class AlgorithmStructureExtractor(ast.NodeVisitor):
    """Extract algorithm structure for correctness verification.

    Walking an AST with this visitor collects loops, directly recursive
    calls, ``assert`` statements and every referenced name.
    """

    def __init__(self) -> None:
        """Start with empty collections and no enclosing function."""
        self.loops: list[dict[str, Any]] = []
        self.recursive_calls: list[dict[str, Any]] = []
        self.assertions: list[dict[str, Any]] = []
        self.variables: set[str] = set()
        # Name of the function whose body is currently being visited
        self.current_function: Optional[str] = None

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Track function context while visiting the function's subtree.

        The previous context is restored afterwards so that a nested ``def``
        does not hide the enclosing function: without this, recursive calls
        made by the outer function after the nested definition are missed and
        calls to the nested helper are misreported as recursion.
        """
        enclosing_function = self.current_function
        self.current_function = node.name
        try:
            self.generic_visit(node)
        finally:
            self.current_function = enclosing_function

    def visit_For(self, node: ast.For) -> None:
        """Extract for loop information."""
        loop_info: dict[str, Any] = {
            "type": "for",
            "line": node.lineno,
            "variable": None,
            "iterable": None,
            "body_lines": [],
        }

        # Extract loop variable (only simple names, not tuple targets)
        if isinstance(node.target, ast.Name):
            loop_info["variable"] = node.target.id

        # Extract iterable information (only direct ``range(...)`` calls)
        if isinstance(node.iter, ast.Call) and isinstance(node.iter.func, ast.Name):
            if node.iter.func.id == "range":
                loop_info["iterable"] = "range"
                loop_info["range_args"] = len(node.iter.args)

        # Extract body line numbers (top-level statements of the body)
        loop_info["body_lines"].extend(stmt.lineno for stmt in node.body)

        self.loops.append(loop_info)
        self.generic_visit(node)

    def visit_While(self, node: ast.While) -> None:
        """Extract while loop information."""
        loop_info: dict[str, Any] = {
            "type": "while",
            "line": node.lineno,
            "condition": ast.unparse(node.test) if hasattr(ast, "unparse") else str(node.test),
            "body_lines": [],
        }

        # Extract body line numbers (top-level statements of the body)
        loop_info["body_lines"].extend(stmt.lineno for stmt in node.body)

        self.loops.append(loop_info)
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:
        """Extract function calls (including recursive calls).

        Only calls through a bare name equal to the current function's name
        are recorded.
        """
        if isinstance(node.func, ast.Name):
            if node.func.id == self.current_function:
                # Recursive call
                self.recursive_calls.append({"line": node.lineno, "function": node.func.id, "args": len(node.args)})

        self.generic_visit(node)

    def visit_Assert(self, node: ast.Assert) -> None:
        """Extract assertion statements."""
        assertion_info: dict[str, Any] = {
            "line": node.lineno,
            "condition": ast.unparse(node.test) if hasattr(ast, "unparse") else str(node.test),
            "message": None,
        }

        if node.msg:
            assertion_info["message"] = ast.unparse(node.msg) if hasattr(ast, "unparse") else str(node.msg)

        self.assertions.append(assertion_info)
        self.generic_visit(node)

    def visit_Name(self, node: ast.Name) -> None:
        """Extract variable names."""
        self.variables.add(node.id)
        self.generic_visit(node)

    def get_loop_info(self, line_no: int) -> Optional[dict[str, Any]]:
        """Get loop information for a specific line number.

        Returns the first recorded loop that starts on ``line_no``, or
        ``None`` when no loop starts there.
        """
        for loop in self.loops:
            if loop["line"] == line_no:
                return loop
        return None

get_loop_info(line_no)

Get loop information for a specific line number.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def get_loop_info(self, line_no: int) -> Optional[dict[str, Any]]:
    """Return the first recorded loop starting on ``line_no``, else ``None``."""
    matches = (entry for entry in self.loops if entry["line"] == line_no)
    return next(matches, None)

visit_Assert(node)

Extract assertion statements.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def visit_Assert(self, node: ast.Assert) -> None:
    """Record an ``assert`` statement: line, condition text, message text.

    Source text is produced with ``ast.unparse`` when it exists and with
    ``str()`` otherwise. The message is ``None`` when the statement has none.
    Child nodes are visited afterwards.
    """
    render = ast.unparse if hasattr(ast, "unparse") else str

    message = render(node.msg) if node.msg else None
    self.assertions.append(
        {
            "line": node.lineno,
            "condition": render(node.test),
            "message": message,
        }
    )
    self.generic_visit(node)

visit_Call(node)

Extract function calls (including recursive calls).

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def visit_Call(self, node: ast.Call) -> None:
    """Record calls that target the function currently being visited.

    Only calls made through a bare name equal to ``current_function`` are
    treated as recursive. Child nodes are always visited afterwards.
    """
    callee = node.func
    is_self_call = isinstance(callee, ast.Name) and callee.id == self.current_function
    if is_self_call:
        self.recursive_calls.append({"line": node.lineno, "function": callee.id, "args": len(node.args)})

    self.generic_visit(node)

visit_For(node)

Extract for loop information.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def visit_For(self, node: ast.For) -> None:
    """Record a ``for`` loop: line, loop variable, iterable kind, body lines.

    The loop variable is kept only for a simple name target. The iterable is
    described only for a direct call to the bare name ``range``, in which
    case the number of positional arguments is stored as well. Child nodes
    are visited afterwards.
    """
    target, iterable = node.target, node.iter
    over_range = (
        isinstance(iterable, ast.Call)
        and isinstance(iterable.func, ast.Name)
        and iterable.func.id == "range"
    )

    loop_info: dict[str, Any] = {
        "type": "for",
        "line": node.lineno,
        "variable": target.id if isinstance(target, ast.Name) else None,
        "iterable": "range" if over_range else None,
        "body_lines": [stmt.lineno for stmt in node.body],
    }
    if over_range:
        loop_info["range_args"] = len(iterable.args)

    self.loops.append(loop_info)
    self.generic_visit(node)

visit_FunctionDef(node)

Track function context.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
    """Track function context while visiting the function's subtree.

    The previous context is restored afterwards so that a nested ``def`` does
    not hide the enclosing function: without this, recursive calls made by
    the outer function after the nested definition are missed and calls to
    the nested helper are misreported as recursion.
    """
    enclosing_function = self.current_function
    self.current_function = node.name
    try:
        self.generic_visit(node)
    finally:
        self.current_function = enclosing_function

visit_Name(node)

Extract variable names.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def visit_Name(self, node: ast.Name) -> None:
    """Add the referenced identifier to ``variables`` and keep walking."""
    identifier = node.id
    self.variables.update((identifier,))
    self.generic_visit(node)

visit_While(node)

Extract while loop information.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def visit_While(self, node: ast.While) -> None:
    """Record a ``while`` loop: line, condition text and body lines.

    The condition text comes from ``ast.unparse`` when it exists and from
    ``str()`` otherwise. Body lines are the line numbers of the loop body's
    top-level statements. Child nodes are visited afterwards.
    """
    if hasattr(ast, "unparse"):
        condition_text = ast.unparse(node.test)
    else:
        condition_text = str(node.test)

    self.loops.append(
        {
            "type": "while",
            "line": node.lineno,
            "condition": condition_text,
            "body_lines": [stmt.lineno for stmt in node.body],
        }
    )
    self.generic_visit(node)

CorrectnessPropertyType

Bases: Enum

Types of correctness properties.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
class CorrectnessPropertyType(Enum):
    """Types of correctness properties.

    Each member's value is the lowercase, snake_case form of its name.
    """

    PRECONDITION = "precondition"
    POSTCONDITION = "postcondition"
    LOOP_INVARIANT = "loop_invariant"
    ASSERTION = "assertion"
    FUNCTIONAL_CORRECTNESS = "functional_correctness"
    TERMINATION = "termination"
    PARTIAL_CORRECTNESS = "partial_correctness"
    TOTAL_CORRECTNESS = "total_correctness"

CorrectnessProver

Formal verification of algorithm correctness.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
class CorrectnessProver:
    """Formal verification of algorithm correctness.

    Each clause of a ``FormalSpecification`` is wrapped in a ``ProofProperty``
    and submitted to the configured theorem prover; the individual results are
    then aggregated into a single ``AlgorithmProof``.
    """

    def __init__(self, theorem_prover: Optional[TheoremProver] = None):
        """Initialize the correctness prover.

        Args:
            theorem_prover: Z3 theorem prover instance. A new ``TheoremProver``
                is created when none is supplied.
        """
        self.theorem_prover = theorem_prover or TheoremProver()
        self.z3_available = Z3_AVAILABLE

        # Built-in algorithm specifications, keyed by function name. The values
        # are factory methods, so each lookup builds a fresh specification.
        self.algorithm_specs = {
            "factorial": self._factorial_specification,
            "fibonacci": self._fibonacci_specification,
            "binary_search": self._binary_search_specification,
            "merge_sort": self._merge_sort_specification,
            "quicksort": self._quicksort_specification,
        }

    def verify_algorithm_correctness(
        self, context: AnalysisContext, specification: Optional[FormalSpecification] = None
    ) -> AlgorithmProof:
        """Verify algorithm correctness using formal methods.

        Args:
            context: Analysis context with AST and metadata
            specification: Optional formal specification; inferred from the
                function name when omitted

        Returns:
            AlgorithmProof with verification results
        """
        start_time = time.time()

        function_name = self._extract_function_name(context)

        # Auto-detect specification if not provided
        if specification is None:
            specification = self._infer_specification(function_name, context)

        # Extract algorithm structure
        algorithm_extractor = AlgorithmStructureExtractor()
        algorithm_extractor.visit(context.ast_node)

        all_proof_results: list[ProofResult] = []

        # 1. Verify preconditions
        all_proof_results.extend(self._verify_preconditions(specification, context))

        # 2. Verify postconditions
        all_proof_results.extend(self._verify_postconditions(specification, context))

        # 3. Verify loop invariants
        loop_results, loop_analysis = self._verify_loop_invariants(specification, algorithm_extractor, context)
        all_proof_results.extend(loop_results)

        # 4. Verify termination
        all_proof_results.extend(self._verify_termination(specification, algorithm_extractor, context))

        # 5. Verify functional correctness if specified
        if specification.functional_spec:
            all_proof_results.extend(self._verify_functional_correctness(specification, context))

        # Overall correctness requires every individual property to be verified
        # (an empty result list therefore counts as correct, with confidence 0.0).
        is_correct = all(result.is_verified for result in all_proof_results)

        # Collect the names of the properties that could not be verified
        failed_properties = [
            result.proof_property.name for result in all_proof_results if not result.is_verified
        ]

        confidence = self._calculate_correctness_confidence(all_proof_results)

        verification_time = time.time() - start_time

        return AlgorithmProof(
            function_name=function_name,
            specification=specification,
            correctness_type=CorrectnessPropertyType.TOTAL_CORRECTNESS,
            is_correct=is_correct,
            proof_results=all_proof_results,
            failed_properties=failed_properties,
            loop_analysis=loop_analysis,
            verification_time=verification_time,
            confidence=confidence,
        )

    def _verify_preconditions(self, spec: FormalSpecification, context: AnalysisContext) -> list[ProofResult]:
        """Verify every precondition of ``spec``; one result per precondition."""
        return [
            self.theorem_prover.verify_property(
                self._create_precondition_property(f"{spec.name}_pre_{i}", precondition)
            )
            for i, precondition in enumerate(spec.preconditions)
        ]

    def _verify_postconditions(self, spec: FormalSpecification, context: AnalysisContext) -> list[ProofResult]:
        """Verify every postcondition of ``spec``; one result per postcondition."""
        return [
            self.theorem_prover.verify_property(
                self._create_postcondition_property(f"{spec.name}_post_{i}", postcondition)
            )
            for i, postcondition in enumerate(spec.postconditions)
        ]

    def _verify_loop_invariants(
        self, spec: FormalSpecification, extractor: "AlgorithmStructureExtractor", context: AnalysisContext
    ) -> tuple[list[ProofResult], dict[int, dict[str, Any]]]:
        """Verify loop invariants.

        Returns:
            A pair ``(results, loop_analysis)``: the proof results (an "entry"
            and a "maintenance" result per invariant) and the loop information
            returned by the extractor, keyed by line number. Lines for which
            the extractor reports no loop are skipped.
        """
        results: list[ProofResult] = []
        loop_analysis: dict[int, dict[str, Any]] = {}

        for line_no, invariants in spec.loop_invariants.items():
            loop_info = extractor.get_loop_info(line_no)
            if not loop_info:
                continue
            loop_analysis[line_no] = loop_info

            for i, invariant in enumerate(invariants):
                # The invariant must hold at loop entry and be maintained by the body.
                for phase in ("entry", "maintenance"):
                    prop = self._create_loop_invariant_property(
                        f"{spec.name}_loop_{line_no}_{phase}_{i}", invariant, phase, loop_info
                    )
                    results.append(self.theorem_prover.verify_property(prop))

        return results, loop_analysis

    def _verify_termination(
        self, spec: FormalSpecification, extractor: "AlgorithmStructureExtractor", context: AnalysisContext
    ) -> list[ProofResult]:
        """Verify algorithm termination.

        Checks the specification's termination conditions first, then one
        ranking-function property for every loop the extractor found.
        """
        results: list[ProofResult] = []

        # Verify termination conditions
        for i, termination_condition in enumerate(spec.termination_conditions):
            prop = self._create_termination_property(f"{spec.name}_termination_{i}", termination_condition)
            results.append(self.theorem_prover.verify_property(prop))

        # Verify loop termination using ranking functions
        for loop_info in extractor.loops:
            ranking_prop = self._create_ranking_function_property(loop_info)
            results.append(self.theorem_prover.verify_property(ranking_prop))

        return results

    def _verify_functional_correctness(self, spec: FormalSpecification, context: AnalysisContext) -> list[ProofResult]:
        """Verify functional correctness specification (a missing spec is treated as ``""``)."""
        func_spec = spec.functional_spec if spec.functional_spec is not None else ""
        prop = self._create_functional_correctness_property(spec.name, func_spec)
        return [self.theorem_prover.verify_property(prop)]

    def _create_condition_property(
        self, name: str, property_type: Any, label: str, condition: str
    ) -> ProofProperty:
        """Build a property whose formula is parsed from a textual condition.

        ``_parse_condition_to_z3`` already returns the ``"mock_formula"``
        placeholder when Z3 is unavailable, so no separate mock branch is needed.
        """
        return ProofProperty(
            name=name,
            property_type=property_type,
            description=f"{label}: {condition}",
            z3_formula=self._parse_condition_to_z3(condition),
        )

    def _create_precondition_property(self, name: str, precondition: str) -> ProofProperty:
        """Create a precondition verification property."""
        return self._create_condition_property(
            name, PropertyType.PRECONDITION_SATISFACTION, "Precondition", precondition
        )

    def _create_postcondition_property(self, name: str, postcondition: str) -> ProofProperty:
        """Create a postcondition verification property."""
        return self._create_condition_property(
            name, PropertyType.POSTCONDITION_SATISFACTION, "Postcondition", postcondition
        )

    def _create_loop_invariant_property(
        self, name: str, invariant: str, phase: str, loop_info: dict[str, Any]
    ) -> ProofProperty:
        """Create a loop invariant verification property.

        Args:
            name: Property name
            invariant: Textual invariant
            phase: Label of the check being made (callers pass "entry" or "maintenance")
            loop_info: Loop description stored in the property's context
        """
        description = f"Loop invariant ({phase}): {invariant}"

        if not self.z3_available:
            # The mock property carries no context, unlike the Z3-backed one below.
            return ProofProperty(
                name=name,
                property_type=PropertyType.LOOP_INVARIANT,
                description=description,
                z3_formula="mock_formula",
            )

        return ProofProperty(
            name=name,
            property_type=PropertyType.LOOP_INVARIANT,
            description=description,
            z3_formula=self._parse_condition_to_z3(invariant),
            context={"loop_info": loop_info, "phase": phase},
        )

    def _create_termination_property(self, name: str, termination_condition: str) -> ProofProperty:
        """Create a termination verification property."""
        return self._create_condition_property(
            name, PropertyType.TERMINATION, "Termination", termination_condition
        )

    def _create_ranking_function_property(self, loop_info: dict[str, Any]) -> ProofProperty:
        """Create a ranking function property for loop termination.

        The property is named after the loop's ``"line"`` entry ("unknown"
        when absent) and the ranking variable after its ``"variable"`` entry
        ("i" when absent).
        """
        name = f"ranking_function_loop_{loop_info.get('line', 'unknown')}"
        description = "Ranking function ensures loop termination"

        if not self.z3_available:
            return ProofProperty(
                name=name,
                property_type=PropertyType.TERMINATION,
                description=description,
                z3_formula="mock_formula",
            )

        # Create a simple ranking function based on loop variable
        loop_var = loop_info.get("variable", "i")
        ranking_var = z3.Int(f"ranking_{loop_var}")

        # Only non-negativity is encoded so far; the "decreases each iteration"
        # constraint would be added based on loop analysis.
        ranking_formula: Any = z3.And(
            ranking_var >= 0,
        )

        return ProofProperty(
            name=name,
            property_type=PropertyType.TERMINATION,
            description=description,
            z3_formula=ranking_formula,
            context={"loop_info": loop_info},
        )

    def _create_functional_correctness_property(self, name: str, functional_spec: str) -> ProofProperty:
        """Create a functional correctness property named ``<name>_functional_correctness``."""
        return self._create_condition_property(
            f"{name}_functional_correctness",
            PropertyType.FUNCTIONAL_CORRECTNESS,
            "Functional correctness",
            functional_spec,
        )

    def _parse_condition_to_z3(self, condition: str) -> Any:
        """Parse a textual condition into a Z3 formula.

        This is still a placeholder: the condition is not actually parsed but
        mapped to a named Z3 boolean. The name is derived from a SHA-256
        digest of the text rather than the built-in ``hash()``, because string
        hashes are salted per process and would make the formula names differ
        from run to run (and a ``% 1000`` bucket collides easily).

        Returns:
            ``"mock_formula"`` when Z3 is unavailable, otherwise a ``z3.Bool``.
        """
        if not self.z3_available:
            return "mock_formula"

        import hashlib

        digest = hashlib.sha256(condition.encode("utf-8", "backslashreplace")).hexdigest()[:12]
        return z3.Bool(f"condition_{digest}")

    def _extract_function_name(self, context: AnalysisContext) -> str:
        """Extract function name from context.

        Prefers the first key of ``context.analysis_result.functions``; when
        that is missing or empty, falls back to ``context.ast_node.name`` and
        finally to ``"unknown_function"``.
        """
        analysis_result = context.analysis_result
        if analysis_result and hasattr(analysis_result, "functions"):
            functions = analysis_result.functions
            if functions:
                return next(iter(functions.keys()))

        # Independent of the branch above, so an empty ``functions`` mapping
        # no longer hides a perfectly good AST node name.
        if hasattr(context.ast_node, "name"):
            name: str = context.ast_node.name
            return name

        return "unknown_function"

    def _infer_specification(self, function_name: str, context: AnalysisContext) -> FormalSpecification:
        """Infer formal specification for known algorithms.

        Unknown function names get a trivial specification whose conditions
        are all ``"true"``.
        """
        spec_factory = self.algorithm_specs.get(function_name)
        if spec_factory is not None:
            return spec_factory()

        # Create a basic specification
        return FormalSpecification(
            name=function_name,
            preconditions=["true"],  # Default: no preconditions
            postconditions=["true"],  # Default: no postconditions
            loop_invariants={},
            assertions={},
            termination_conditions=["true"],
        )

    def _calculate_correctness_confidence(self, proof_results: list[ProofResult]) -> float:
        """Return the fraction of results whose status is ``PROVED`` (0.0 for no results)."""
        if not proof_results:
            return 0.0

        proved_count = sum(1 for result in proof_results if result.status == ProofStatus.PROVED)
        return proved_count / len(proof_results)

    # Built-in algorithm specifications

    def _factorial_specification(self) -> FormalSpecification:
        """Formal specification for factorial function."""
        return FormalSpecification(
            name="factorial",
            preconditions=["n >= 0"],
            postconditions=["result >= 1", "result == factorial_math(n)"],
            loop_invariants={},
            assertions={},
            termination_conditions=["n decreases towards 0"],
            functional_spec="result == n! (mathematical factorial)",
        )

    def _fibonacci_specification(self) -> FormalSpecification:
        """Formal specification for Fibonacci function."""
        return FormalSpecification(
            name="fibonacci",
            preconditions=["n >= 0"],
            postconditions=["result >= 0", "result == fib_math(n)"],
            loop_invariants={},
            assertions={},
            termination_conditions=["n decreases towards base case"],
            functional_spec="result == nth Fibonacci number",
        )

    def _binary_search_specification(self) -> FormalSpecification:
        """Formal specification for binary search."""
        return FormalSpecification(
            name="binary_search",
            preconditions=["array is sorted", "0 <= left <= right < len(array)"],
            postconditions=["result == -1 OR array[result] == target"],
            loop_invariants={
                # Line numbers would be determined during analysis
            },
            assertions={},
            termination_conditions=["search space decreases each iteration"],
            functional_spec="finds target in sorted array or returns -1",
        )

    def _merge_sort_specification(self) -> FormalSpecification:
        """Formal specification for merge sort."""
        return FormalSpecification(
            name="merge_sort",
            preconditions=["array is valid"],
            postconditions=["array is sorted", "array contains same elements as input"],
            loop_invariants={},
            assertions={},
            termination_conditions=["recursive calls reduce problem size"],
            functional_spec="sorts array in O(n log n) time",
        )

    def _quicksort_specification(self) -> FormalSpecification:
        """Formal specification for quicksort."""
        return FormalSpecification(
            name="quicksort",
            preconditions=["array is valid", "left <= right"],
            postconditions=["array[left:right+1] is sorted", "permutation preserved"],
            loop_invariants={},
            assertions={},
            termination_conditions=["partition reduces problem size"],
            functional_spec="sorts array in average O(n log n) time",
        )

__init__(theorem_prover=None)

Initialize the correctness prover.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| theorem_prover | Optional[TheoremProver] | Z3 theorem prover instance | None |
Source code in src/multigen/frontend/verifiers/correctness_prover.py
def __init__(self, theorem_prover: Optional[TheoremProver] = None):
    """Set up the prover and register the built-in algorithm specifications.

    Args:
        theorem_prover: Z3 theorem prover instance; a new ``TheoremProver``
            is created when the argument is omitted (or otherwise falsy).
    """
    if not theorem_prover:
        theorem_prover = TheoremProver()
    self.theorem_prover = theorem_prover
    self.z3_available = Z3_AVAILABLE

    # Function name -> factory method producing that algorithm's specification.
    builtin_specs = {
        "factorial": self._factorial_specification,
        "fibonacci": self._fibonacci_specification,
        "binary_search": self._binary_search_specification,
        "merge_sort": self._merge_sort_specification,
        "quicksort": self._quicksort_specification,
    }
    self.algorithm_specs = builtin_specs

verify_algorithm_correctness(context, specification=None)

Verify algorithm correctness using formal methods.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | AnalysisContext | Analysis context with AST and metadata | required |
| specification | Optional[FormalSpecification] | Optional formal specification | None |

Returns:

| Type | Description |
| --- | --- |
| AlgorithmProof | AlgorithmProof with verification results |

Source code in src/multigen/frontend/verifiers/correctness_prover.py
def verify_algorithm_correctness(
    self, context: AnalysisContext, specification: Optional[FormalSpecification] = None
) -> AlgorithmProof:
    """Run every correctness check for one function and bundle the outcome.

    Args:
        context: Analysis context with AST and metadata
        specification: Formal specification to check against; inferred from
            the function name when not given

    Returns:
        AlgorithmProof with verification results
    """
    started_at = time.time()

    function_name = self._extract_function_name(context)
    if specification is None:
        # No explicit specification: fall back to the inferred one.
        specification = self._infer_specification(function_name, context)

    # Walk the AST once to collect the algorithm's structure.
    structure = AlgorithmStructureExtractor()
    structure.visit(context.ast_node)

    # Checks run in a fixed order: pre, post, loop invariants, termination,
    # and finally the functional specification when one is present.
    proofs = []
    proofs += self._verify_preconditions(specification, context)
    proofs += self._verify_postconditions(specification, context)

    invariant_proofs, loop_analysis = self._verify_loop_invariants(specification, structure, context)
    proofs += invariant_proofs

    proofs += self._verify_termination(specification, structure, context)

    if specification.functional_spec:
        proofs += self._verify_functional_correctness(specification, context)

    # Correct only if nothing failed; failures are reported by property name.
    everything_verified = all(proof.is_verified for proof in proofs)
    unverified_names = [proof.proof_property.name for proof in proofs if not proof.is_verified]

    confidence = self._calculate_correctness_confidence(proofs)
    elapsed = time.time() - started_at

    return AlgorithmProof(
        function_name=function_name,
        specification=specification,
        correctness_type=CorrectnessPropertyType.TOTAL_CORRECTNESS,
        is_correct=everything_verified,
        proof_results=proofs,
        failed_properties=unverified_names,
        loop_analysis=loop_analysis,
        verification_time=elapsed,
        confidence=confidence,
    )

FormalSpecification dataclass

Formal specification of a function or algorithm.

Source code in src/multigen/frontend/verifiers/correctness_prover.py
@dataclass
class FormalSpecification:
    """Formal specification of a function or algorithm.

    Conditions are stored as plain strings; loop invariants and assertions
    are keyed by source line number.
    """

    # Identifier of the specified function or algorithm
    name: str
    # Conditions that must hold at function entry
    preconditions: list[str]
    # Conditions that must hold at function exit
    postconditions: list[str]
    # Line number -> invariants
    loop_invariants: dict[int, list[str]]
    # Line number -> assertion
    assertions: dict[int, str]
    # Conditions ensuring termination
    termination_conditions: list[str]
    # High-level functional specification
    functional_spec: Optional[str] = None

Performance Analyzer

multigen.frontend.verifiers.performance_analyzer

Performance Bound Analysis and Verification.

This module provides formal analysis of algorithm performance bounds, complexity verification, and resource usage guarantees.

AlgorithmStructureExtractor

Bases: NodeVisitor

Extract algorithm structure for performance analysis.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
class AlgorithmStructureExtractor(ast.NodeVisitor):
    """Extract algorithm structure for performance analysis.

    Walking an AST with ``visit`` records every ``for``/``while`` loop and
    every call made through a plain name, splitting the calls into recursive
    ones (callee equals the enclosing function's name) and all others.
    """

    def __init__(self) -> None:
        """Start with empty collections and no enclosing function."""
        self.loops: list[dict[str, Any]] = []
        self.recursive_calls: list[dict[str, Any]] = []
        self.function_calls: list[dict[str, Any]] = []
        self.variables: set[str] = set()
        self.current_function: Optional[str] = None

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Visit a function definition node.

        While the body is traversed ``current_function`` holds this function's
        name. After a *nested* definition the enclosing function's name is
        restored, so calls that follow the inner ``def`` are still matched
        against the outer function. For an outermost definition the name is
        left in place after traversal, as it always has been.
        """
        enclosing = self.current_function
        self.current_function = node.name
        self.generic_visit(node)
        if enclosing is not None:
            self.current_function = enclosing

    def visit_For(self, node: ast.For) -> None:
        """Visit a for loop node and record its line and iterable kind."""
        self.loops.append({"type": "for", "line": node.lineno, "iterable": self._extract_iterable_info(node.iter)})
        self.generic_visit(node)

    def visit_While(self, node: ast.While) -> None:
        """Visit a while loop node and record its line."""
        self.loops.append({"type": "while", "line": node.lineno})
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:
        """Visit a function call node.

        Only calls whose callee is a bare name are recorded; attribute calls
        such as ``obj.method()`` are traversed but not stored.
        """
        if isinstance(node.func, ast.Name):
            if node.func.id == self.current_function:
                self.recursive_calls.append({"line": node.lineno, "function": node.func.id})
            else:
                self.function_calls.append({"line": node.lineno, "function": node.func.id})
        self.generic_visit(node)

    def _extract_iterable_info(self, iter_node: ast.expr) -> str:
        """Return ``"range"`` for a direct ``range(...)`` call, otherwise ``"unknown"``."""
        if isinstance(iter_node, ast.Call) and isinstance(iter_node.func, ast.Name):
            if iter_node.func.id == "range":
                return "range"
        return "unknown"

visit_Call(node)

Visit a function call node.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_Call(self, node: ast.Call) -> None:
    """Record a call made through a bare name, then keep traversing.

    A call whose callee matches ``current_function`` goes to
    ``recursive_calls``; any other named call goes to ``function_calls``.
    Calls through attributes or other expressions are not recorded.
    """
    callee = node.func
    if isinstance(callee, ast.Name):
        is_recursive = callee.id == self.current_function
        bucket = self.recursive_calls if is_recursive else self.function_calls
        bucket.append({"line": node.lineno, "function": callee.id})
    self.generic_visit(node)

visit_For(node)

Visit a for loop node.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_For(self, node: ast.For) -> None:
    """Record a ``for`` loop (line and iterable kind), then visit its children."""
    entry = {"type": "for", "line": node.lineno}
    entry["iterable"] = self._extract_iterable_info(node.iter)
    self.loops.append(entry)
    self.generic_visit(node)

visit_FunctionDef(node)

Visit a function definition node.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
    """Visit a function definition node.

    While the body is traversed ``current_function`` holds this function's
    name. After a *nested* definition the enclosing function's name is
    restored, so code that follows the inner ``def`` is still attributed to
    the outer function. For an outermost definition the name is left in
    place after traversal, as it always has been.
    """
    enclosing = self.current_function
    self.current_function = node.name
    self.generic_visit(node)
    if enclosing is not None:
        self.current_function = enclosing

visit_While(node)

Visit a while loop node.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_While(self, node: ast.While) -> None:
    """Record a ``while`` loop by line number, then visit its children."""
    entry = {"type": "while"}
    entry["line"] = node.lineno
    self.loops.append(entry)
    self.generic_visit(node)

ComplexityClass

Bases: Enum

Algorithm complexity classes.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
class ComplexityClass(Enum):
    """Algorithm complexity classes.

    Each member's value is its big-O notation string; ``UNKNOWN`` stands for
    a complexity that could not be determined.
    """

    # Sub-linear and linear growth
    CONSTANT = "O(1)"
    LOGARITHMIC = "O(log n)"
    LINEAR = "O(n)"
    LINEARITHMIC = "O(n log n)"

    # Polynomial growth
    QUADRATIC = "O(n²)"
    CUBIC = "O(n³)"
    POLYNOMIAL = "O(n^k)"

    # Super-polynomial growth
    EXPONENTIAL = "O(2^n)"
    FACTORIAL = "O(n!)"

    # Not determined
    UNKNOWN = "O(?)"

ComplexityPatternAnalyzer

Bases: NodeVisitor

Analyze complexity patterns in code.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
class ComplexityPatternAnalyzer(ast.NodeVisitor):
    """Analyze complexity patterns in code.

    Walking an AST with ``visit`` records loops with their nesting depth,
    subscripts on plain names, list-literal assignments to a single name, and
    calls made through plain names.
    """

    def __init__(self) -> None:
        """Start with zero depth counters and empty collections."""
        self.current_depth = 0
        self.loop_depth = 0
        self.max_loop_depth = 0
        self.loops: list[dict[str, Any]] = []
        self.array_accesses: list[dict[str, Any]] = []
        self.array_allocations: list[dict[str, Any]] = []
        self.function_calls: list[dict[str, Any]] = []

    def _enter_loop(self) -> int:
        """Step one nesting level deeper, update both depth maxima, return the new depth."""
        self.current_depth += 1
        depth = self.current_depth
        self.loop_depth = max(self.loop_depth, depth)
        self.max_loop_depth = max(self.max_loop_depth, depth)
        return depth

    def visit_For(self, node: ast.For) -> None:
        """Visit a for loop node and track nesting depth."""
        depth = self._enter_loop()
        record: dict[str, Any] = {"type": "for", "depth": depth, "line": node.lineno}

        # Only a direct ``range(...)`` call is labelled; other iterables get no key.
        iterable = node.iter
        is_range_call = (
            isinstance(iterable, ast.Call)
            and isinstance(iterable.func, ast.Name)
            and iterable.func.id == "range"
        )
        if is_range_call:
            record["iterable"] = "range"

        self.loops.append(record)
        self.generic_visit(node)
        self.current_depth -= 1

    def visit_While(self, node: ast.While) -> None:
        """Visit a while loop node and track nesting depth."""
        depth = self._enter_loop()
        self.loops.append({"type": "while", "depth": depth, "line": node.lineno})
        self.generic_visit(node)
        self.current_depth -= 1

    def visit_Subscript(self, node: ast.Subscript) -> None:
        """Visit a subscript node and track array accesses on plain names."""
        base = node.value
        if isinstance(base, ast.Name):
            access = {"array": base.id, "line": node.lineno, "depth": self.current_depth}
            self.array_accesses.append(access)
        self.generic_visit(node)

    def visit_Assign(self, node: ast.Assign) -> None:
        """Visit an assignment node and track list literals bound to one name."""
        value, targets = node.value, node.targets
        if isinstance(value, ast.List) and len(targets) == 1 and isinstance(targets[0], ast.Name):
            allocation = {"variable": targets[0].id, "size": len(value.elts), "line": node.lineno}
            self.array_allocations.append(allocation)
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:
        """Visit a function call node and track calls made through plain names."""
        callee = node.func
        if isinstance(callee, ast.Name):
            call = {"function": callee.id, "line": node.lineno, "depth": self.current_depth}
            self.function_calls.append(call)
        self.generic_visit(node)

visit_Assign(node)

Visit an assignment node and track array allocations.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_Assign(self, node: ast.Assign) -> None:
    """Record a list literal assigned to a single plain name, then keep traversing.

    The recorded ``size`` is the number of elements written in the literal.
    """
    value, targets = node.value, node.targets
    if isinstance(value, ast.List) and len(targets) == 1 and isinstance(targets[0], ast.Name):
        allocation = {"variable": targets[0].id, "size": len(value.elts), "line": node.lineno}
        self.array_allocations.append(allocation)
    self.generic_visit(node)

visit_Call(node)

Visit a function call node and track function calls.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_Call(self, node: ast.Call) -> None:
    """Record a call made through a plain name with the current loop depth, then keep traversing."""
    callee = node.func
    if isinstance(callee, ast.Name):
        call = {"function": callee.id, "line": node.lineno, "depth": self.current_depth}
        self.function_calls.append(call)
    self.generic_visit(node)

visit_For(node)

Visit a for loop node and track nesting depth.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_For(self, node: ast.For) -> None:
    """Record a ``for`` loop together with its nesting depth.

    The depth counter is raised for the duration of the loop's traversal and
    lowered again afterwards; both depth maxima are updated on entry. A loop
    over a direct ``range(...)`` call is additionally tagged as such.
    """
    self.current_depth += 1
    depth = self.current_depth
    self.loop_depth = max(self.loop_depth, depth)
    self.max_loop_depth = max(self.max_loop_depth, depth)

    record = {"type": "for", "depth": depth, "line": node.lineno}

    iterable = node.iter
    is_range_call = (
        isinstance(iterable, ast.Call)
        and isinstance(iterable.func, ast.Name)
        and iterable.func.id == "range"
    )
    if is_range_call:
        record["iterable"] = "range"

    self.loops.append(record)
    self.generic_visit(node)
    self.current_depth -= 1

visit_Subscript(node)

Visit a subscript node and track array accesses.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_Subscript(self, node: ast.Subscript) -> None:
    """Record a subscript on a plain name with the current loop depth, then keep traversing."""
    base = node.value
    if isinstance(base, ast.Name):
        access = {"array": base.id, "line": node.lineno, "depth": self.current_depth}
        self.array_accesses.append(access)
    self.generic_visit(node)

visit_While(node)

Visit a while loop node and track nesting depth.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def visit_While(self, node: ast.While) -> None:
    """Record a ``while`` loop together with its nesting depth.

    The depth counter is raised for the duration of the loop's traversal and
    lowered again afterwards; both depth maxima are updated on entry.
    """
    self.current_depth += 1
    depth = self.current_depth
    self.loop_depth = max(self.loop_depth, depth)
    self.max_loop_depth = max(self.max_loop_depth, depth)

    record = {"type": "while", "depth": depth, "line": node.lineno}
    self.loops.append(record)

    self.generic_visit(node)
    self.current_depth -= 1

PerformanceAnalysisResult dataclass

Result of performance bound analysis.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
@dataclass
class PerformanceAnalysisResult:
    """Result of performance bound analysis.

    ``proven_bounds`` maps a resource type to the bound established for it;
    the two complexity properties are shortcuts into that mapping.
    """

    function_name: str
    proven_bounds: dict[ResourceType, PerformanceBound]
    complexity_proofs: list[ProofResult]
    resource_usage: dict[str, Any]
    bottlenecks: list[str]
    optimization_opportunities: list[str]
    verification_time: float
    confidence: float

    @property
    def time_complexity(self) -> ComplexityClass:
        """Complexity class of the proven time bound, or ``UNKNOWN`` when there is none."""
        bound = self.proven_bounds.get(ResourceType.TIME)
        if bound:
            return bound.complexity_class
        return ComplexityClass.UNKNOWN

    @property
    def space_complexity(self) -> ComplexityClass:
        """Complexity class of the proven space bound, or ``UNKNOWN`` when there is none."""
        bound = self.proven_bounds.get(ResourceType.SPACE)
        if bound:
            return bound.complexity_class
        return ComplexityClass.UNKNOWN

space_complexity property

Get the space complexity of the algorithm.

time_complexity property

Get the time complexity of the algorithm.

PerformanceAnalyzer

Formal analysis of algorithm performance bounds.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class PerformanceAnalyzer:
    """Formal analysis of algorithm performance bounds."""

    def __init__(self, theorem_prover: Optional[TheoremProver] = None):
        """Set up the analyzer and its lookup tables.

        Args:
            theorem_prover: Z3 theorem prover instance; a new
                ``TheoremProver`` is created when none (or a falsy one)
                is supplied.
        """
        self.theorem_prover = theorem_prover or TheoremProver()
        self.z3_available = Z3_AVAILABLE

        time_key = ResourceType.TIME
        space_key = ResourceType.SPACE

        # Structural pattern name -> complexity class.
        self.complexity_patterns = dict(
            single_loop=ComplexityClass.LINEAR,
            nested_loops_2=ComplexityClass.QUADRATIC,
            nested_loops_3=ComplexityClass.CUBIC,
            divide_conquer=ComplexityClass.LINEARITHMIC,
            binary_search=ComplexityClass.LOGARITHMIC,
            recursive_factorial=ComplexityClass.LINEAR,
            recursive_fibonacci=ComplexityClass.EXPONENTIAL,
        )

        # Function name -> complexity per resource for recognised algorithms.
        self.algorithm_complexities = {
            "factorial": {
                time_key: ComplexityClass.LINEAR,
                space_key: ComplexityClass.LINEAR,  # For recursive version
            },
            "fibonacci": {
                time_key: ComplexityClass.EXPONENTIAL,
                space_key: ComplexityClass.LINEAR,
            },
            "binary_search": {
                time_key: ComplexityClass.LOGARITHMIC,
                space_key: ComplexityClass.CONSTANT,
            },
            "merge_sort": {
                time_key: ComplexityClass.LINEARITHMIC,
                space_key: ComplexityClass.LINEAR,
            },
            "quicksort": {
                time_key: ComplexityClass.LINEARITHMIC,  # Average case
                space_key: ComplexityClass.LOGARITHMIC,
            },
        }

    def analyze_performance_bounds(self, context: AnalysisContext) -> PerformanceAnalysisResult:
        """Run the full performance analysis for the function in ``context``.

        Two AST visitors collect structure and complexity patterns; time,
        space and memory bounds are then derived in that order, followed by
        resource usage, bottlenecks, optimization hints and a confidence
        score.

        Args:
            context: Analysis context with AST and metadata

        Returns:
            PerformanceAnalysisResult with proven bounds
        """
        started = time.time()

        function_name = self._extract_function_name(context)

        # Both visitors walk the same AST node independently.
        structure = AlgorithmStructureExtractor()
        structure.visit(context.ast_node)
        patterns = ComplexityPatternAnalyzer()
        patterns.visit(context.ast_node)

        analyses = (
            (ResourceType.TIME, lambda: self._analyze_time_complexity(function_name, structure, patterns)),
            (ResourceType.SPACE, lambda: self._analyze_space_complexity(function_name, structure, patterns)),
            (ResourceType.MEMORY, lambda: self._analyze_memory_complexity(structure, patterns)),
        )

        proven_bounds = {}
        complexity_proofs = []
        for resource, run_analysis in analyses:
            bound, proofs = run_analysis()
            # Proofs are only kept when the analysis produced a bound.
            if bound:
                proven_bounds[resource] = bound
                complexity_proofs.extend(proofs)

        resource_usage = self._analyze_resource_usage(structure, patterns)
        bottlenecks = self._identify_bottlenecks(structure, patterns)
        opportunities = self._find_optimization_opportunities(structure, patterns, proven_bounds)
        confidence = self._calculate_performance_confidence(complexity_proofs)

        elapsed = time.time() - started

        return PerformanceAnalysisResult(
            function_name=function_name,
            proven_bounds=proven_bounds,
            complexity_proofs=complexity_proofs,
            resource_usage=resource_usage,
            bottlenecks=bottlenecks,
            optimization_opportunities=opportunities,
            verification_time=elapsed,
            confidence=confidence,
        )

    def _analyze_time_complexity(
        self, function_name: str, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> tuple[Optional[PerformanceBound], list[ProofResult]]:
        """Derive a time bound and the proof results backing it.

        A name listed in ``self.algorithm_complexities`` takes precedence;
        otherwise the class is inferred from code patterns.

        Returns:
            ``(bound, proofs)``; ``(None, [])`` when the inferred class is
            UNKNOWN.
        """
        if function_name in self.algorithm_complexities:
            complexity_class = self.algorithm_complexities[function_name][ResourceType.TIME]
            property_name = f"{function_name}_time_complexity"
            condition = f"Proven for {function_name} algorithm"
        else:
            complexity_class = self._infer_time_complexity_from_patterns(structure, complexity)
            if complexity_class == ComplexityClass.UNKNOWN:
                return None, []
            property_name = f"{function_name}_inferred_time_complexity"
            condition = "Inferred from code patterns"

        prop = self._create_complexity_property(property_name, ResourceType.TIME, complexity_class)
        proofs = [self.theorem_prover.verify_property(prop)]

        # The same label is reported for best, average and worst case.
        label = complexity_class.value
        bound = PerformanceBound(
            resource_type=ResourceType.TIME,
            complexity_class=complexity_class,
            best_case=label,
            average_case=label,
            worst_case=label,
            conditions=[condition],
        )
        return bound, proofs

    def _analyze_space_complexity(
        self, function_name: str, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> tuple[Optional[PerformanceBound], list[ProofResult]]:
        """Derive a space bound and the proof results backing it.

        A name listed in ``self.algorithm_complexities`` takes precedence;
        otherwise the class is inferred from code patterns.

        Returns:
            ``(bound, proofs)``; ``(None, [])`` when the inferred class is
            UNKNOWN.
        """
        if function_name in self.algorithm_complexities:
            complexity_class = self.algorithm_complexities[function_name][ResourceType.SPACE]
            property_name = f"{function_name}_space_complexity"
            condition = f"Proven for {function_name} algorithm"
        else:
            complexity_class = self._infer_space_complexity_from_patterns(structure, complexity)
            if complexity_class == ComplexityClass.UNKNOWN:
                return None, []
            property_name = f"{function_name}_inferred_space_complexity"
            condition = "Inferred from code patterns"

        prop = self._create_complexity_property(property_name, ResourceType.SPACE, complexity_class)
        proofs = [self.theorem_prover.verify_property(prop)]

        # The same label is reported for best, average and worst case.
        label = complexity_class.value
        bound = PerformanceBound(
            resource_type=ResourceType.SPACE,
            complexity_class=complexity_class,
            best_case=label,
            average_case=label,
            worst_case=label,
            conditions=[condition],
        )
        return bound, proofs

    def _analyze_memory_complexity(
        self, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> tuple[Optional[PerformanceBound], list[ProofResult]]:
        """Analyze memory access complexity."""
        # Simplified memory analysis
        if complexity.array_accesses:
            # Linear memory access pattern
            bound = PerformanceBound(
                resource_type=ResourceType.MEMORY,
                complexity_class=ComplexityClass.LINEAR,
                best_case="O(n)",
                average_case="O(n)",
                worst_case="O(n)",
                conditions=["Based on array access patterns"],
            )
            return bound, []

        return None, []

    def _infer_time_complexity_from_patterns(
        self, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> ComplexityClass:
        """Map loop nesting and recursion counts to a time-complexity class.

        Checks run in this order: a depth-1 loop whose first recorded loop
        iterates over ``range``, nested loops by depth, recursion by number
        of recorded recursive calls, and finally straight-line code.
        Anything else is reported as UNKNOWN.
        """
        depth = complexity.loop_depth
        recorded_loops = complexity.loops

        # Single loop: only the first recorded loop is inspected.
        if depth == 1 and recorded_loops and recorded_loops[0].get("iterable") == "range":
            return ComplexityClass.LINEAR

        # Nested loops
        by_nesting = {2: ComplexityClass.QUADRATIC, 3: ComplexityClass.CUBIC}
        if depth in by_nesting:
            return by_nesting[depth]
        if depth > 3:
            return ComplexityClass.POLYNOMIAL

        # Recursion: one recorded call is treated as linear, more as exponential.
        recursion_sites = structure.recursive_calls
        if recursion_sites:
            if len(recursion_sites) == 1:
                return ComplexityClass.LINEAR
            if len(recursion_sites) > 1:
                return ComplexityClass.EXPONENTIAL

        # No loops or recursion
        if depth == 0 and not recursion_sites:
            return ComplexityClass.CONSTANT

        return ComplexityClass.UNKNOWN

    def _infer_space_complexity_from_patterns(
        self, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> ComplexityClass:
        """Map recursion and array allocations to a space-complexity class.

        Recorded recursive calls or array allocations give LINEAR; anything
        else is CONSTANT.
        """
        # Recursion is checked first, then allocations.
        grows_with_input = structure.recursive_calls or complexity.array_allocations
        if grows_with_input:
            return ComplexityClass.LINEAR
        return ComplexityClass.CONSTANT

    def _analyze_resource_usage(
        self, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> dict[str, Any]:
        """Analyze detailed resource usage."""
        return {
            "loop_count": len(complexity.loops),
            "max_loop_depth": complexity.loop_depth,
            "recursive_calls": len(structure.recursive_calls),
            "array_accesses": len(complexity.array_accesses),
            "function_calls": len(complexity.function_calls),
            "memory_allocations": len(complexity.array_allocations),
        }

    def _identify_bottlenecks(
        self, structure: "AlgorithmStructureExtractor", complexity: "ComplexityPatternAnalyzer"
    ) -> list[str]:
        """Identify performance bottlenecks."""
        bottlenecks = []

        if complexity.loop_depth >= 2:
            bottlenecks.append(f"Nested loops (depth {complexity.loop_depth}) - quadratic or higher complexity")

        if len(structure.recursive_calls) > 1:
            bottlenecks.append("Multiple recursive calls - potential exponential complexity")

        if len(complexity.array_accesses) > 10:
            bottlenecks.append("High number of array accesses - potential cache misses")

        return bottlenecks

    def _find_optimization_opportunities(
        self,
        structure: "AlgorithmStructureExtractor",
        complexity: "ComplexityPatternAnalyzer",
        bounds: dict[ResourceType, PerformanceBound],
    ) -> list[str]:
        """Suggest optimizations based on the derived bounds and code shape.

        Suggestions are appended in a fixed order: time bound, space bound,
        loop nesting, array-access count.
        """
        suggestions = []

        # Time complexity optimizations
        time_bound = bounds.get(ResourceType.TIME)
        if time_bound:
            time_class = time_bound.complexity_class
            if time_class == ComplexityClass.EXPONENTIAL:
                suggestions.append("Consider memoization to reduce exponential complexity")
            elif time_class == ComplexityClass.QUADRATIC:
                suggestions.append("Look for divide-and-conquer approach to reduce to O(n log n)")

        # Space complexity optimizations: linear space combined with recursion.
        space_bound = bounds.get(ResourceType.SPACE)
        if space_bound and space_bound.complexity_class == ComplexityClass.LINEAR and structure.recursive_calls:
            suggestions.append("Consider iterative implementation to reduce stack space")

        # Loop optimizations
        if complexity.loop_depth >= 2:
            suggestions.append("Consider loop unrolling or vectorization for nested loops")

        if len(complexity.array_accesses) > 5:
            suggestions.append("Consider loop tiling for better cache locality")

        return suggestions

    def _create_complexity_property(
        self, name: str, resource_type: ResourceType, complexity_class: ComplexityClass
    ) -> ProofProperty:
        """Build the proof property describing a complexity claim.

        Without Z3 the property carries the placeholder formula
        ``"mock_formula"`` and no context. With Z3 the formula bounds an
        integer usage variable by a function of an integer input size.

        Args:
            name: Name given to the property.
            resource_type: Resource the claim is about.
            complexity_class: Claimed complexity class.
        """
        summary = f"{resource_type.value} complexity is {complexity_class.value}"

        if not self.z3_available:
            return ProofProperty(
                name=name,
                property_type=PropertyType.ALGORITHM_CORRECTNESS,
                description=summary,
                z3_formula="mock_formula",
            )

        # Z3 integers for the input size and the amount of resource used.
        size = z3.Int("input_size")
        usage = z3.Int(f"{resource_type.value}_usage")

        formula: Any
        if complexity_class == ComplexityClass.CONSTANT:
            formula = usage <= z3.IntVal(1)
        elif complexity_class == ComplexityClass.LINEAR:
            formula = usage <= size * z3.IntVal(10)
        elif complexity_class == ComplexityClass.QUADRATIC:
            formula = usage <= size * size * z3.IntVal(10)
        elif complexity_class == ComplexityClass.LOGARITHMIC:
            # Approximate log with linear bound for large n
            formula = usage <= size
        else:
            # Every other class only gets a non-negativity constraint.
            formula = usage >= 0

        return ProofProperty(
            name=name,
            property_type=PropertyType.ALGORITHM_CORRECTNESS,
            description=summary,
            z3_formula=formula,
            context={"resource_type": resource_type, "complexity_class": complexity_class},
        )

    def _calculate_performance_confidence(self, proof_results: list[ProofResult]) -> float:
        """Calculate confidence in performance analysis."""
        if not proof_results:
            return 0.8  # Default confidence for pattern-based analysis

        verified_count = sum(1 for result in proof_results if result.is_verified)
        total_count = len(proof_results)

        return verified_count / total_count if total_count > 0 else 0.8

    def _extract_function_name(self, context: AnalysisContext) -> str:
        """Extract function name from context."""
        if context.analysis_result and hasattr(context.analysis_result, "functions"):
            if context.analysis_result.functions:
                return list(context.analysis_result.functions.keys())[0]
        elif hasattr(context.ast_node, "name"):
            name: str = context.ast_node.name
            return name
        return "unknown_function"

__init__(theorem_prover=None)

Initialize the performance analyzer.

Parameters:

Name Type Description Default
theorem_prover Optional[TheoremProver]

Z3 theorem prover instance

None
Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def __init__(self, theorem_prover: Optional[TheoremProver] = None):
    """Set up the analyzer and its lookup tables.

    Args:
        theorem_prover: Z3 theorem prover instance; a new
            ``TheoremProver`` is created when none (or a falsy one)
            is supplied.
    """
    self.theorem_prover = theorem_prover or TheoremProver()
    self.z3_available = Z3_AVAILABLE

    time_key = ResourceType.TIME
    space_key = ResourceType.SPACE

    # Structural pattern name -> complexity class.
    self.complexity_patterns = dict(
        single_loop=ComplexityClass.LINEAR,
        nested_loops_2=ComplexityClass.QUADRATIC,
        nested_loops_3=ComplexityClass.CUBIC,
        divide_conquer=ComplexityClass.LINEARITHMIC,
        binary_search=ComplexityClass.LOGARITHMIC,
        recursive_factorial=ComplexityClass.LINEAR,
        recursive_fibonacci=ComplexityClass.EXPONENTIAL,
    )

    # Function name -> complexity per resource for recognised algorithms.
    self.algorithm_complexities = {
        "factorial": {
            time_key: ComplexityClass.LINEAR,
            space_key: ComplexityClass.LINEAR,  # For recursive version
        },
        "fibonacci": {
            time_key: ComplexityClass.EXPONENTIAL,
            space_key: ComplexityClass.LINEAR,
        },
        "binary_search": {
            time_key: ComplexityClass.LOGARITHMIC,
            space_key: ComplexityClass.CONSTANT,
        },
        "merge_sort": {
            time_key: ComplexityClass.LINEARITHMIC,
            space_key: ComplexityClass.LINEAR,
        },
        "quicksort": {
            time_key: ComplexityClass.LINEARITHMIC,  # Average case
            space_key: ComplexityClass.LOGARITHMIC,
        },
    }

analyze_performance_bounds(context)

Analyze performance bounds for a function.

Parameters:

Name Type Description Default
context AnalysisContext

Analysis context with AST and metadata

required

Returns:

Type Description
PerformanceAnalysisResult

PerformanceAnalysisResult with proven bounds

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def analyze_performance_bounds(self, context: AnalysisContext) -> PerformanceAnalysisResult:
    """Run the full performance analysis for the function in ``context``.

    Two AST visitors collect structure and complexity patterns; time,
    space and memory bounds are then derived in that order, followed by
    resource usage, bottlenecks, optimization hints and a confidence
    score.

    Args:
        context: Analysis context with AST and metadata

    Returns:
        PerformanceAnalysisResult with proven bounds
    """
    started = time.time()

    function_name = self._extract_function_name(context)

    # Both visitors walk the same AST node independently.
    structure = AlgorithmStructureExtractor()
    structure.visit(context.ast_node)
    patterns = ComplexityPatternAnalyzer()
    patterns.visit(context.ast_node)

    analyses = (
        (ResourceType.TIME, lambda: self._analyze_time_complexity(function_name, structure, patterns)),
        (ResourceType.SPACE, lambda: self._analyze_space_complexity(function_name, structure, patterns)),
        (ResourceType.MEMORY, lambda: self._analyze_memory_complexity(structure, patterns)),
    )

    proven_bounds = {}
    complexity_proofs = []
    for resource, run_analysis in analyses:
        bound, proofs = run_analysis()
        # Proofs are only kept when the analysis produced a bound.
        if bound:
            proven_bounds[resource] = bound
            complexity_proofs.extend(proofs)

    resource_usage = self._analyze_resource_usage(structure, patterns)
    bottlenecks = self._identify_bottlenecks(structure, patterns)
    opportunities = self._find_optimization_opportunities(structure, patterns, proven_bounds)
    confidence = self._calculate_performance_confidence(complexity_proofs)

    elapsed = time.time() - started

    return PerformanceAnalysisResult(
        function_name=function_name,
        proven_bounds=proven_bounds,
        complexity_proofs=complexity_proofs,
        resource_usage=resource_usage,
        bottlenecks=bottlenecks,
        optimization_opportunities=opportunities,
        verification_time=elapsed,
        confidence=confidence,
    )

PerformanceBound dataclass

Represents a performance bound for an algorithm.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
@dataclass
class PerformanceBound:
    """A performance bound for one resource of an algorithm.

    ``constants`` and ``conditions`` may be omitted (or passed as ``None``);
    they are then replaced by a fresh empty dict and list respectively.
    """

    resource_type: ResourceType
    complexity_class: ComplexityClass
    best_case: str
    average_case: str
    worst_case: str
    input_size_variable: str = "n"
    constants: Optional[dict[str, float]] = None
    conditions: Optional[list[str]] = None

    def __post_init__(self) -> None:
        """Replace missing ``constants`` / ``conditions`` with empty containers."""
        self.constants = {} if self.constants is None else self.constants
        self.conditions = [] if self.conditions is None else self.conditions

__post_init__()

Initialize the constants dictionary and the conditions list if not provided.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
def __post_init__(self) -> None:
    """Initialize constants and conditions dictionaries if not provided."""
    if self.constants is None:
        self.constants = {}
    if self.conditions is None:
        self.conditions = []

ResourceType

Bases: Enum

Types of computational resources.

Source code in src/multigen/frontend/verifiers/performance_analyzer.py
class ResourceType(Enum):
    """Kinds of computational resource a performance bound can refer to.

    Each member's value is the lowercase form of its name.
    """

    TIME = "time"
    SPACE = "space"
    MEMORY = "memory"
    STACK_DEPTH = "stack_depth"
    CACHE_MISSES = "cache_misses"
    MEMORY_BANDWIDTH = "memory_bandwidth"

Z3 Formula Generator

multigen.frontend.verifiers.z3_formula_generator

Z3 Formula Generation from Python AST.

Converts Python AST nodes into Z3 formulas for formal verification.

Z3FormulaGenerator

Generates Z3 formulas from Python AST nodes.

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
class Z3FormulaGenerator:
    """Generates Z3 formulas from Python AST nodes."""

    def __init__(self) -> None:
        """Start with no variables and no constraints."""
        self.constraints: list[Any] = []
        self.variables: dict[str, Any] = {}  # name -> z3 variable
        self.z3_available = Z3_AVAILABLE

    def get_or_create_var(self, name: str, var_type: str = "int") -> Any:
        """Get or create a Z3 variable.

        Args:
            name: Variable name
            var_type: Variable type ("int", "bool", etc.)

        Returns:
            Z3 variable or None if Z3 is not available
        """
        if not self.z3_available:
            return None

        if name not in self.variables:
            if var_type == "int":
                self.variables[name] = z3.Int(name)
            elif var_type == "bool":
                if hasattr(z3, "Bool"):
                    self.variables[name] = z3.Bool(name)
                else:
                    self.variables[name] = z3.Int(name)  # Fallback for mock
            else:
                self.variables[name] = z3.Int(name)  # Default to int
        return self.variables[name]

    def generate_array_bounds_formula(
        self, array_name: str, index_expr: ast.expr, array_len_name: str
    ) -> Optional[Any]:
        """Generate Z3 formula for array bounds safety.

        Args:
            array_name: Name of the array
            index_expr: AST expression for the index
            array_len_name: Name of variable holding array length

        Returns:
            Z3 formula asserting bounds safety
        """
        if not self.z3_available:
            return None

        # Get array length variable
        arr_len = self.get_or_create_var(array_len_name, "int")

        # Convert index expression to Z3
        index_z3 = self._expr_to_z3(index_expr)
        if index_z3 is None:
            return None

        # Generate bounds check: 0 <= index < arr_len
        return z3.And(index_z3 >= 0, index_z3 < arr_len)

    def generate_loop_bounds_formula(
        self, loop_var: str, start: int, end_expr: ast.expr, array_len_name: str
    ) -> Optional[Any]:
        """Generate Z3 formula for loop bounds safety.

        For: for i in range(start, end)
        Prove: all accesses arr[i] are safe

        Args:
            loop_var: Loop variable name (e.g., "i")
            start: Loop start value
            end_expr: AST expression for loop end
            array_len_name: Name of variable holding array length

        Returns:
            Z3 formula asserting all loop iterations are safe
        """
        if not self.z3_available:
            return None

        i = self.get_or_create_var(loop_var, "int")
        arr_len = self.get_or_create_var(array_len_name, "int")

        # Convert end expression to Z3
        end_z3 = self._expr_to_z3(end_expr)
        if end_z3 is None:
            return None

        # For all i in [start, end), prove: start <= i < end => 0 <= i < arr_len
        loop_range = z3.And(i >= start, i < end_z3)
        bounds_safe = z3.And(i >= 0, i < arr_len)

        return z3.ForAll([i], z3.Implies(loop_range, bounds_safe))

    def generate_preconditions(self, array_len_name: str, min_len_expr: ast.expr) -> Optional[Any]:
        """Generate preconditions for array safety.

        Args:
            array_len_name: Name of variable holding array length
            min_len_expr: AST expression for minimum required length

        Returns:
            Z3 formula for preconditions
        """
        if not self.z3_available:
            return None

        arr_len = self.get_or_create_var(array_len_name, "int")
        min_len = self._expr_to_z3(min_len_expr)

        if min_len is None:
            return None

        # Preconditions: arr_len >= min_len and arr_len >= 0
        return z3.And(arr_len >= min_len, arr_len >= 0)

    def _expr_to_z3(self, expr: ast.expr) -> Optional[Any]:
        """Convert Python AST expression to Z3 expression.

        Args:
            expr: Python AST expression node

        Returns:
            Z3 expression or None if conversion fails
        """
        if not self.z3_available:
            return None

        if isinstance(expr, ast.Constant):
            # Constant value
            if isinstance(expr.value, int):
                return expr.value
            return None

        elif isinstance(expr, ast.Name):
            # Variable reference
            return self.get_or_create_var(expr.id, "int")

        elif isinstance(expr, ast.BinOp):
            # Binary operation
            left = self._expr_to_z3(expr.left)
            right = self._expr_to_z3(expr.right)

            if left is None or right is None:
                return None

            if isinstance(expr.op, ast.Add):
                return left + right
            elif isinstance(expr.op, ast.Sub):
                return left - right
            elif isinstance(expr.op, ast.Mult):
                return left * right
            elif isinstance(expr.op, ast.Div):
                # Integer division - use Z3 division if available
                if hasattr(left, "__truediv__"):
                    return left / right
                return None
            else:
                return None

        elif isinstance(expr, ast.Compare):
            # Comparison operation
            if len(expr.ops) != 1 or len(expr.comparators) != 1:
                return None  # Only handle simple comparisons

            left = self._expr_to_z3(expr.left)
            right = self._expr_to_z3(expr.comparators[0])

            if left is None or right is None:
                return None

            op = expr.ops[0]
            if isinstance(op, ast.Lt):
                return left < right
            elif isinstance(op, ast.LtE):
                return left <= right
            elif isinstance(op, ast.Gt):
                return left > right
            elif isinstance(op, ast.GtE):
                return left >= right
            elif isinstance(op, ast.Eq):
                return left == right
            elif isinstance(op, ast.NotEq):
                return left != right
            else:
                return None

        elif isinstance(expr, ast.UnaryOp):
            # Unary operation
            operand = self._expr_to_z3(expr.operand)
            if operand is None:
                return None

            if isinstance(expr.op, ast.USub):
                return -operand
            elif isinstance(expr.op, ast.UAdd):
                return operand
            else:
                return None

        else:
            # Unsupported expression type
            return None

    def add_constraint(self, constraint: Any) -> None:
        """Add a constraint to the formula.

        Args:
            constraint: Z3 constraint
        """
        if constraint is not None:
            self.constraints.append(constraint)

    def get_all_constraints(self) -> list[Any]:
        """Get all accumulated constraints.

        Returns:
            List of Z3 constraints
        """
        return self.constraints

    def clear_constraints(self) -> None:
        """Clear all constraints."""
        self.constraints = []

    def clear_variables(self) -> None:
        """Clear all variables."""
        self.variables = {}

__init__()

Initialize the formula generator.

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def __init__(self) -> None:
    """Set up an empty formula generator.

    Records whether Z3 is available and starts with no variables and no
    constraints.
    """
    self.z3_available = Z3_AVAILABLE
    # Maps a variable name to the Z3 variable created for it.
    self.variables: dict[str, Any] = dict()
    # Constraints accumulated through add_constraint().
    self.constraints: list[Any] = list()

add_constraint(constraint)

Add a constraint to the formula.

Parameters:

Name Type Description Default
constraint Any

Z3 constraint

required
Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def add_constraint(self, constraint: Any) -> None:
    """Record a constraint on this generator.

    A ``None`` constraint is ignored, so callers may pass the result of a
    formula builder directly even when it produced nothing.

    Args:
        constraint: Z3 constraint to store, or None to do nothing
    """
    if constraint is None:
        return
    self.constraints.append(constraint)

clear_constraints()

Clear all constraints.

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def clear_constraints(self) -> None:
    """Drop every stored constraint by installing a fresh empty list."""
    # Rebind rather than empty in place: lists handed out earlier stay untouched.
    self.constraints = list()

clear_variables()

Clear all variables.

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def clear_variables(self) -> None:
    """Forget every known variable by installing a fresh empty mapping."""
    # Rebind rather than empty in place: the previous mapping object is left as it was.
    self.variables = dict()

generate_array_bounds_formula(array_name, index_expr, array_len_name)

Generate Z3 formula for array bounds safety.

Parameters:

Name Type Description Default
array_name str

Name of the array

required
index_expr expr

AST expression for the index

required
array_len_name str

Name of variable holding array length

required

Returns:

Type Description
Optional[Any]

Z3 formula asserting bounds safety

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def generate_array_bounds_formula(
    self, array_name: str, index_expr: ast.expr, array_len_name: str
) -> Optional[Any]:
    """Build the Z3 condition stating that an index stays inside an array.

    Args:
        array_name: Name of the array being indexed (not used when building
            the formula)
        index_expr: AST expression for the index
        array_len_name: Name of the variable holding the array length

    Returns:
        Z3 formula for ``0 <= index < length``, or None when Z3 is
        unavailable or the index expression cannot be converted
    """
    if self.z3_available:
        # The length variable is registered before the index is converted.
        length = self.get_or_create_var(array_len_name, "int")
        index = self._expr_to_z3(index_expr)
        if index is not None:
            lower_ok = index >= 0
            upper_ok = index < length
            return z3.And(lower_ok, upper_ok)
    return None

generate_loop_bounds_formula(loop_var, start, end_expr, array_len_name)

Generate Z3 formula for loop bounds safety.

For a loop of the form `for i in range(start, end)`, prove that all accesses `arr[i]` are safe.

Parameters:

Name Type Description Default
loop_var str

Loop variable name (e.g., "i")

required
start int

Loop start value

required
end_expr expr

AST expression for loop end

required
array_len_name str

Name of variable holding array length

required

Returns:

Type Description
Optional[Any]

Z3 formula asserting all loop iterations are safe

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def generate_loop_bounds_formula(
    self, loop_var: str, start: int, end_expr: ast.expr, array_len_name: str
) -> Optional[Any]:
    """Build the Z3 statement that every iteration of a range loop indexes safely.

    For a loop ``for i in range(start, end)`` the formula states that every
    ``i`` inside the loop range is a valid index: ``start <= i < end``
    implies ``0 <= i < length``.

    Args:
        loop_var: Loop variable name (e.g., "i")
        start: Loop start value
        end_expr: AST expression for the loop end
        array_len_name: Name of the variable holding the array length

    Returns:
        Universally quantified Z3 formula over the loop variable, or None
        when Z3 is unavailable or the end expression cannot be converted
    """
    if self.z3_available:
        # Both variables are registered before the end expression is converted.
        index = self.get_or_create_var(loop_var, "int")
        length = self.get_or_create_var(array_len_name, "int")

        upper = self._expr_to_z3(end_expr)
        if upper is not None:
            in_loop = z3.And(index >= start, index < upper)
            in_bounds = z3.And(index >= 0, index < length)
            claim = z3.Implies(in_loop, in_bounds)
            return z3.ForAll([index], claim)
    return None

generate_preconditions(array_len_name, min_len_expr)

Generate preconditions for array safety.

Parameters:

Name Type Description Default
array_len_name str

Name of variable holding array length

required
min_len_expr expr

AST expression for minimum required length

required

Returns:

Type Description
Optional[Any]

Z3 formula for preconditions

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def generate_preconditions(self, array_len_name: str, min_len_expr: ast.expr) -> Optional[Any]:
    """Build the precondition formula requiring an array to be long enough.

    Args:
        array_len_name: Name of the variable holding the array length
        min_len_expr: AST expression for the minimum required length

    Returns:
        Z3 formula for ``length >= min_len`` and ``length >= 0``, or None
        when Z3 is unavailable or the minimum-length expression cannot be
        converted
    """
    if self.z3_available:
        # The length variable is registered before the expression is converted.
        length = self.get_or_create_var(array_len_name, "int")
        required = self._expr_to_z3(min_len_expr)
        if required is not None:
            long_enough = length >= required
            non_negative = length >= 0
            return z3.And(long_enough, non_negative)
    return None

get_all_constraints()

Get all accumulated constraints.

Returns:

Type Description
list[Any]

List of Z3 constraints

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def get_all_constraints(self) -> list[Any]:
    """Return the constraints collected so far.

    Returns:
        The generator's own constraint list (not a copy)
    """
    collected = self.constraints
    return collected

get_or_create_var(name, var_type='int')

Get or create a Z3 variable.

Parameters:

Name Type Description Default
name str

Variable name

required
var_type str

Variable type ("int", "bool", etc.)

'int'

Returns:

Type Description
Any

Z3 variable or None if Z3 is not available

Source code in src/multigen/frontend/verifiers/z3_formula_generator.py
def get_or_create_var(self, name: str, var_type: str = "int") -> Any:
    """Look up a Z3 variable by name, creating it on first use.

    A variable that already exists under ``name`` is returned as stored;
    ``var_type`` only matters when the variable is created.

    Args:
        name: Variable name
        var_type: Variable type; "bool" creates a boolean when ``z3``
            provides ``Bool``, every other value creates an integer

    Returns:
        Z3 variable or None if Z3 is not available
    """
    if not self.z3_available:
        return None

    if name in self.variables:
        return self.variables[name]

    # Integers are the default; they also stand in for booleans when the
    # z3 object (e.g. a mock) has no Bool constructor.
    wants_bool = var_type == "bool" and hasattr(z3, "Bool")
    created = z3.Bool(name) if wants_bool else z3.Int(name)
    self.variables[name] = created
    return created

Example Usage

Verifying array bounds:

from multigen.frontend.verifiers.bounds_prover import BoundsProver
import ast

# Sample function whose array accesses are to be checked.
code = '''
def sum_array(arr: list[int]) -> int:
    total: int = 0
    for i in range(len(arr)):
        total += arr[i]
    return total
'''

# Parse the sample; tree.body[0] is its first top-level statement (the function definition).
tree = ast.parse(code)
prover = BoundsProver()
# NOTE(review): verify_memory_safety is declared as taking an AnalysisContext and
# reads `context.ast_node`; a bare AST node has no such attribute, so this call
# presumably needs the node wrapped in an AnalysisContext — confirm against the API.
result = prover.verify_memory_safety(tree.body[0])

# NOTE(review): verify_memory_safety returns a MemorySafetyProof; `success` and
# `errors` are assumed to be fields of it — confirm the actual field names.
if result.success:
    print("Array access is SAFE")
else:
    print("Verification failed:", result.errors)

Using with pipeline:

from multigen.pipeline import MultiGenPipeline, PipelineConfig

# Configuration targeting C with the formal-verification and
# strict-verification flags switched on.
config = PipelineConfig(
    target_language="c",
    enable_formal_verification=True,
    strict_verification=True,
)

# Build the pipeline from that configuration and convert example.py.
pipeline = MultiGenPipeline(config=config)
result = pipeline.convert("example.py")

Z3 Availability

Check if Z3 is available:

from multigen.frontend.verifiers.bounds_prover import Z3_AVAILABLE

# Report that Z3 was found, or otherwise print the install command.
message = "Z3 is available" if Z3_AVAILABLE else "Install with: pip install multigen[z3]"
print(message)

Verification is automatically skipped if Z3 is not installed.

See Also