Skip to content

Pipeline API

The pipeline module provides the main entry point for code conversion.

MultiGenPipeline

multigen.pipeline.MultiGenPipeline

Multi-language Python code generation pipeline.

Source code in src/multigen/pipeline.py, lines 222–1047.
class MultiGenPipeline:
    """Multi-language Python code generation pipeline."""

    def __init__(self, config: Optional[PipelineConfig] = None, target_language: str = "c"):
        """Initialize the pipeline with configuration and target language.

        Args:
            config: Optional pre-built pipeline configuration. When omitted,
                a fresh ``PipelineConfig`` is created for ``target_language``.
            target_language: Backend language key. A non-default value
                (anything other than ``"c"``) overrides ``config.target_language``.
        """
        if config is None:
            config = PipelineConfig(target_language=target_language)
        else:
            # NOTE(review): an explicit target_language="c" cannot override a
            # non-C config here, because "c" is indistinguishable from the
            # parameter's default — confirm this is intended.
            if target_language != "c":
                config.target_language = target_language

        self.config = config
        self.log = log.config(type(self).__name__)
        self._init_components()

    def _report_progress(self, phase: PipelinePhase, message: str) -> None:
        """Report progress to callback if configured.

        Args:
            phase: Current pipeline phase
            message: Progress message
        """
        if self.config.progress_callback:
            self.config.progress_callback(phase, message)

    def _init_components(self) -> None:
        """Initialize pipeline components.

        Resolves the backend for the configured target language, then — when
        the frontend package is available and advanced analysis is enabled —
        constructs the analyzer, optimizer, and verifier objects the later
        phases use.

        Raises:
            ValueError: If no backend is registered for the target language.
        """
        # Get backend for target language
        try:
            self.backend = registry.get_backend(self.config.target_language, self.config.backend_preferences)
            self.emitter = self.backend.get_emitter()
            self.builder = self.backend.get_builder()
            self.factory = self.backend.get_factory()
            self.container_system = self.backend.get_container_system()
        except ValueError as e:
            # Re-raise with the target language in the message for easier triage.
            raise ValueError(f"Unsupported target language '{self.config.target_language}': {e}") from e

        # Initialize frontend analysis components if available
        if FRONTEND_AVAILABLE and self.config.enable_advanced_analysis:
            self.subset_validator = StaticPythonSubsetValidator()
            # Note: constraint_checker deprecated in v0.1.64 (replaced by PythonConstraintChecker + backend-specific checkers)
            self.ast_analyzer = ASTAnalyzer()

            # Advanced static analysis components
            self.static_analyzer = StaticAnalyzer()
            self.symbolic_executor = SymbolicExecutor()
            self.bounds_checker = BoundsChecker()
            self.call_graph_analyzer = CallGraphAnalyzer()

            # Flow-sensitive type inference
            self.type_inference_engine = TypeInferenceEngine(enable_flow_sensitive=True)

            # NOTE(review): the optimizer attributes below exist only when BOTH
            # enable_advanced_analysis and enable_optimizations are set — any
            # later phase that uses them must guard for their absence.
            if self.config.enable_optimizations:
                self.compile_time_evaluator = CompileTimeEvaluator()
                self.loop_analyzer = LoopAnalyzer()
                self.function_specializer = FunctionSpecializer()
                self.vectorization_detector = VectorizationDetector()

            # Formal verification components (optional, requires Z3)
            if self.config.enable_formal_verification:
                if not Z3_AVAILABLE:
                    # Z3 missing: warn (escalate the log level in strict mode)
                    # and leave all provers unset so callers can detect it.
                    self.log.warning(
                        "Formal verification requested but Z3 not available. Install with: pip install multigen[z3]"
                    )
                    if self.config.strict_verification:
                        self.log.error(
                            "Strict verification mode requires Z3. "
                            "Code generation will fail without formal verification."
                        )
                    self.bounds_prover = None
                    self.correctness_prover = None
                    self.theorem_prover = None
                else:
                    mode = "strict mode" if self.config.strict_verification else "warning mode"
                    self.log.info(f"Formal verification enabled in {mode} (Z3 available)")
                    self.bounds_prover = BoundsProver()
                    self.correctness_prover = CorrectnessProver()
                    self.theorem_prover = TheoremProver()
            else:
                if self.config.strict_verification:
                    # Strict mode without verification enabled is a likely
                    # misconfiguration — surface it, but do not fail here.
                    self.log.warning(
                        "Strict verification mode enabled but formal verification is disabled. "
                        "Set enable_formal_verification=True to use strict mode."
                    )
                self.bounds_prover = None
                self.correctness_prover = None
                self.theorem_prover = None
        else:
            # Frontend missing or advanced analysis disabled: later phases
            # take their simplified code paths.
            self.log.debug("Using simplified analysis pipeline")

    def convert(self, input_path: Union[str, Path], output_path: Optional[Union[str, Path]] = None) -> PipelineResult:
        """Convert Python module through complete pipeline.

        Runs validation, analysis, Python optimization, mapping, target
        optimization, generation and (optionally) build phases in order,
        short-circuiting with a failed result on the first failing phase.

        Args:
            input_path: Path to Python file or module
            output_path: Output directory or file path

        Returns:
            PipelineResult with all outputs and metadata
        """
        input_path = Path(input_path)
        if not input_path.exists():
            self.log.error(f"Input file not found: {input_path}")
            return PipelineResult(
                success=False,
                input_file=str(input_path),
                output_files={},
                target_language=self.config.target_language,
                errors=[f"Input file not found: {input_path}"],
            )

        # Determine output directory: explicit argument wins, then the
        # configured output_dir, then the default build/src layout.
        if output_path:
            output_dir = Path(output_path)
        elif self.config.output_dir:
            output_dir = Path(self.config.output_dir)
        else:
            # Default to build/src structure
            output_dir = Path("build") / "src"

        output_dir.mkdir(parents=True, exist_ok=True)

        result = PipelineResult(
            success=True, input_file=str(input_path), output_files={}, target_language=self.config.target_language
        )

        try:
            self.log.info(f"Starting pipeline conversion for: {input_path} -> {self.config.target_language}")

            # Read input file. Python source defaults to UTF-8 (PEP 3120);
            # pin the encoding so decoding does not depend on the host locale.
            source_code = input_path.read_text(encoding="utf-8")

            # Phase 1: Validation
            self.log.debug("Starting validation phase")
            self._report_progress(PipelinePhase.VALIDATION, "Validating Python code")
            if not self._validation_phase(source_code, input_path, result):
                self.log.error("Validation phase failed")
                return result

            # Phase 2: Analysis
            self.log.debug("Starting analysis phase")
            self._report_progress(PipelinePhase.ANALYSIS, "Analyzing AST and types")
            analysis_result = self._analysis_phase(source_code, result)
            if analysis_result is None:
                self.log.error("Analysis phase failed")
                return result

            # Phase 3: Python Optimization (best-effort; never aborts)
            self.log.debug("Starting Python optimization phase")
            self._report_progress(PipelinePhase.PYTHON_OPTIMIZATION, "Optimizing Python IR")
            optimized_analysis = self._python_optimization_phase(source_code, analysis_result, result)

            # Phase 4: Mapping (language-agnostic to target-specific)
            self.log.debug("Starting mapping phase")
            self._report_progress(PipelinePhase.MAPPING, f"Mapping to {self.config.target_language.upper()}")
            mapped_result = self._mapping_phase(optimized_analysis, result)

            # Phase 5: Target Optimization
            self.log.debug("Starting target optimization phase")
            self._report_progress(
                PipelinePhase.TARGET_OPTIMIZATION, f"Optimizing {self.config.target_language.upper()} code"
            )
            target_optimized = self._target_optimization_phase(mapped_result, result)

            # Phase 6: Generation
            self.log.debug("Starting generation phase")
            self._report_progress(PipelinePhase.GENERATION, f"Generating {self.config.target_language.upper()} source")
            if not self._generation_phase(source_code, target_optimized, output_dir, result):
                self.log.error("Generation phase failed")
                return result

            # Phase 7: Build (skipped entirely when build_mode is NONE)
            if self.config.build_mode != BuildMode.NONE:
                self.log.debug(f"Starting build phase with mode: {self.config.build_mode}")
                build_msg = "Compiling" if self.config.build_mode == BuildMode.DIRECT else "Generating build file"
                self._report_progress(PipelinePhase.BUILD, build_msg)
                if not self._build_phase(output_dir, result):
                    self.log.error("Build phase failed")
                    return result

            self.log.info(f"Pipeline conversion completed successfully for: {input_path}")
            return result

        except Exception as e:
            # Top-level boundary: convert any unexpected failure into a
            # failed result instead of propagating.
            self.log.error(f"Pipeline error: {str(e)}")
            result.success = False
            result.errors.append(f"Pipeline error: {str(e)}")
            return result

    def _validation_phase(self, source_code: str, input_path: Path, result: PipelineResult) -> bool:
        """Phase 1: Validate static-python style and translatability.

        Performs, in order: a syntax parse, Python-subset validation,
        memory-safety checks (C/C++ targets only), and optional Z3-backed
        formal verification. Every exit path stores a ``ValidationPhaseResult``
        under ``result.phase_results[PipelinePhase.VALIDATION]``.

        Args:
            source_code: Raw Python source to validate.
            input_path: Path of the input file (not referenced in this method).
            result: Pipeline result that collects errors/warnings/phase data.

        Returns:
            True if validation passed and conversion may continue.
        """
        # Track validation state for typed result
        memory_safety_errors: list[str] = []
        memory_safety_warnings: list[str] = []
        memory_safety_checked = False
        violations: list[str] = []
        # NOTE(review): `warnings` is never appended to below; it is passed
        # through empty to the success result — confirm this is intended.
        warnings: list[str] = []

        try:
            # Parse AST for validation
            ast.parse(source_code)

            if FRONTEND_AVAILABLE and self.config.enable_advanced_analysis:
                # Validate Python subset compatibility
                frontend_validation = self.subset_validator.validate_code(source_code)

                if not frontend_validation.is_valid:
                    # Subset violations are fatal: record and stop the pipeline.
                    violations = [str(v) for v in frontend_validation.violations]
                    result.success = False
                    result.errors.extend(violations)
                    result.phase_results[PipelinePhase.VALIDATION] = ValidationPhaseResult(
                        is_valid=False,
                        parse_success=True,
                        violations=violations,
                    )
                    return False

                # Check memory safety constraints for C/C++ targets
                if MEMORY_SAFETY_AVAILABLE and self.config.target_language in ["c", "cpp"]:
                    memory_safety_checked = True
                    memory_safety_checker = MemorySafetyChecker(language=self.config.target_language)
                    memory_check_results = memory_safety_checker.check_code(source_code)

                    # Add memory safety warnings/errors
                    for warning in memory_check_results:
                        msg = f"[{warning.violation_type.value}] {warning.message} (line {warning.line})"
                        if warning.severity == "error":
                            memory_safety_errors.append(msg)
                            result.errors.append(msg)
                        else:
                            memory_safety_warnings.append(msg)
                            result.warnings.append(msg)

                    # Fail if there are critical memory safety errors
                    if memory_safety_errors:
                        result.success = False
                        result.phase_results[PipelinePhase.VALIDATION] = ValidationPhaseResult(
                            is_valid=False,
                            parse_success=True,
                            memory_safety_checked=True,
                            memory_safety_errors=memory_safety_errors,
                            memory_safety_warnings=memory_safety_warnings,
                        )
                        return False

                # Run formal verification if enabled
                if self.config.enable_formal_verification and self.bounds_prover is not None:
                    self.log.info("Running formal verification with Z3...")

                    # Parse AST for verification
                    tree = ast.parse(source_code)

                    # Verify each function
                    for node in ast.walk(tree):
                        if isinstance(node, ast.FunctionDef):
                            # Create analysis context for verifier
                            from .frontend.base import AnalysisLevel

                            context = AnalysisContext(
                                ast_node=node,
                                source_code=source_code,
                                analysis_level=AnalysisLevel.BASIC,
                                analysis_result=None,
                            )

                            # Run bounds verification
                            proof = self.bounds_prover.verify_memory_safety(context)

                            # Report verification results
                            if not proof.is_safe:
                                for unsafe_access in proof.unsafe_accesses:
                                    error_msg = (
                                        f"[FORMAL_VERIFICATION] Potential unsafe memory access "
                                        f"in '{proof.function_name}' at line {unsafe_access.line_number}"
                                    )
                                    # Strict mode escalates findings to hard errors.
                                    if self.config.strict_verification:
                                        result.errors.append(error_msg)
                                    else:
                                        result.warnings.append(error_msg)

                                # In strict mode, halt on first unsafe function
                                if self.config.strict_verification:
                                    result.success = False
                                    result.errors.append(
                                        f"[FORMAL_VERIFICATION] Code generation halted due to verification failures in '{proof.function_name}'. "
                                        f"Fix unsafe memory accesses or disable strict_verification mode."
                                    )
                                    self.log.error(f"Verification failed in strict mode: {proof.summary}")
                                    return False

                            # Add verification recommendations
                            for recommendation in proof.recommendations:
                                if self.config.strict_verification and not proof.is_safe:
                                    result.errors.append(f"[FORMAL_VERIFICATION] {recommendation}")
                                else:
                                    result.warnings.append(f"[FORMAL_VERIFICATION] {recommendation}")

                            self.log.debug(f"Verification complete: {proof.summary}")

                # Success with advanced analysis
                result.phase_results[PipelinePhase.VALIDATION] = ValidationPhaseResult(
                    is_valid=True,
                    parse_success=True,
                    memory_safety_checked=memory_safety_checked,
                    memory_safety_errors=memory_safety_errors,
                    memory_safety_warnings=memory_safety_warnings,
                    warnings=warnings,
                )
            else:
                # Basic validation - just check if it parses
                result.phase_results[PipelinePhase.VALIDATION] = ValidationPhaseResult(
                    is_valid=True,
                    parse_success=True,
                )

            return True

        except SyntaxError as e:
            # Source did not parse at all.
            result.success = False
            result.errors.append(f"Syntax error: {str(e)}")
            result.phase_results[PipelinePhase.VALIDATION] = ValidationPhaseResult(
                is_valid=False,
                parse_success=False,
                violations=[f"Syntax error: {str(e)}"],
            )
            return False
        except Exception as e:
            # Any other failure inside validation is reported as a phase error.
            result.success = False
            result.errors.append(f"Validation phase error: {str(e)}")
            result.phase_results[PipelinePhase.VALIDATION] = ValidationPhaseResult(
                is_valid=False,
                parse_success=False,
                violations=[f"Validation phase error: {str(e)}"],
            )
            return False

    def _analysis_phase(self, source_code: str, result: PipelineResult) -> Optional[Any]:
        """Phase 2: AST parsing and semantic element breakdown with advanced static analysis.

        With the frontend available and advanced analysis enabled, runs the
        full analyzer stack (static analysis, symbolic execution, bounds
        checking, call-graph analysis, type inference, immutability and
        constraint checks); otherwise falls back to a simple parse-and-wrap
        analysis.

        Args:
            source_code: Raw Python source being converted.
            result: Pipeline result that collects errors/warnings/phase data.

        Returns:
            An analysis-result object for the following phases, or ``None``
            when analysis failed (``result`` carries the errors either way).
        """
        try:
            if FRONTEND_AVAILABLE and self.config.enable_advanced_analysis:
                # Use comprehensive AST analysis
                analysis_result = self.ast_analyzer.analyze(source_code)

                if not analysis_result.convertible:
                    # Source uses constructs the converter cannot handle — fatal.
                    result.success = False
                    result.errors.extend(analysis_result.errors)
                    result.warnings.extend(analysis_result.warnings)
                    result.phase_results[PipelinePhase.ANALYSIS] = AnalysisPhaseResult(
                        success=False,
                        convertible=False,
                        errors=list(analysis_result.errors),
                        warnings=list(analysis_result.warnings),
                    )
                    return None

                # Run advanced static analysis
                advanced_analysis: dict[str, Any] = {}

                # Parse AST for advanced analysis
                ast_root = ast.parse(source_code)

                # Create AnalysisContext for advanced analyzers
                analysis_context = AnalysisContext(
                    source_code=source_code,
                    ast_node=ast_root,
                    analysis_result=analysis_result,
                    analysis_level=AnalysisLevel.INTERMEDIATE,
                    optimization_level=FrontendOptimizationLevel.MODERATE,
                )

                # Static analysis (control flow, data flow)
                static_report = self.static_analyzer.analyze(analysis_context)
                advanced_analysis["static_analysis"] = static_report

                # Symbolic execution
                symbolic_report = self.symbolic_executor.analyze(analysis_context)
                advanced_analysis["symbolic_execution"] = symbolic_report

                # Bounds checking
                bounds_report = self.bounds_checker.analyze(analysis_context)
                advanced_analysis["bounds_checking"] = bounds_report

                # Call graph analysis
                call_graph_report = self.call_graph_analyzer.analyze(analysis_context)
                advanced_analysis["call_graph"] = call_graph_report

                # Flow-sensitive type inference, one entry per top-level function
                type_inference_results: dict[str, dict[str, Any]] = {}
                for node in ast.walk(ast_root):
                    if isinstance(node, ast.FunctionDef):
                        func_inference = self.type_inference_engine.analyze_function_signature_enhanced(node)
                        type_inference_results[node.name] = func_inference
                        self.log.debug(f"Type inference completed for function: {node.name}")

                advanced_analysis["type_inference"] = type_inference_results

                # Immutability analysis
                immutability_analyzer = ImmutabilityAnalyzer()
                immutability_results = immutability_analyzer.analyze_module(ast_root)
                advanced_analysis["immutability"] = immutability_results
                self.log.debug(f"Immutability analysis completed for {len(immutability_results)} functions")

                # Python constraint checking (uses immutability results)
                python_constraint_checker = PythonConstraintChecker(immutability_results=immutability_results)
                constraint_violations = python_constraint_checker.check_code(source_code)
                advanced_analysis["python_constraints"] = constraint_violations

                # Add warnings/errors from constraint violations
                analysis_errors: list[str] = []
                analysis_warnings: list[str] = []
                for violation in constraint_violations:
                    if violation.severity == "error":
                        msg = f"[{violation.rule_id}] {violation.message} (line {violation.line})"
                        analysis_errors.append(msg)
                        result.errors.append(msg)
                    else:
                        msg = f"[{violation.rule_id}] {violation.message} (line {violation.line})"
                        analysis_warnings.append(msg)
                        result.warnings.append(msg)

                # Fail if there are critical constraint violations
                if analysis_errors:
                    result.success = False
                    self.log.error(f"Found {len(analysis_errors)} critical constraint violations")
                    result.phase_results[PipelinePhase.ANALYSIS] = AnalysisPhaseResult(
                        success=False,
                        convertible=True,
                        errors=analysis_errors,
                        warnings=analysis_warnings,
                        advanced_analysis=advanced_analysis,
                    )
                    return None

                # Extract counts from analysis result if available
                # (defaults guard against analyzer versions lacking these attrs)
                function_count = len(getattr(analysis_result, "functions", {}))
                imports = list(getattr(analysis_result, "imports", []))

                # Store typed analysis result
                result.phase_results[PipelinePhase.ANALYSIS] = AnalysisPhaseResult(
                    success=True,
                    convertible=True,
                    function_count=function_count,
                    imports=imports,
                    errors=analysis_errors,
                    warnings=analysis_warnings,
                    advanced_analysis=advanced_analysis,
                )

                return analysis_result
            else:
                # Simple analysis using built-in frontend function
                try:
                    simple_analysis = analyze_python_code(source_code)

                    # Adapter exposing the attribute surface (source_code,
                    # ast_root, convertible, errors, warnings) that later
                    # phases read from the advanced analysis result.
                    class SimpleAnalysisResult:
                        def __init__(self, code: str, analysis: Any) -> None:
                            self.source_code = code
                            self.ast_root = ast.parse(code)
                            self.analysis = analysis
                            self.convertible = analysis.convertible
                            self.errors = analysis.errors if hasattr(analysis, "errors") else []
                            self.warnings = analysis.warnings if hasattr(analysis, "warnings") else []

                    result.phase_results[PipelinePhase.ANALYSIS] = AnalysisPhaseResult(
                        success=True,
                        convertible=simple_analysis.convertible,
                    )
                    return SimpleAnalysisResult(source_code, simple_analysis)
                except Exception:
                    # Fallback to basic analysis if simplified result creation fails
                    basic_result = type(
                        "BasicAnalysis",
                        (),
                        {
                            "source_code": source_code,
                            "ast_root": ast.parse(source_code),
                            "convertible": True,
                            "errors": [],
                            "warnings": [],
                        },
                    )()
                    result.phase_results[PipelinePhase.ANALYSIS] = AnalysisPhaseResult(
                        success=True,
                        convertible=True,
                    )
                    return basic_result

        except Exception as e:
            # Unexpected analyzer failure: report as a phase error and abort.
            result.success = False
            result.errors.append(f"Analysis phase error: {str(e)}")
            result.phase_results[PipelinePhase.ANALYSIS] = AnalysisPhaseResult(
                success=False,
                convertible=False,
                errors=[f"Analysis phase error: {str(e)}"],
            )
            return None

    def _python_optimization_phase(self, source_code: str, analysis_result: Any, result: PipelineResult) -> Any:
        """Phase 3: Python-level optimizations."""
        try:
            if FRONTEND_AVAILABLE and self.config.enable_optimizations:
                # Create analysis context
                context = AnalysisContext(
                    source_code=source_code,
                    ast_node=ast.parse(source_code),
                    analysis_result=analysis_result,
                    optimization_level=_map_optimization_level(self.config.optimization_level),
                )

                optimizations = {}

                # Compile-time evaluation
                compile_time_result = self.compile_time_evaluator.optimize(context)
                optimizations["compile_time"] = compile_time_result

                # Loop analysis
                loop_result = self.loop_analyzer.optimize(context)
                optimizations["loops"] = loop_result

                # Function specialization
                specialization_result = self.function_specializer.optimize(context)
                optimizations["specialization"] = specialization_result

                # Vectorization detection
                vectorization_result = self.vectorization_detector.optimize(context)
                optimizations["vectorization"] = vectorization_result

                result.phase_results[PipelinePhase.PYTHON_OPTIMIZATION] = PythonOptimizationPhaseResult(
                    enabled=True,
                    optimizations_applied=list(optimizations.keys()),
                    compile_time=compile_time_result,
                    loops=loop_result,
                    specialization=specialization_result,
                    vectorization=vectorization_result,
                )
            else:
                result.phase_results[PipelinePhase.PYTHON_OPTIMIZATION] = PythonOptimizationPhaseResult(
                    enabled=False,
                )

            return analysis_result  # Return analysis for next phase

        except Exception as e:
            result.warnings.append(f"Python optimization phase warning: {str(e)}")
            return analysis_result  # Continue with unoptimized version

    def _mapping_phase(self, analysis_result: Any, result: PipelineResult) -> Any:
        """Phase 4: Translate Python semantics into target language semantics.

        Builds a SemanticMapping by converting scalar types, function return
        types, variable types, and container types via the backend's emitter
        and container system, then records a summary in the phase results.
        Failures are downgraded to warnings so the pipeline can continue.
        """
        try:
            mapping = SemanticMapping(target_language=self.config.target_language)

            # Map the standard scalar Python types through the backend's
            # emitter; fall back to the Python name when the backend has no
            # explicit mapping for a type.
            for py_type in ("int", "float", "bool", "str", "None", "Any"):
                try:
                    mapping.type_mappings[py_type] = self.emitter.map_python_type(py_type)
                except Exception:
                    mapping.type_mappings[py_type] = py_type

            # Pull richer type information out of the analysis result when present.
            if analysis_result is not None:
                self._extract_function_types(analysis_result, mapping)
                self._extract_variable_types(analysis_result, mapping, result)
                self._extract_container_types(analysis_result, mapping, result)

            # Ask the container system which imports the generated code needs.
            try:
                mapping.required_imports = self.container_system.get_required_imports()
            except Exception:
                pass  # Container system may not have imports

            # Record backend identity alongside the mapping.
            mapping.semantic_notes["backend"] = self.backend.get_name()
            mapping.semantic_notes["extension"] = self.backend.get_file_extension()

            # Store the mapping and a serializable summary on the result.
            result.semantic_mapping = mapping
            container_summary = {
                key: {"python_type": cm.python_type, "target_type": cm.target_type}
                for key, cm in mapping.container_mappings.items()
            }
            result.phase_results[PipelinePhase.MAPPING] = {
                "target_language": self.config.target_language,
                "type_mappings": mapping.type_mappings,
                "container_mappings": container_summary,
                "function_return_types": mapping.function_return_types,
                "variable_count": len(mapping.variable_types),
                "required_imports": mapping.required_imports,
            }

            self.log.debug(
                f"Phase 4 mapping complete: {len(mapping.type_mappings)} types, "
                f"{len(mapping.container_mappings)} containers, "
                f"{len(mapping.function_return_types)} functions"
            )

            return analysis_result

        except Exception as e:
            result.warnings.append(f"Mapping phase warning: {str(e)}")
            return analysis_result

    def _extract_function_types(self, analysis_result: Any, semantic_mapping: SemanticMapping) -> None:
        """Record target-language return types for every analyzed function.

        Reads ``analysis_result.functions`` (when the attribute exists) and
        maps each function's Python return type through the backend emitter,
        falling back to the raw Python type string if the backend cannot map it.
        """
        if not hasattr(analysis_result, "functions"):
            return
        for func_name, func_info in analysis_result.functions.items():
            return_type = getattr(func_info, "return_type", None)
            if not return_type:
                continue  # skip functions with no inferred return type
            py_return_type = str(return_type)
            try:
                mapped = self.emitter.map_python_type(py_return_type)
            except Exception:
                mapped = py_return_type
            semantic_mapping.function_return_types[func_name] = mapped

    def _extract_variable_types(
        self, analysis_result: Any, semantic_mapping: SemanticMapping, result: PipelineResult
    ) -> None:
        """Map inferred parameter types into target-language variable types.

        Looks up the type-inference data recorded by the analysis phase and
        stores each parameter as ``"func.param" -> target type``, falling back
        to the Python type string when the backend cannot map it.
        """
        # Type inference is only present when the analysis phase ran the
        # advanced analyzers; otherwise use an empty mapping.
        analysis_phase = result.phase_results.get(PipelinePhase.ANALYSIS)
        type_inference = {}
        if isinstance(analysis_phase, AnalysisPhaseResult) and analysis_phase.advanced_analysis:
            type_inference = analysis_phase.advanced_analysis.get("type_inference", {})

        for func_name, func_inference in type_inference.items():
            if not isinstance(func_inference, dict):
                continue
            for param_name, param_type in func_inference.get("parameter_types", {}).items():
                var_key = f"{func_name}.{param_name}"
                try:
                    semantic_mapping.variable_types[var_key] = self.emitter.map_python_type(str(param_type))
                except Exception:
                    semantic_mapping.variable_types[var_key] = str(param_type)

    def _extract_container_types(
        self, analysis_result: Any, semantic_mapping: SemanticMapping, result: PipelineResult
    ) -> None:
        """Populate container mappings for common list/dict/set patterns.

        Tries a fixed set of frequently-used container types against the
        backend's container system; patterns the backend cannot handle are
        silently skipped.
        """
        # Python container type -> element type names fed to the backend.
        patterns = {
            "list[int]": ("int",),
            "list[str]": ("str",),
            "list[float]": ("float",),
            "dict[str, int]": ("str", "int"),
            "dict[str, str]": ("str", "str"),
            "dict[int, int]": ("int", "int"),
            "set[int]": ("int",),
            "set[str]": ("str",),
        }

        for py_type, element_types in patterns.items():
            try:
                # Map element types once and reuse them for both the container
                # lookup and the ContainerMapping record.
                mapped_elems = [self.emitter.map_python_type(t) for t in element_types]
                if py_type.startswith("list["):
                    target_type = self.container_system.get_list_type(mapped_elems[0])
                elif py_type.startswith("dict["):
                    target_type = self.container_system.get_dict_type(mapped_elems[0], mapped_elems[1])
                elif py_type.startswith("set["):
                    target_type = self.container_system.get_set_type(mapped_elems[0])
                else:
                    continue

                semantic_mapping.container_mappings[py_type] = ContainerMapping(
                    python_type=py_type,
                    target_type=target_type,
                    element_types=mapped_elems,
                )
            except Exception:
                # Backend may not support all container types
                pass

    def _target_optimization_phase(self, analysis_result: Any, result: PipelineResult) -> Any:
        """Phase 5: Target language-specific optimizations.

        Queries the backend for an optimizer and records what it reports; when
        the backend has no optimizer an "unavailable" phase result is stored
        instead. Errors are downgraded to warnings.
        """
        try:
            opt_level = self._get_opt_level_int()
            optimizer = self.backend.get_optimizer()

            if optimizer is None:
                # This backend ships no optimizer; record that fact.
                phase_result = TargetOptimizationPhaseResult(
                    target_language=self.config.target_language,
                    optimization_level=opt_level,
                    optimizer_available=False,
                    optimizations_applied=[],
                    optimizer_info=None,
                )
            else:
                opt_info = optimizer.get_optimization_info()
                phase_result = TargetOptimizationPhaseResult(
                    target_language=self.config.target_language,
                    optimization_level=opt_level,
                    optimizer_available=True,
                    optimizations_applied=opt_info.passes_applied,
                    optimizer_info=opt_info,
                )
                self.log.debug(
                    f"Target optimization: {self.config.target_language} with "
                    f"{opt_info.level_name}, {len(opt_info.passes_applied)} passes"
                )

            result.phase_results[PipelinePhase.TARGET_OPTIMIZATION] = phase_result
            return analysis_result

        except Exception as e:
            result.warnings.append(f"Target optimization phase warning: {str(e)}")
            return analysis_result

    def _get_opt_level_int(self) -> int:
        """Convert OptimizationLevel enum to integer (0-3).

        Returns:
            Integer optimization level: 0=none, 1=basic, 2=moderate,
            3=aggressive. Unknown levels default to 2 (moderate).
        """
        # Enum order mirrors the numeric level, so enumerate() yields the map.
        ordered = (
            OptimizationLevel.NONE,
            OptimizationLevel.BASIC,
            OptimizationLevel.MODERATE,
            OptimizationLevel.AGGRESSIVE,
        )
        level_by_enum = {level: value for value, level in enumerate(ordered)}
        return level_by_enum.get(self.config.optimization_level, 2)

    def _generation_phase(
        self, source_code: str, analysis_result: Any, output_dir: Path, result: PipelineResult
    ) -> bool:
        """Phase 6: Target language code generation.

        Emits the module via the selected backend (using the semantic mapping
        computed in Phase 4), writes the generated source next to the input
        file's stem with the backend's extension, and records the phase result.

        Args:
            source_code: Original Python source text.
            analysis_result: Analysis output threaded through earlier phases.
            output_dir: Directory the generated source file is written to.
            result: Pipeline result updated in place.

        Returns:
            True on success; False after recording an error on failure.
        """
        try:
            # Generate code using selected backend, passing semantic mapping from Phase 4
            generated_code = self.emitter.emit_module(
                source_code, analysis_result, semantic_mapping=result.semantic_mapping
            )

            # Write source file with correct extension. Use an explicit
            # encoding so the output does not depend on the platform locale.
            file_extension = self.backend.get_file_extension()
            source_file_path = output_dir / (Path(result.input_file).stem + file_extension)
            source_file_path.write_text(generated_code, encoding="utf-8")

            result.generated_code = generated_code
            result.output_files[f"{self.config.target_language}_source"] = str(source_file_path)
            result.generated_files.append(str(source_file_path))
            result.phase_results[PipelinePhase.GENERATION] = GenerationPhaseResult(
                success=True,
                source_file=str(source_file_path),
                backend_name=self.backend.get_name(),
                file_extension=file_extension,
                generated_lines=len(generated_code.splitlines()),
            )
            return True

        except Exception as e:
            # Generation failure is fatal for the pipeline run.
            result.success = False
            result.errors.append(f"Generation phase error: {str(e)}")
            return False

    def _build_phase(self, output_dir: Path, result: PipelineResult) -> bool:
        """Phase 7: Build executable or generate build file.

        In MAKEFILE mode a backend-specific build file is written; in DIRECT
        mode the backend compiles the generated source immediately. Requires
        the generation phase to have recorded a source file first.

        Args:
            output_dir: Directory for build outputs.
            result: Pipeline result updated in place.

        Returns:
            True on success; False after recording an error on failure.
        """
        try:
            source_key = f"{self.config.target_language}_source"
            source_file = result.output_files.get(source_key)
            if not source_file:
                result.errors.append("No source file available for build phase")
                return False

            source_file_path = Path(source_file)

            if self.config.build_mode == BuildMode.MAKEFILE:
                # Generate build file using backend. Write with an explicit
                # encoding so the output is locale-independent.
                build_content = self.builder.generate_build_file([str(source_file_path)], source_file_path.stem)
                build_file_path = output_dir / self.builder.get_build_filename()
                build_file_path.write_text(build_content, encoding="utf-8")

                result.build_file_content = build_content
                result.output_files["build_file"] = str(build_file_path)

            elif self.config.build_mode == BuildMode.DIRECT:
                # Direct compilation using backend. Only pass the optimization
                # level when the builder's signature accepts it (the LLVM
                # backend does); other builders take just source and output dir.
                import inspect

                opt_level = self._get_opt_level_int()
                sig = inspect.signature(self.builder.compile_direct)
                if "opt_level" in sig.parameters:
                    success = self.builder.compile_direct(str(source_file_path), str(output_dir), opt_level=opt_level)
                else:
                    success = self.builder.compile_direct(str(source_file_path), str(output_dir))

                if success:
                    executable_path = output_dir / source_file_path.stem
                    result.executable_path = str(executable_path)
                    result.output_files["executable"] = str(executable_path)
                else:
                    result.errors.append("Direct compilation failed")
                    return False

            result.phase_results[PipelinePhase.BUILD] = BuildPhaseResult(
                success=True,
                mode=self.config.build_mode.value,
                outputs=dict(result.output_files),
            )

            return True

        except Exception as e:
            # Build failure is fatal for the pipeline run.
            result.success = False
            result.errors.append(f"Build phase error: {str(e)}")
            return False

__init__(config=None, target_language='c')

Initialize the pipeline with configuration and target language.

Source code in src/multigen/pipeline.py
def __init__(self, config: Optional[PipelineConfig] = None, target_language: str = "c"):
    """Initialize the pipeline with configuration and target language."""
    # No config: build one for the requested language. With a config, only a
    # non-default target_language argument overrides the config's value.
    # NOTE(review): passing target_language="c" alongside a config whose
    # target_language differs will NOT switch it back to "c" — confirm intended.
    if config is None:
        config = PipelineConfig(target_language=target_language)
    elif target_language != "c":
        config.target_language = target_language

    self.config = config
    # Class-named logger, then backend/emitter/builder component setup.
    self.log = log.config(self.__class__.__name__)
    self._init_components()

convert(input_path, output_path=None)

Convert Python module through complete pipeline.

Parameters:

Name Type Description Default
input_path Union[str, Path]

Path to Python file or module

required
output_path Optional[Union[str, Path]]

Output directory or file path

None

Returns:

Type Description
PipelineResult

PipelineResult with all outputs and metadata

Source code in src/multigen/pipeline.py
def convert(self, input_path: Union[str, Path], output_path: Optional[Union[str, Path]] = None) -> PipelineResult:
    """Convert Python module through complete pipeline.

    Runs the seven phases in order (validation, analysis, Python
    optimization, mapping, target optimization, generation, build). Fatal
    phase failures return early with ``result.success == False``; phases
    3-5 downgrade their own failures to warnings and always return a value.

    Args:
        input_path: Path to Python file or module
        output_path: Output directory or file path

    Returns:
        PipelineResult with all outputs and metadata
    """
    input_path = Path(input_path)
    if not input_path.exists():
        # Missing input is reported as a failed result, not an exception.
        self.log.error(f"Input file not found: {input_path}")
        return PipelineResult(
            success=False,
            input_file=str(input_path),
            output_files={},
            target_language=self.config.target_language,
            errors=[f"Input file not found: {input_path}"],
        )

    # Determine output directory: explicit argument wins, then the config
    # value, then the build/src default.
    if output_path:
        output_dir = Path(output_path)
    elif self.config.output_dir:
        output_dir = Path(self.config.output_dir)
    else:
        # Default to build/src structure
        output_dir = Path("build") / "src"

    output_dir.mkdir(parents=True, exist_ok=True)

    # Result starts optimistic; phases flip success/append errors on failure.
    result = PipelineResult(
        success=True, input_file=str(input_path), output_files={}, target_language=self.config.target_language
    )

    try:
        self.log.info(f"Starting pipeline conversion for: {input_path} -> {self.config.target_language}")

        # Read input file
        # NOTE(review): read_text() uses the platform default encoding —
        # confirm UTF-8 sources are handled correctly on all platforms.
        source_code = input_path.read_text()

        # Phase 1: Validation
        self.log.debug("Starting validation phase")
        self._report_progress(PipelinePhase.VALIDATION, "Validating Python code")
        if not self._validation_phase(source_code, input_path, result):
            self.log.error("Validation phase failed")
            return result

        # Phase 2: Analysis
        self.log.debug("Starting analysis phase")
        self._report_progress(PipelinePhase.ANALYSIS, "Analyzing AST and types")
        analysis_result = self._analysis_phase(source_code, result)
        if analysis_result is None:
            self.log.error("Analysis phase failed")
            return result

        # Phase 3: Python Optimization
        # Phases 3-5 never abort: they record warnings and pass the analysis
        # result through unchanged on failure.
        self.log.debug("Starting Python optimization phase")
        self._report_progress(PipelinePhase.PYTHON_OPTIMIZATION, "Optimizing Python IR")
        optimized_analysis = self._python_optimization_phase(source_code, analysis_result, result)

        # Phase 4: Mapping (language-agnostic to target-specific)
        self.log.debug("Starting mapping phase")
        self._report_progress(PipelinePhase.MAPPING, f"Mapping to {self.config.target_language.upper()}")
        mapped_result = self._mapping_phase(optimized_analysis, result)

        # Phase 5: Target Optimization
        self.log.debug("Starting target optimization phase")
        self._report_progress(
            PipelinePhase.TARGET_OPTIMIZATION, f"Optimizing {self.config.target_language.upper()} code"
        )
        target_optimized = self._target_optimization_phase(mapped_result, result)

        # Phase 6: Generation
        self.log.debug("Starting generation phase")
        self._report_progress(PipelinePhase.GENERATION, f"Generating {self.config.target_language.upper()} source")
        if not self._generation_phase(source_code, target_optimized, output_dir, result):
            self.log.error("Generation phase failed")
            return result

        # Phase 7: Build (skipped entirely when build_mode is NONE)
        if self.config.build_mode != BuildMode.NONE:
            self.log.debug(f"Starting build phase with mode: {self.config.build_mode}")
            build_msg = "Compiling" if self.config.build_mode == BuildMode.DIRECT else "Generating build file"
            self._report_progress(PipelinePhase.BUILD, build_msg)
            if not self._build_phase(output_dir, result):
                self.log.error("Build phase failed")
                return result

        self.log.info(f"Pipeline conversion completed successfully for: {input_path}")
        return result

    except Exception as e:
        # Last-resort guard: convert any unexpected exception into a failed
        # result so callers always get a PipelineResult back.
        self.log.error(f"Pipeline error: {str(e)}")
        result.success = False
        result.errors.append(f"Pipeline error: {str(e)}")
        return result

PipelineConfig

multigen.pipeline.PipelineConfig dataclass

Configuration for the MultiGen pipeline.

Source code in src/multigen/pipeline.py
@dataclass
class PipelineConfig:
    """Configuration for the MultiGen pipeline."""

    # Optimization level consumed by phases 3 and 5 (see _get_opt_level_int).
    optimization_level: OptimizationLevel = OptimizationLevel.MODERATE
    # Backend identifier, e.g. "c", "cpp", "rust", "go".
    target_language: str = "c"
    # Output directory; when None, convert() falls back to build/src.
    output_dir: Optional[str] = None
    # NONE skips the build phase; MAKEFILE emits a build file; DIRECT compiles.
    build_mode: BuildMode = BuildMode.NONE
    # Compiler executable; __post_init__ fills a per-language default when None.
    compiler: Optional[str] = None
    compiler_flags: Optional[list[str]] = None  # normalized to [] in __post_init__
    include_dirs: Optional[list[str]] = None  # normalized to [] in __post_init__
    libraries: Optional[list[str]] = None  # normalized to [] in __post_init__
    enable_advanced_analysis: bool = True
    enable_optimizations: bool = True
    enable_formal_verification: bool = False  # Z3-based formal verification (optional)
    strict_verification: bool = (
        False  # Halt code generation on verification failures (requires enable_formal_verification)
    )
    backend_preferences: Optional[BackendPreferences] = None
    # Presumably invoked as callback(phase, message) before each phase runs —
    # see the _report_progress calls in convert().
    progress_callback: Optional[Callable[[PipelinePhase, str], None]] = None

    def __post_init__(self) -> None:
        """Initialize default values."""
        # Mutable list defaults must be created per instance, hence the
        # None sentinels normalized here.
        if self.compiler_flags is None:
            self.compiler_flags = []
        if self.include_dirs is None:
            self.include_dirs = []
        if self.libraries is None:
            self.libraries = []
        if self.compiler is None:
            # Set default compiler based on target language
            compiler_defaults = {"c": "gcc", "rust": "rustc", "go": "go", "cpp": "g++"}
            self.compiler = compiler_defaults.get(self.target_language, "gcc")

__post_init__()

Initialize default values.

Source code in src/multigen/pipeline.py
def __post_init__(self) -> None:
    """Initialize default values."""
    # Mutable list defaults must be created per instance (a dataclass field
    # cannot default to []), hence the None sentinels normalized here.
    if self.compiler_flags is None:
        self.compiler_flags = []
    if self.include_dirs is None:
        self.include_dirs = []
    if self.libraries is None:
        self.libraries = []
    if self.compiler is None:
        # Set default compiler based on target language
        compiler_defaults = {"c": "gcc", "rust": "rustc", "go": "go", "cpp": "g++"}
        self.compiler = compiler_defaults.get(self.target_language, "gcc")

PipelineResult

multigen.pipeline.PipelineResult dataclass

Result from pipeline execution.

Source code in src/multigen/pipeline.py
@dataclass
class PipelineResult:
    """Result from pipeline execution."""

    # Overall success; flipped to False when a phase records a fatal error.
    success: bool
    # Path of the Python input file, as given to convert().
    input_file: str
    output_files: dict[str, str]  # file_type -> file_path
    target_language: str = "c"
    # Full generated target-language source (set by the generation phase).
    generated_code: Optional[str] = None
    # Contents of the generated build file (MAKEFILE build mode only).
    build_file_content: Optional[str] = None
    # Path to the compiled binary (DIRECT build mode only).
    executable_path: Optional[str] = None
    # Per-phase structured results, keyed by PipelinePhase.
    phase_results: dict[PipelinePhase, Any] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)  # fatal problems
    warnings: list[str] = field(default_factory=list)  # non-fatal problems
    generated_files: list[str] = field(default_factory=list)
    # Semantic mapping from Phase 4 (optional, for advanced usage)
    semantic_mapping: Optional[SemanticMapping] = None

Example Usage

Basic conversion:

from multigen.pipeline import MultiGenPipeline, PipelineConfig

config = PipelineConfig(target_language="c")
pipeline = MultiGenPipeline(config=config)
result = pipeline.convert("example.py")

if result.success:
    print("Conversion successful")
    for file_type, path in result.output_files.items():
        print(f"Generated: {path}")
else:
    print("Errors:", result.errors)

With verification:

config = PipelineConfig(
    target_language="rust",
    enable_formal_verification=True,
    strict_verification=True,
)
pipeline = MultiGenPipeline(config=config)
result = pipeline.convert("example.py")

Pipeline Phases

The pipeline executes in 7 phases:

  1. Validation: Parse and validate Python AST
  2. Analysis: Analyze types, dependencies, control flow
  3. Python Optimization: Optimize Python AST
  4. Mapping: Map Python constructs to target language
  5. Target Optimization: Optimize target language constructs
  6. Generation: Generate target language code
  7. Build: Optionally compile/build the generated code

See Also