Pipeline API

The pipeline module provides the main entry points for RWA calculation. See pipeline.py for the full implementation.

Module: rwa_calc.engine.pipeline

create_pipeline

Factory function that creates a pipeline orchestrator with default components.

Parameters:

Name       Type                   Description                                     Default
data_path  str | Path | None      Path to data directory (creates ParquetLoader)  None
loader     LoaderProtocol | None  Pre-configured loader (overrides data_path)     None

Returns:

Type                  Description
PipelineOrchestrator  PipelineOrchestrator ready for use

Usage

With data path (uses ParquetLoader)

pipeline = create_pipeline(data_path="/path/to/data")

With custom loader

pipeline = create_pipeline(loader=CSVLoader("/path/to/data"))

Without loader (use run_with_data)

pipeline = create_pipeline()
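
The precedence between the two arguments can be sketched as follows. This is a minimal illustration, not the library's code: `StubParquetLoader`, `StubOrchestrator`, and `create_pipeline_sketch` are hypothetical stand-ins, and only the rule that `loader` overrides `data_path` is taken from the parameter descriptions above.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass
class StubParquetLoader:
    """Hypothetical stand-in for ParquetLoader; it only records its base path."""

    base_path: Path


@dataclass
class StubOrchestrator:
    """Hypothetical stand-in for PipelineOrchestrator; it only holds a loader."""

    loader: Any = None


def create_pipeline_sketch(
    data_path: str | Path | None = None,
    loader: Any = None,
) -> StubOrchestrator:
    # An explicit loader wins over data_path, as the parameter table states.
    if loader is None and data_path is not None:
        loader = StubParquetLoader(Path(data_path))
    # With neither argument the orchestrator has no loader (run_with_data only).
    return StubOrchestrator(loader=loader)
```

Under these assumptions, passing both arguments keeps the custom loader, passing only `data_path` builds a Parquet-style loader, and passing nothing leaves the loader unset.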

PipelineOrchestrator

The main pipeline class (note: the class is PipelineOrchestrator, not RWAPipeline):

rwa_calc.engine.pipeline.PipelineOrchestrator

Orchestrate the complete RWA calculation pipeline.

Implements PipelineProtocol for:
- Full pipeline execution from data loading to final aggregation
- Pre-loaded data execution (bypassing loader)
- Component dependency management
- Error accumulation across stages

Pipeline stages:
1. Loader: Load raw data from files/databases
2. HierarchyResolver: Resolve counterparty and facility hierarchies
3. Classifier: Classify exposures and assign approaches
4. CRMProcessor: Apply credit risk mitigation
5. SACalculator: Calculate SA RWA
6. IRBCalculator: Calculate IRB RWA
7. SlottingCalculator: Calculate Slotting RWA
8. Aggregation: Combine results, apply floor, generate summaries
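
The idea of running stages in order while accumulating errors can be sketched generically. Everything here is hypothetical: `StageError` and `StageRunner` are illustrative names, and whether the real orchestrator continues after a failed stage (as this sketch does) is an assumption, not something the documentation states.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class StageError:
    """Hypothetical error record; the real PipelineError may differ."""

    stage: str
    message: str


@dataclass
class StageRunner:
    """Run named stages in order, collecting errors instead of stopping."""

    stages: list[tuple[str, Callable[[Any], Any]]]
    errors: list[StageError] = field(default_factory=list)

    def run(self, data: Any) -> Any:
        for name, stage in self.stages:
            try:
                data = stage(data)
            except Exception as exc:
                # Record the failure and carry the last good data forward.
                self.errors.append(StageError(stage=name, message=str(exc)))
        return data


def failing_crm(exposures: list[dict]) -> list[dict]:
    # Simulates a stage that cannot complete.
    raise RuntimeError("collateral table missing")


runner = StageRunner(
    stages=[
        ("classifier", lambda xs: [dict(x, approach="SA") for x in xs]),
        ("crm_processor", failing_crm),
        ("sa_calculator", lambda xs: [dict(x, rwa=x["ead"] * 1.0) for x in xs]),
    ]
)
result = runner.run([{"ead": 100.0}])
```

In this toy run the CRM stage fails, its error is recorded against the stage name, and the later stage still processes the output of the classifier.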

Usage

orchestrator = PipelineOrchestrator(
    loader=ParquetLoader(base_path),
    hierarchy_resolver=HierarchyResolver(),
    classifier=ExposureClassifier(),
    crm_processor=CRMProcessor(),
    sa_calculator=SACalculator(),
    irb_calculator=IRBCalculator(),
    slotting_calculator=SlottingCalculator(),
)
result = orchestrator.run(config)

run(config)

Execute the complete RWA calculation pipeline.

Requires a loader to be configured.

Parameters:

Name    Type               Description                Default
config  CalculationConfig  Calculation configuration  required

Returns:

Type                    Description
AggregatedResultBundle  AggregatedResultBundle with all results and audit trail

Raises:

Type        Description
ValueError  If no loader is configured
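
A caller that may receive a pipeline without a loader can guard against this error. The sketch below uses a hypothetical `OrchestratorStub` that mimics only the documented loader check; the error message, the dict-based config, and the `run_or_fallback` helper are assumptions made for illustration.

```python
from __future__ import annotations

from typing import Any, Callable


class OrchestratorStub:
    """Hypothetical stand-in that mimics only the documented loader check."""

    def __init__(self, loader: Callable[[], Any] | None = None) -> None:
        self._loader = loader

    def run(self, config: dict) -> dict:
        if self._loader is None:
            raise ValueError("No loader configured; use run_with_data()")
        return self.run_with_data(self._loader(), config)

    def run_with_data(self, data: Any, config: dict) -> dict:
        # Placeholder for the real stages: echo what would be processed.
        return {"rows": len(data), "framework": config["framework"]}


def run_or_fallback(pipeline: OrchestratorStub, config: dict, preloaded: Any) -> dict:
    """Try run(); if no loader is configured, fall back to pre-loaded data."""
    try:
        return pipeline.run(config)
    except ValueError:
        return pipeline.run_with_data(preloaded, config)
```

Catching `ValueError` broadly would also hide unrelated value errors raised inside the stages, so production code may prefer to check for a loader up front.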

run_with_data(data, config)

Execute pipeline with pre-loaded data.

Uses single-pass architecture: all approach-specific calculators run sequentially on one unified LazyFrame, avoiding plan tree duplication and mid-pipeline materialisation.

Parameters:

Name    Type               Description                 Default
data    RawDataBundle      Pre-loaded raw data bundle  required
config  CalculationConfig  Calculation configuration   required

Returns:

Type                    Description
AggregatedResultBundle  AggregatedResultBundle with all results and audit trail

Pipeline Implementation (pipeline.py)
    stage: str
    error_type: str
    message: str
    context: dict = field(default_factory=dict)


# =============================================================================
# Pipeline Orchestrator Implementation
# =============================================================================


class PipelineOrchestrator:
    """
    Orchestrate the complete RWA calculation pipeline.

    Implements PipelineProtocol for:
    - Full pipeline execution from data loading to final aggregation
    - Pre-loaded data execution (bypassing loader)
    - Component dependency management
    - Error accumulation across stages

    Pipeline stages:
    1. Loader: Load raw data from files/databases
    2. HierarchyResolver: Resolve counterparty and facility hierarchies
    3. Classifier: Classify exposures and assign approaches
    4. CRMProcessor: Apply credit risk mitigation
    5. SACalculator: Calculate SA RWA
    6. IRBCalculator: Calculate IRB RWA
    7. SlottingCalculator: Calculate Slotting RWA
    8. Aggregation: Combine results, apply floor, generate summaries

    Usage:
        orchestrator = PipelineOrchestrator(
            loader=ParquetLoader(base_path),
            hierarchy_resolver=HierarchyResolver(),
            classifier=ExposureClassifier(),
            crm_processor=CRMProcessor(),
            sa_calculator=SACalculator(),
            irb_calculator=IRBCalculator(),
            slotting_calculator=SlottingCalculator(),
        )
        result = orchestrator.run(config)
    """

    def __init__(
        self,
        loader: LoaderProtocol | None = None,
        hierarchy_resolver: HierarchyResolverProtocol | None = None,
        classifier: ClassifierProtocol | None = None,
        crm_processor: CRMProcessorProtocol | None = None,
        sa_calculator: SACalculatorProtocol | None = None,
        irb_calculator: IRBCalculatorProtocol | None = None,
        slotting_calculator: SlottingCalculatorProtocol | None = None,
        equity_calculator: EquityCalculatorProtocol | None = None,
        output_aggregator: OutputAggregatorProtocol | None = None,
    ) -> None:
        """
        Initialize pipeline with components.

        Components can be injected for testing or customization.
        If not provided, defaults will be created on first use.

        Args:
            loader: Data loader (optional - required for run())
            hierarchy_resolver: Hierarchy resolver
            classifier: Exposure classifier
            crm_processor: CRM processor
            sa_calculator: SA calculator
            irb_calculator: IRB calculator
            slotting_calculator: Slotting calculator
            equity_calculator: Equity calculator
            output_aggregator: Output aggregator
        """
        self._loader = loader
        self._hierarchy_resolver = hierarchy_resolver
        self._classifier = classifier
        self._crm_processor = crm_processor
        self._sa_calculator = sa_calculator
        self._irb_calculator = irb_calculator
        self._slotting_calculator = slotting_calculator
        self._equity_calculator = equity_calculator
        self._output_aggregator = output_aggregator
        self._errors: list[PipelineError] = []

    # =========================================================================
    # Public API
    # =========================================================================

    def run(self, config: CalculationConfig) -> AggregatedResultBundle:
        """
        Execute the complete RWA calculation pipeline.

        Requires a loader to be configured.

        Args:
            config: Calculation configuration

        Returns:
            AggregatedResultBundle with all results and audit trail

        Raises:

Usage Examples

Basic Usage

from datetime import date
from rwa_calc.engine.pipeline import create_pipeline
from rwa_calc.contracts.config import CalculationConfig

# Create pipeline with a data path, since run() requires a loader
pipeline = create_pipeline(data_path="/path/to/data")

# Configure for CRR
config = CalculationConfig.crr(
    reporting_date=date(2026, 12, 31),
)

# Run calculation
result = pipeline.run(config)

# Access results
print(f"Total RWA: {result.total_rwa:,.0f}")
print(f"SA RWA: {result.sa_rwa:,.0f}")
print(f"IRB RWA: {result.irb_rwa:,.0f}")

With Custom Data Path

from pathlib import Path
from rwa_calc.engine.loader import ParquetLoader

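# Load data from custom path (the base path goes to the constructor,
# matching ParquetLoader(base_path) in the PipelineOrchestrator usage)
loader = ParquetLoader(Path("/path/to/data"))
raw_data = loader.load()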

# Run with pre-loaded data
result = pipeline.run_with_data(raw_data, config)

Framework Comparison

# Run under both frameworks
config_crr = CalculationConfig.crr(date(2026, 12, 31))
config_b31 = CalculationConfig.basel_3_1(date(2027, 1, 1))

result_crr = pipeline.run(config_crr)
result_b31 = pipeline.run(config_b31)

# Compare
impact = (result_b31.total_rwa / result_crr.total_rwa - 1) * 100
print(f"Basel 3.1 impact: {impact:+.1f}%")
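
The impact figure is a plain relative change, so it can be checked by hand. The helper below is a hypothetical convenience wrapper (not part of `rwa_calc`) that adds a guard for a zero baseline; the RWA amounts are made-up illustrative numbers.

```python
def rwa_impact_pct(baseline_rwa: float, revised_rwa: float) -> float:
    """Percentage change of revised RWA relative to baseline RWA."""
    if baseline_rwa == 0:
        raise ValueError("baseline RWA is zero; relative impact is undefined")
    return (revised_rwa / baseline_rwa - 1) * 100


# Illustrative figures only, not results from any real portfolio.
impact = rwa_impact_pct(baseline_rwa=1_000_000.0, revised_rwa=1_080_000.0)
print(f"Basel 3.1 impact: {impact:+.1f}%")  # prints "Basel 3.1 impact: +8.0%"
```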

Error Handling

result = pipeline.run(config)

# Check for errors
if result.has_errors:
    print("Calculation completed with errors:")
    for error in result.errors:
        print(f"  - {error.exposure_id}: {error.message}")

# Check for warnings
if result.has_warnings:
    print("Warnings:")
    for warning in result.warnings:
        print(f"  - {warning.message}")
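
When many errors accumulate, grouping them by stage can make the report easier to read. The implementation excerpt shows an error record with `stage`, `error_type`, `message`, and `context` fields; the sketch below assumes the collected errors have that shape, which is not confirmed (the snippet above reads `exposure_id`, which is not among those fields). `ErrorRecord` and `group_by_stage` are hypothetical names.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class ErrorRecord:
    """Local stand-in mirroring the fields in the implementation excerpt."""

    stage: str
    error_type: str
    message: str
    context: dict = field(default_factory=dict)


def group_by_stage(errors: list[ErrorRecord]) -> dict[str, list[ErrorRecord]]:
    """Bucket error records by the pipeline stage that produced them."""
    grouped: dict[str, list[ErrorRecord]] = defaultdict(list)
    for err in errors:
        grouped[err.stage].append(err)
    return dict(grouped)


# Made-up sample errors for illustration.
sample = [
    ErrorRecord("classifier", "missing_field", "No exposure class"),
    ErrorRecord("crm_processor", "invalid_value", "Negative collateral"),
    ErrorRecord("classifier", "missing_field", "No counterparty type"),
]
for stage, errs in group_by_stage(sample).items():
    print(f"{stage}: {len(errs)} error(s)")
```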