Pipeline Architecture

The RWA calculator processes exposures through a well-defined pipeline with discrete stages. This document details each stage and how they interact.

Pipeline Overview

flowchart TD
    subgraph Input
        A[Raw Data Files]
    end

    subgraph Stage1[Stage 1: Data Loading]
        B[Loader]
        B1[RawDataBundle]
    end

    subgraph Stage2[Stage 2: Hierarchy Resolution]
        C[Hierarchy Resolver]
        C1[ResolvedHierarchyBundle]
    end

    subgraph Stage3[Stage 3: Classification]
        D[Classifier]
        D1[ClassifiedExposuresBundle]
    end

    subgraph Stage4[Stage 4: CRM Processing]
        E[CRM Processor]
        E1[CRMAdjustedBundle]
    end

    subgraph Stage5[Stage 5: Split-Once + Parallel Calculate]
        M["Materialise barrier<br/>(collect → lazy)"]
        U["SA calculate_unified<br/>(Basel 3.1 floor only)"]
        SP[Split once by approach]
        F["SA branch<br/>calculate_branch()"]
        G["IRB branch<br/>calculate_branch()"]
        H["Slotting branch<br/>calculate_branch()"]
        CA["pl.collect_all()<br/>(parallel collection)"]
    end

    subgraph Equity[Equity — Separate Path]
        L[Equity Calculator]
        L1[EquityResultBundle]
    end

    subgraph Stage6[Stage 6: Aggregation]
        I["_aggregate_single_pass()"]
        I1[AggregatedResultBundle]
    end

    A --> B --> B1
    B1 --> C --> C1
    C1 --> D --> D1
    D1 --> E --> E1
    E1 --> M --> U --> SP
    SP --> F & G & H
    F & G & H --> CA
    E1 --> L --> L1
    CA & L1 --> I --> I1

Pipeline Orchestration

The pipeline is orchestrated by PipelineOrchestrator (see pipeline.py:80-594):

from rwa_calc.engine.pipeline import create_pipeline

# Create pipeline with default components
pipeline = create_pipeline()

# Run with configuration
result = pipeline.run(config)

# Or with pre-loaded data
result = pipeline.run_with_data(raw_data, config)

Pipeline Implementation

For full API documentation, see Pipeline API Reference.

Full Implementation (pipeline.py)
    stage: str
    error_type: str
    message: str
    context: dict = field(default_factory=dict)


# =============================================================================
# Pipeline Orchestrator Implementation
# =============================================================================


class PipelineOrchestrator:
    """
    Orchestrate the complete RWA calculation pipeline.

    Implements PipelineProtocol for:
    - Full pipeline execution from data loading to final aggregation
    - Pre-loaded data execution (bypassing loader)
    - Component dependency management
    - Error accumulation across stages

    Pipeline stages:
    1. Loader: Load raw data from files/databases
    2. HierarchyResolver: Resolve counterparty and facility hierarchies
    3. Classifier: Classify exposures and assign approaches
    4. CRMProcessor: Apply credit risk mitigation
    5. SACalculator: Calculate SA RWA
    6. IRBCalculator: Calculate IRB RWA
    7. SlottingCalculator: Calculate Slotting RWA
    8. Aggregation: Combine results, apply floor, generate summaries

    Usage:
        orchestrator = PipelineOrchestrator(
            loader=ParquetLoader(base_path),
            hierarchy_resolver=HierarchyResolver(),
            classifier=ExposureClassifier(),
            crm_processor=CRMProcessor(),
            sa_calculator=SACalculator(),
            irb_calculator=IRBCalculator(),
            slotting_calculator=SlottingCalculator(),
        )
        result = orchestrator.run(config)
    """

    def __init__(
        self,
        loader: LoaderProtocol | None = None,
        hierarchy_resolver: HierarchyResolverProtocol | None = None,
        classifier: ClassifierProtocol | None = None,
        crm_processor: CRMProcessorProtocol | None = None,
        sa_calculator: SACalculatorProtocol | None = None,
        irb_calculator: IRBCalculatorProtocol | None = None,
        slotting_calculator: SlottingCalculatorProtocol | None = None,
        equity_calculator: EquityCalculatorProtocol | None = None,
        output_aggregator: OutputAggregatorProtocol | None = None,
    ) -> None:
        """
        Initialize pipeline with components.

        Components can be injected for testing or customization.
        If not provided, defaults will be created on first use.

        Args:
            loader: Data loader (optional - required for run())
            hierarchy_resolver: Hierarchy resolver
            classifier: Exposure classifier
            crm_processor: CRM processor
            sa_calculator: SA calculator
            irb_calculator: IRB calculator
            slotting_calculator: Slotting calculator
            equity_calculator: Equity calculator
            output_aggregator: Output aggregator
        """
        self._loader = loader
        self._hierarchy_resolver = hierarchy_resolver
        self._classifier = classifier
        self._crm_processor = crm_processor
        self._sa_calculator = sa_calculator
        self._irb_calculator = irb_calculator
        self._slotting_calculator = slotting_calculator
        self._equity_calculator = equity_calculator
        self._output_aggregator = output_aggregator
        self._errors: list[PipelineError] = []

    # =========================================================================
    # Public API
    # =========================================================================

    def run(self, config: CalculationConfig) -> AggregatedResultBundle:
        """
        Execute the complete RWA calculation pipeline.

        Requires a loader to be configured.

        Args:
            config: Calculation configuration

        Returns:
            AggregatedResultBundle with all results and audit trail

        Raises:
            ValueError: If no loader is configured
        """
        if self._loader is None:
            raise ValueError("No loader configured. Use run_with_data() or provide a loader.")

        # Reset errors for new run
        self._errors = []

        # Stage 1: Load data
        try:
            raw_data = self._loader.load()
        except Exception as e:
            self._errors.append(
                PipelineError(
                    stage="loader",
                    error_type="load_error",
                    message=str(e),
                )
            )
            return self._create_error_result()

        return self.run_with_data(raw_data, config)

    def run_with_data(
        self,
        data: RawDataBundle,
        config: CalculationConfig,
    ) -> AggregatedResultBundle:
        """
        Execute pipeline with pre-loaded data.

        Uses single-pass architecture: all approach-specific calculators
        run sequentially on one unified LazyFrame, avoiding plan tree
        duplication and mid-pipeline materialisation.

        Args:
            data: Pre-loaded raw data bundle
            config: Calculation configuration

        Returns:
            AggregatedResultBundle with all results and audit trail
        """
        # Reset errors for new run
        self._errors = []

        try:
            # Ensure components are initialized (config needed for framework-specific CRM)
            self._ensure_components_initialized(config)

            # Validate input data values
            self._validate_input_data(data)

            # IRB mode requires model_permissions data; without it, fall back to SA
            if config.permission_mode == PermissionMode.IRB and data.model_permissions is None:
                logger.warning(
                    "IRB permission mode selected but no model_permissions data provided. "
                    "All exposures will fall back to Standardised Approach."
                )
                config = dataclasses.replace(config, permission_mode=PermissionMode.STANDARDISED)

            # Stage 2: Resolve hierarchies
            resolved = self._run_hierarchy_resolver(data, config)
            if resolved is None:
                return self._create_error_result()

            # Stage 3: Classify exposures
            classified = self._run_classifier(resolved, config)
            if classified is None:
                return self._create_error_result()

            # Stage 4: Apply CRM (unified — no fan-out split)
            crm_adjusted = self._run_crm_processor_unified(classified, config)
            if crm_adjusted is None:
                return self._create_error_result()

            # Stages 5-8: Single-pass calculation and aggregation
            result = self._run_single_pass(crm_adjusted, config)

            # Add pipeline errors to result
            if self._errors:
                all_errors = list(result.errors) + [

Stage 1: Data Loading

Purpose

Load raw data from Parquet/CSV files into LazyFrames.

Input

File paths to data files.

Output

RawDataBundle containing:

  • counterparties: Counterparty master data
  • facilities: Credit facility data
  • loans: Individual loan/draw data
  • contingents: Off-balance sheet items
  • collateral: Collateral information
  • guarantees: Guarantee data
  • provisions: Provision allocations
  • ratings: External and internal ratings
  • org_mapping: Organization hierarchy
  • lending_mapping: Retail lending groups

Implementation

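class ParquetLoader:
    def __init__(self, base_path: Path) -> None:
        # The orchestrator constructs the loader with a base path and calls load() without arguments
        self._base_path = base_path

    def load(self) -> RawDataBundle:
        # scan_parquet is lazy: no data is read until the plan is collected
        path = self._base_path
        return RawDataBundle(
            counterparties=pl.scan_parquet(path / "counterparties.parquet"),
            facilities=pl.scan_parquet(path / "facilities.parquet"),
            loans=pl.scan_parquet(path / "loans.parquet"),
            # ... other data sources
        )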

Validation

  • Schema validation against defined schemas
  • Required field checks
  • Data type validation
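
The required-field and data-type checks listed above can be illustrated with a small plain-Python sketch. It is not the loader's actual validation code: the schema dictionary, the ValidationIssue class, and the validate_rows helper are all hypothetical names introduced for this example, and rows are plain dictionaries rather than LazyFrames.

```python
from dataclasses import dataclass

# Hypothetical schema: column name -> expected Python type (assumed for illustration)
COUNTERPARTY_SCHEMA = {
    "counterparty_reference": str,
    "entity_type": str,
    "annual_revenue": float,
}


@dataclass
class ValidationIssue:
    column: str
    problem: str


def validate_rows(rows, schema):
    """Return issues for missing required fields and wrongly typed values."""
    issues = []
    for i, row in enumerate(rows):
        for column, expected in schema.items():
            if column not in row:
                issues.append(ValidationIssue(column, f"row {i}: missing required field"))
            elif row[column] is not None and not isinstance(row[column], expected):
                issues.append(ValidationIssue(column, f"row {i}: expected {expected.__name__}"))
    return issues


rows = [
    {"counterparty_reference": "CP1", "entity_type": "corporate", "annual_revenue": 1.5e6},
    {"counterparty_reference": "CP2", "annual_revenue": "n/a"},  # missing field, bad type
]
issues = validate_rows(rows, COUNTERPARTY_SCHEMA)
for issue in issues:
    print(issue.column, "-", issue.problem)
```

Collecting issues into a list, rather than raising on the first one, matches the error-accumulation style used elsewhere in the pipeline.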

Stage 2: Hierarchy Resolution

Purpose

Resolve counterparty and facility hierarchies, inherit ratings, unify exposures, and prepare enriched data for classification.

Input

RawDataBundle

Output

ResolvedHierarchyBundle with:

  • exposures: Unified LazyFrame (loans + contingents + facility undrawn)
  • counterparty_lookup: Enriched counterparties with hierarchy metadata and inherited ratings
  • collateral, guarantees, provisions: FX-converted CRM data
  • lending_group_totals: Aggregated group exposure for retail threshold testing
  • hierarchy_errors: Accumulated non-blocking errors

Processing Steps

Step 1: Build counterparty hierarchy lookup
  ├── _build_ultimate_parent_lazy()      → traverse org_mappings (up to 10 levels)
  ├── _build_rating_inheritance_lazy()   → inherit ratings (own → parent → unrated)
  └── _enrich_counterparties_with_hierarchy() → add hierarchy metadata

Step 2: Unify exposures
  ├── Standardise loans         → exposure_type = "loan"
  ├── Standardise contingents   → exposure_type = "contingent"
  ├── _build_facility_root_lookup()       → traverse facility-to-facility hierarchies
  ├── _calculate_facility_undrawn()       → limit - sum(descendant drawn amounts)
  └── pl.concat(all exposure types)       → single unified LazyFrame

Step 2a: FX conversion          → convert all monetary values to base currency
Step 2b: Add collateral LTV     → direct → facility → counterparty priority

Step 3: Residential property coverage
  └── Separate residential vs all-property collateral for threshold exclusion

Step 4: Lending group totals
  └── Aggregate by group, exclude residential from retail threshold (CRR Art. 123(c))

Step 5: Add lending group totals to exposures
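
Step 4 above can be read as a grouped sum in which the residential-property-secured part of each exposure is left out of the retail threshold total. The following is a minimal plain-Python sketch of that reading only. The field names (lending_group, amount, residential_secured) and the lending_group_totals helper are assumptions made for the example, not the resolver's actual columns or code.

```python
from collections import defaultdict


def lending_group_totals(exposures):
    """Sum exposure per lending group, excluding the residential-secured part."""
    totals = defaultdict(float)
    for exp in exposures:
        # Assumed field: "residential_secured" is the amount covered by residential property
        non_residential = max(exp["amount"] - exp.get("residential_secured", 0.0), 0.0)
        totals[exp["lending_group"]] += non_residential
    return dict(totals)


exposures = [
    {"lending_group": "G1", "amount": 400_000.0, "residential_secured": 300_000.0},
    {"lending_group": "G1", "amount": 250_000.0},
    {"lending_group": "G2", "amount": 900_000.0},
]
totals = lending_group_totals(exposures)
print(totals)
```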

Counterparty Hierarchy

Hierarchy resolution uses an eager graph traversal over the small mapping data rather than iterative LazyFrame joins, as the docstring below describes. See hierarchy.py:258-327 for the full implementation.

Build ultimate parent mapping using eager graph traversal.

Collects the small edge data eagerly, resolves the full graph via dict traversal, and returns the result as a LazyFrame for downstream joins.

Returns LazyFrame with columns:

  • counterparty_reference: The entity
  • ultimate_parent_reference: Its ultimate parent
  • hierarchy_depth: Number of levels traversed
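
The dict-based traversal described above might look roughly like the sketch below. It is an illustration only: resolve_ultimate_parents is a hypothetical stand-in, not the project's _resolve_graph_eager helper, and the cycle guard is an assumption about how such a traversal could protect itself rather than documented behaviour.

```python
def resolve_ultimate_parents(edges, max_depth=10):
    """Map each child to (root, depth) by walking child -> parent links."""
    parent_of = dict(edges)  # child -> parent
    resolved = {}
    for entity in parent_of:
        current, depth = entity, 0
        seen = {entity}
        # Stop at the root, at max_depth, or when a cycle is detected
        while current in parent_of and depth < max_depth:
            nxt = parent_of[current]
            if nxt in seen:
                break
            seen.add(nxt)
            current, depth = nxt, depth + 1
        resolved[entity] = (current, depth)
    return resolved


edges = [("C", "B"), ("B", "A"), ("E", "D")]
result = resolve_ultimate_parents(edges)
for entity, (root, depth) in result.items():
    print(entity, root, depth)
```

Each tuple corresponds to one row of the returned frame: the entity, its ultimate parent, and the number of levels traversed.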

Actual Implementation (hierarchy.py)
            child_col="child_counterparty_reference",
            parent_col="parent_counterparty_reference",
            max_depth=max_depth,
        )

        return resolved.rename(
            {
                "entity": "counterparty_reference",
                "root": "ultimate_parent_reference",
                "depth": "hierarchy_depth",
            }
        ).lazy()

    def _build_rating_inheritance_lazy(
        self,
        counterparties: pl.LazyFrame,
        ratings: pl.LazyFrame,
        ultimate_parents: pl.LazyFrame,
    ) -> pl.LazyFrame:
        """
        Build rating lookup with dual per-type resolution and inheritance.

        Resolves the best internal and best external rating separately per
        counterparty, then inherits each type independently from the ultimate
        parent when the entity has no own rating of that type.

        Returns LazyFrame with columns:
        - counterparty_reference: The entity
        - internal_pd: Best internal PD (own or inherited from parent)
        - internal_model_id: Model ID for the internal rating
        - external_cqs: Best external CQS (own or inherited from parent)
        - cqs: Alias of external_cqs
        - pd: Alias of internal_pd
        """
        sort_cols = ["rating_date", "rating_reference"]

        # Ensure model_id column exists on ratings (may be absent in legacy data)
        if "model_id" not in ratings.collect_schema().names():
            ratings = ratings.with_columns(pl.lit(None).cast(pl.String).alias("model_id"))

        # Best internal rating per counterparty (no CQS — that's external only)
        best_internal = (
            ratings.filter(pl.col("rating_type") == "internal")
            .sort(sort_cols, descending=[True, True])
            .group_by("counterparty_reference")
            .first()
            .select(
                [
                    pl.col("counterparty_reference").alias("_int_cp"),
                    pl.col("pd").alias("internal_pd"),
                    pl.col("model_id").alias("internal_model_id"),
                ]
            )
        )

        # Best external rating per counterparty
        best_external = (
            ratings.filter(pl.col("rating_type") == "external")
            .sort(sort_cols, descending=[True, True])
            .group_by("counterparty_reference")
            .first()
            .select(
                [
                    pl.col("counterparty_reference").alias("_ext_cp"),
                    pl.col("cqs").alias("external_cqs"),
                ]
            )
        )

        # Materialise the per-counterparty best-rating aggregates before joining.

Rating Inheritance

Ratings are inherited from parent entities when not directly available. See hierarchy.py:329-431.

The inheritance priority is:

  1. Entity's own rating
  2. Ultimate parent's rating
  3. Mark as unrated
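
The priority above, applied independently to internal and external ratings, can be sketched in plain Python as a coalesce over own and parent values. The dictionaries and the inherit_ratings helper are hypothetical and exist only for this example; the real implementation does the same thing with left joins and pl.coalesce.

```python
def coalesce(*values):
    """Return the first value that is not None, or None if all are None."""
    return next((v for v in values if v is not None), None)


def inherit_ratings(counterparties, own_ratings, ultimate_parent):
    """Resolve each rating type separately: own rating, then parent's, else unrated (None)."""
    resolved = {}
    for cp in counterparties:
        own = own_ratings.get(cp, {})
        parent = own_ratings.get(ultimate_parent.get(cp), {})
        resolved[cp] = {
            "internal_pd": coalesce(own.get("internal_pd"), parent.get("internal_pd")),
            "external_cqs": coalesce(own.get("external_cqs"), parent.get("external_cqs")),
        }
    return resolved


own_ratings = {
    "PARENT": {"internal_pd": 0.02, "external_cqs": 2},
    "SUB": {"internal_pd": 0.05},  # has its own internal rating, no external rating
}
ultimate_parent = {"SUB": "PARENT"}
ratings = inherit_ratings(["PARENT", "SUB", "ORPHAN"], own_ratings, ultimate_parent)
print(ratings["SUB"])
```

In this example SUB keeps its own internal PD but inherits the parent's external CQS, which is the per-type behaviour the docstring describes.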

Actual Implementation (hierarchy.py)
        # Polars re-evaluates the filter→sort→group_by chain per reference.
        best_int_df, best_ext_df = pl.collect_all([best_internal, best_external])
        best_internal = best_int_df.lazy()
        best_external = best_ext_df.lazy()

        # Start with all counterparties, join own ratings per type
        result = counterparties.select("counterparty_reference")
        result = result.join(
            best_internal, left_on="counterparty_reference", right_on="_int_cp", how="left"
        )
        result = result.join(
            best_external, left_on="counterparty_reference", right_on="_ext_cp", how="left"
        )

        # Join with ultimate parents for inheritance
        result = result.join(
            ultimate_parents.select(
                [
                    pl.col("counterparty_reference").alias("_cp"),
                    pl.col("ultimate_parent_reference"),
                ]
            ),
            left_on="counterparty_reference",
            right_on="_cp",
            how="left",
        )

        # Parent's best internal
        parent_internal = best_internal.select(
            [
                pl.col("_int_cp").alias("_p_int_cp"),
                pl.col("internal_pd").alias("parent_internal_pd"),
                pl.col("internal_model_id").alias("parent_internal_model_id"),
            ]
        )
        result = result.join(
            parent_internal,
            left_on="ultimate_parent_reference",
            right_on="_p_int_cp",
            how="left",
        )

        # Parent's best external
        parent_external = best_external.select(
            [
                pl.col("_ext_cp").alias("_p_ext_cp"),
                pl.col("external_cqs").alias("parent_external_cqs"),
            ]
        )
        result = result.join(
            parent_external,
            left_on="ultimate_parent_reference",
            right_on="_p_ext_cp",
            how="left",
        )

        # Per-type inheritance: coalesce own → parent for each type
        result = result.with_columns(
            [
                pl.coalesce(pl.col("internal_pd"), pl.col("parent_internal_pd")).alias(
                    "internal_pd"
                ),
                pl.coalesce(pl.col("internal_model_id"), pl.col("parent_internal_model_id")).alias(
                    "internal_model_id"
                ),
                pl.coalesce(pl.col("external_cqs"), pl.col("parent_external_cqs")).alias(
                    "external_cqs"
                ),
            ]
        )

        # Derive convenience aliases
        result = result.with_columns(
            [
                pl.col("external_cqs").alias("cqs"),
                pl.col("internal_pd").alias("pd"),
            ]
        )

        return result.select(
            [
                "counterparty_reference",
                "internal_pd",
                "internal_model_id",
                "external_cqs",
                "cqs",
                "pd",
            ]
        )

    def _build_facility_root_lookup(
        self,
        facility_mappings: pl.LazyFrame,
        max_depth: int = 10,
    ) -> pl.LazyFrame:
        """
        Build root facility lookup using eager graph traversal.

        Collects the small facility edge data eagerly, resolves the full graph
        via dict traversal, and returns the result as a LazyFrame.

        Args:
            facility_mappings: Facility mappings with parent_facility_reference,

Facility Hierarchy

Facilities can form multi-level hierarchies (e.g., master facility → sub-facilities). The resolver traverses these using the same eager graph traversal as counterparty hierarchies (see _build_facility_root_lookup). See hierarchy.py:433-537.

Key behaviour:

  • Facility-to-facility edges identified from facility_mappings where child_type = "facility"
  • Traverses up to 10 levels to find the root facility for each sub-facility
  • Drawn amounts from all descendant loans are aggregated to the root facility
  • Sub-facilities are excluded from producing their own undrawn exposure records
  • Only root/standalone facilities with undrawn_amount > 0 generate facility_undrawn exposures
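
A worked example may help with the undrawn calculation described above (limit minus the sum of descendant drawn amounts, reported at the root only). The sketch below uses plain dictionaries. The facility_undrawn function and its argument names are hypothetical and do not mirror the signature of _calculate_facility_undrawn().

```python
def facility_undrawn(limits, root_of, loan_facility, loan_drawn):
    """Undrawn per root facility = limit - sum of drawn amounts of all descendant loans."""
    drawn_by_root = {}
    for loan, facility in loan_facility.items():
        root = root_of.get(facility, facility)  # standalone facilities are their own root
        drawn_by_root[root] = drawn_by_root.get(root, 0.0) + loan_drawn[loan]

    undrawn = {}
    for facility, limit in limits.items():
        if facility in root_of:
            continue  # sub-facilities do not produce their own undrawn record
        amount = limit - drawn_by_root.get(facility, 0.0)
        if amount > 0:  # only positive undrawn amounts generate an exposure
            undrawn[facility] = amount
    return undrawn


limits = {"MASTER": 1000.0, "SUB1": 600.0, "SUB2": 400.0, "SOLO": 200.0}
root_of = {"SUB1": "MASTER", "SUB2": "MASTER"}  # sub-facility -> root facility
loan_facility = {"L1": "SUB1", "L2": "SUB2", "L3": "SOLO"}
loan_drawn = {"L1": 300.0, "L2": 150.0, "L3": 200.0}
undrawn = facility_undrawn(limits, root_of, loan_facility, loan_drawn)
print(undrawn)
```

Here the two sub-facility loans roll up to MASTER (1000 - 450), while SOLO is fully drawn and produces no undrawn exposure.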

Actual Implementation (hierarchy.py)
            max_depth: Maximum hierarchy depth to traverse

        Returns:
            LazyFrame with columns:
            - child_facility_reference: The sub-facility
            - root_facility_reference: Its ultimate root facility
            - facility_hierarchy_depth: Number of levels traversed
        """
        empty_result = pl.LazyFrame(
            schema={
                "child_facility_reference": pl.String,
                "root_facility_reference": pl.String,
                "facility_hierarchy_depth": pl.Int32,
            }
        )

        type_col = _detect_type_column(set(facility_mappings.collect_schema().names()))
        if type_col is None:
            return empty_result

        # Filter to facility→facility relationships and collect (small data)
        facility_edges = (
            facility_mappings.filter(
                pl.col(type_col).fill_null("").str.to_lowercase() == "facility"
            )
            .select(
                [
                    pl.col("child_reference").alias("child_facility_reference"),
                    pl.col("parent_facility_reference"),
                ]
            )
            .unique()
            .collect()
        )

        if facility_edges.height == 0:
            return empty_result

        resolved = _resolve_graph_eager(
            facility_edges,
            child_col="child_facility_reference",
            parent_col="parent_facility_reference",
            max_depth=max_depth,
        )

        return resolved.rename(
            {
                "entity": "child_facility_reference",
                "root": "root_facility_reference",
                "depth": "facility_hierarchy_depth",
            }
        ).lazy()

    def _enrich_counterparties_with_hierarchy(
        self,
        counterparties: pl.LazyFrame,
        org_mappings: pl.LazyFrame,
        ratings: pl.LazyFrame,
        ultimate_parents: pl.LazyFrame,
        rating_inheritance: pl.LazyFrame,
    ) -> pl.LazyFrame:
        """
        Enrich counterparties with hierarchy and rating information.

        Adds columns:
        - counterparty_has_parent: bool
        - parent_counterparty_reference: str | null
        - ultimate_parent_reference: str | null
        - counterparty_hierarchy_depth: int
        - cqs, pd, internal_pd, external_cqs, internal_model_id: from ratings
        """
        # Join with org_mappings to get parent
        enriched = counterparties.join(
            org_mappings.select(
                [
                    pl.col("child_counterparty_reference"),
                    pl.col("parent_counterparty_reference"),
                ]
            ),
            left_on="counterparty_reference",
            right_on="child_counterparty_reference",
            how="left",
        )

        # Join with ultimate parents and rating inheritance in sequence,
        # then derive flags in a single with_columns batch.
        enriched = (
            enriched.join(
                ultimate_parents.select(
                    [
                        pl.col("counterparty_reference").alias("_up_cp"),
                        pl.col("ultimate_parent_reference"),
                        pl.col("hierarchy_depth").alias("counterparty_hierarchy_depth"),
                    ]
                ),
                left_on="counterparty_reference",
                right_on="_up_cp",
                how="left",
            )
            .join(
                rating_inheritance.select(
                    [
                        pl.col("counterparty_reference").alias("_ri_cp"),
                        pl.col("cqs"),
                        pl.col("pd"),

Stage 3: Classification

Purpose

Assign exposure classes and determine calculation approach.

Input

ResolvedHierarchyBundle

Output

ClassifiedExposuresBundle with:

  • exposure_class: Regulatory exposure class
  • approach: SA, F-IRB, A-IRB, or Slotting
  • Grouped exposures by approach

Classification Logic

The classifier assigns exposure classes and calculation approaches. See classifier.py:195-244 for the core classification logic.

rwa_calc.engine.classifier.ExposureClassifier

Classify exposures by exposure class and approach.

Implements ClassifierProtocol for:

  • Mapping counterparty types to exposure classes
  • Checking SME criteria (turnover thresholds)
  • Checking retail criteria (aggregate exposure thresholds)
  • Determining IRB eligibility based on permissions
  • Identifying specialised lending for slotting
  • Splitting exposures by calculation approach

All operations use Polars LazyFrames for deferred execution. The classifier batches expressions into 4 .with_columns() calls to keep the query plan shallow (5 nodes instead of 21).

classify(data, config)

Classify exposures and split by approach.

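Parameters:

| Name   | Type                    | Description                                    | Default  |
|--------|-------------------------|------------------------------------------------|----------|
| data   | ResolvedHierarchyBundle | Hierarchy-resolved data from HierarchyResolver | required |
| config | CalculationConfig       | Calculation configuration                      | required |

Returns:

| Type                      | Description                                                |
|---------------------------|------------------------------------------------------------|
| ClassifiedExposuresBundle | ClassifiedExposuresBundle with exposures split by approach |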

Classification Priority Order

  1. Central Govt / Central Bank: Government entities, central banks
  2. RGLA: Regional governments and local authorities
  3. PSE: Public sector entities
  4. MDB: Multilateral development banks
  5. Institution: Banks, regulated financial institutions, CCPs
  6. Retail: Individuals, small businesses meeting retail criteria
  7. Corporate: Non-financial corporates
  8. Specialised Lending: Project finance, object finance, etc.

Actual Implementation (classifier.py)
            pl.col("approach").is_in([ApproachType.FIRB.value, ApproachType.AIRB.value])
        )
        slotting_exposures = classified.filter(pl.col("approach") == ApproachType.SLOTTING.value)
        classification_audit = self._build_audit_trail(classified)

        return ClassifiedExposuresBundle(
            all_exposures=classified,
            sa_exposures=sa_exposures,
            irb_exposures=irb_exposures,
            slotting_exposures=slotting_exposures,
            equity_exposures=data.equity_exposures,
            ciu_holdings=data.ciu_holdings,
            collateral=data.collateral,
            guarantees=data.guarantees,
            provisions=data.provisions,
            counterparty_lookup=data.counterparty_lookup,
            classification_audit=classification_audit,
            classification_errors=[],
        )

    # =========================================================================
    # Phase 1: Counterparty join (retained unchanged)
    # =========================================================================

    def _add_counterparty_attributes(
        self,
        exposures: pl.LazyFrame,
        counterparties: pl.LazyFrame,
    ) -> pl.LazyFrame:
        """
        Add counterparty attributes needed for classification.

        Joins exposures with counterparty data to get:
        - entity_type (single source of truth for exposure class)
        - annual_revenue (for SME check)
        - total_assets (for large financial sector entity threshold)
        - default_status
        - country_code
        - apply_fi_scalar (for FI scalar - LFSE/unregulated FSE)
        - is_managed_as_retail (for SME retail treatment)
        """
        cp_schema = counterparties.collect_schema()
        cp_col_names = cp_schema.names()

        select_cols = [
            pl.col("counterparty_reference"),
            pl.col("entity_type").str.to_lowercase().alias("cp_entity_type"),
            pl.col("country_code").alias("cp_country_code"),
            pl.col("annual_revenue").alias("cp_annual_revenue"),
            pl.col("total_assets").alias("cp_total_assets"),

Stage 4: CRM Processing

Purpose

Apply credit risk mitigation: collateral, guarantees, provisions.

Input

ClassifiedExposuresBundle plus CRM data

Output

CRMAdjustedBundle with:

  • Adjusted EAD values
  • Applied haircuts
  • Substituted risk weights (guarantees)
  • Provision adjustments

Processing Order

Provisions are resolved before CCF application so that the nominal amount is adjusted before credit conversion. The full CRM pipeline order is:

  1. Resolve provisions — drawn-first deduction (SA only); IRB tracks but does not deduct
  2. Apply CCFs — uses nominal_after_provision for off-balance sheet conversion
  3. Initialize EAD waterfall — set ead_pre_crm from drawn + interest + CCF contribution
  4. Apply collateral with haircuts and overcollateralisation
  5. Apply guarantees (substitution approach, cross-approach CCF substitution)
  6. Finalize EAD — floor at zero; provisions already baked into ead_pre_crm

class CRMProcessor:
    def get_crm_adjusted_bundle(
        self,
        data: ClassifiedExposuresBundle,
        config: CalculationConfig,
    ) -> CRMAdjustedBundle:
        """Apply CRM in correct order (Art. 111(2) compliant).

        Returns CRMAdjustedBundle with exposures split by approach."""

        # Step 1: Resolve provisions (before CCF)
        #   SA: drawn-first deduction, remainder reduces nominal
        #   IRB/Slotting: tracked but not deducted from EAD
        after_provisions = self._resolve_provisions(data.all_exposures, data.provisions, config)

        # Step 2: Apply CCFs (uses nominal_after_provision)
        after_ccf = self._apply_ccf(after_provisions, config)

        # Step 3: Initialize EAD waterfall — includes collect barrier
        #   (flattens deep lazy plan to prevent 3× re-evaluation downstream)
        after_init = self._initialize_ead(after_ccf)

        # Step 4: Collateral (3 lookup collects: direct/facility/counterparty)
        after_collateral = self._apply_collateral(after_init, data.collateral, config)

        # Step 5: Guarantees (cross-approach CCF substitution)
        after_guarantees = self._apply_guarantees(
            after_collateral, data.guarantees, data.counterparty_lookup, config
        )

        # Step 6: Finalize (no provision subtraction — already in ead_pre_crm)
        return self._finalize_ead(after_guarantees)

    def get_crm_unified_bundle(
        self,
        data: ClassifiedExposuresBundle,
        config: CalculationConfig,
    ) -> CRMAdjustedBundle:
        """Unified CRM — does not split by approach.

        Used for Basel 3.1 output floor: SA-equivalent RW needed on all rows."""
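
The processing order for the EAD waterfall can be followed with plain arithmetic. The sketch below is a deliberately simplified illustration for a single exposure: the ead_waterfall function and its parameters are hypothetical, collateral is reduced by a single haircut, and guarantees are left out because substitution changes the risk weight rather than the EAD in this simplified view.

```python
def ead_waterfall(drawn, interest, nominal, provision, ccf,
                  collateral_value, haircut, approach="SA"):
    """Provisions -> CCF -> ead_pre_crm -> collateral -> floor at zero."""
    if approach == "SA":
        # Drawn-first deduction; any remaining provision reduces the nominal amount
        drawn_deduction = min(provision, drawn)
        drawn_after = drawn - drawn_deduction
        nominal_after_provision = max(nominal - (provision - drawn_deduction), 0.0)
    else:
        # IRB/Slotting: provision is tracked but not deducted from EAD
        drawn_after, nominal_after_provision = drawn, nominal

    # EAD before CRM: drawn + interest + CCF contribution of the off-balance sheet part
    ead_pre_crm = drawn_after + interest + ccf * nominal_after_provision

    # Collateral after a single (simplified) haircut, then floor the result at zero
    collateral_after_haircut = collateral_value * (1.0 - haircut)
    ead_final = max(ead_pre_crm - collateral_after_haircut, 0.0)
    return {"ead_pre_crm": ead_pre_crm, "ead_final": ead_final}


inputs = dict(drawn=100.0, interest=2.0, nominal=50.0, provision=20.0,
              ccf=0.5, collateral_value=40.0, haircut=0.25)
sa = ead_waterfall(**inputs, approach="SA")
irb = ead_waterfall(**inputs, approach="IRB")
print(sa, irb)
```

With these inputs the SA exposure has the provision deducted from the drawn balance before the CCF is applied, while the IRB exposure keeps the full drawn amount.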

Stage 5: RWA Calculation

Purpose

Calculate RWA using appropriate approach for each exposure.

SA Calculator

class SACalculator:
    def calculate(
        self,
        exposures: pl.LazyFrame,
        config: CalculationConfig
    ) -> SAResultBundle:
        """Calculate SA RWA."""
        result = (
            exposures
            .with_columns(
                risk_weight=self._lookup_risk_weight(
                    pl.col("exposure_class"),
                    pl.col("cqs")
                )
            )
            .with_columns(
                rwa=pl.col("ead") * pl.col("risk_weight")
            )
        )

        # Apply supporting factors (CRR only)
        if config.framework == RegulatoryFramework.CRR:
            result = self._apply_supporting_factors(result, config)

        return SAResultBundle(data=result)
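
For a single exposure, the SA calculation above reduces to a table lookup followed by a multiplication. The sketch below shows that arithmetic in plain Python. The risk-weight table, the unrated fallback, and the supporting-factor value are illustrative inputs chosen for the example, and sa_rwa is a hypothetical helper rather than part of SACalculator.

```python
# Illustrative CQS -> risk weight table for rated corporates (values assumed for the example)
CORPORATE_RW = {1: 0.20, 2: 0.50, 3: 1.00, 4: 1.00, 5: 1.50, 6: 1.50}
UNRATED_RW = 1.00  # assumed fallback when no CQS is available


def sa_rwa(ead, cqs, supporting_factor=1.0):
    """RWA = EAD x risk weight, optionally scaled by a supporting factor."""
    risk_weight = CORPORATE_RW.get(cqs, UNRATED_RW)
    return ead * risk_weight * supporting_factor


rated = sa_rwa(1000.0, cqs=2)
unrated = sa_rwa(1000.0, cqs=None)
with_factor = sa_rwa(1000.0, cqs=None, supporting_factor=0.7619)
print(rated, unrated, with_factor)
```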

IRB Calculator

class IRBCalculator:
    def calculate(
        self,
        exposures: pl.LazyFrame,
        config: CalculationConfig
    ) -> IRBResultBundle:
        """Calculate IRB RWA."""
        result = (
            exposures
            .with_columns(
                # Apply PD floor
                pd_floored=pl.max_horizontal(
                    pl.col("pd"),
                    pl.lit(config.pd_floors.get_floor(pl.col("exposure_class")))
                )
            )
            .with_columns(
                # Calculate correlation
                correlation=self._calculate_correlation(
                    pl.col("exposure_class"),
                    pl.col("pd_floored"),
                    pl.col("turnover")
                ),
                # Calculate maturity adjustment
                ma=self._calculate_maturity_adjustment(
                    pl.col("pd_floored"),
                    pl.col("effective_maturity")
                )
            )
            .with_columns(
                # Calculate K
                k=self._calculate_k(
                    pl.col("pd_floored"),
                    pl.col("lgd"),
                    pl.col("correlation")
                )
            )
            .with_columns(
                # Calculate RWA
                rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("ma") *
                    config.scaling_factor
            )
        )

        return IRBResultBundle(data=result)
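
The helper methods in the class above are not shown, so the following standalone sketch spells out the same chain (PD floor, correlation, maturity adjustment, K, RWA) for one corporate exposure using the standard Basel IRB risk-weight function. It is an illustration, not the project's code: the function names are hypothetical, the SME size adjustment driven by turnover is omitted, and the default pd_floor and scaling_factor values are assumptions for the example.

```python
from math import exp, log, sqrt
from statistics import NormalDist

_N = NormalDist()  # standard normal distribution


def corporate_correlation(pd):
    """Asset correlation for corporates, interpolating between 24% and 12%."""
    w = (1 - exp(-50 * pd)) / (1 - exp(-50))
    return 0.12 * w + 0.24 * (1 - w)


def maturity_adjustment(pd, m):
    """Maturity adjustment factor for effective maturity m (years)."""
    b = (0.11852 - 0.05478 * log(pd)) ** 2
    return (1 + (m - 2.5) * b) / (1 - 1.5 * b)


def capital_k(pd, lgd, r):
    """Unexpected-loss capital requirement before maturity adjustment."""
    z = (_N.inv_cdf(pd) + sqrt(r) * _N.inv_cdf(0.999)) / sqrt(1 - r)
    return lgd * _N.cdf(z) - pd * lgd


def irb_rwa(pd, lgd, ead, m, pd_floor=0.0003, scaling_factor=1.0):
    """RWA = K x 12.5 x EAD x MA x scaling factor, with PD floored first."""
    pd_floored = max(pd, pd_floor)  # requires 0 < pd_floored < 1
    r = corporate_correlation(pd_floored)
    k = capital_k(pd_floored, lgd, r)
    return k * 12.5 * ead * maturity_adjustment(pd_floored, m) * scaling_factor


rwa = irb_rwa(pd=0.01, lgd=0.45, ead=100.0, m=2.5)
print(round(rwa, 2))
```

Keeping the maturity adjustment separate from K mirrors the column layout in the calculator above, where ma is multiplied in at the final step.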

Slotting Calculator

class SlottingCalculator:
    def calculate(
        self,
        exposures: pl.LazyFrame,
        config: CalculationConfig
    ) -> SlottingResultBundle:
        """Calculate Slotting RWA."""
        result = (
            exposures
            .with_columns(
                risk_weight=self._lookup_slotting_weight(
                    pl.col("lending_type"),
                    pl.col("slotting_category"),
                    config.framework
                )
            )
            .with_columns(
                rwa=pl.col("ead") * pl.col("risk_weight")
            )
        )

        return SlottingResultBundle(data=result)

Stage 6: Aggregation

Purpose

Combine results, apply output floor, produce final output.

Input

Result bundles from all calculators

Output

AggregatedResultBundle with:

  • Total RWA
  • RWA by approach
  • RWA by exposure class
  • Floor impact (Basel 3.1)
  • Detailed breakdown

Output Floor (Basel 3.1)

def apply_output_floor(
    irb_rwa: pl.LazyFrame,
    sa_rwa: pl.LazyFrame,
    floor_percentage: float
) -> pl.LazyFrame:
    """Apply output floor to IRB results."""
    return (
        irb_rwa
        .join(sa_rwa, on="exposure_reference", suffix="_sa")
        .with_columns(
            floor=pl.col("rwa_sa") * floor_percentage,
            rwa_floored=pl.max_horizontal(
                pl.col("rwa"),
                pl.col("rwa_sa") * floor_percentage
            )
        )
    )
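
The per-row comparison in the function above can be checked by hand with a scalar version. The sketch below is a plain-Python illustration: apply_floor_scalar is a hypothetical helper, and the 72.5% floor percentage is passed in as an example value rather than read from configuration.

```python
def apply_floor_scalar(irb_rwa, sa_rwa, floor_percentage):
    """Floored RWA is the larger of the IRB RWA and floor_percentage x SA RWA."""
    floor = sa_rwa * floor_percentage
    rwa_floored = max(irb_rwa, floor)
    return {
        "floor": floor,
        "rwa_floored": rwa_floored,
        "floor_impact": rwa_floored - irb_rwa,  # zero when the floor does not bind
    }


binding = apply_floor_scalar(irb_rwa=60.0, sa_rwa=100.0, floor_percentage=0.725)
not_binding = apply_floor_scalar(irb_rwa=80.0, sa_rwa=100.0, floor_percentage=0.725)
print(binding, not_binding)
```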

Error Handling

Each stage accumulates errors:

@dataclass
class LazyFrameResult:
    frame: pl.LazyFrame
    errors: list[CalculationError] = field(default_factory=list)

# Pipeline accumulates across stages
all_errors = []
all_errors.extend(loader_result.errors)
all_errors.extend(hierarchy_result.errors)
# ... etc.
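
A runnable sketch of this accumulation pattern is shown below. StageError and StageResult are hypothetical stand-ins for CalculationError and LazyFrameResult, and run_stages is not the orchestrator's actual method. The sketch only illustrates the idea that non-blocking errors are collected while a stage that raises stops the run.

```python
from dataclasses import dataclass, field


@dataclass
class StageError:  # hypothetical stand-in for CalculationError
    stage: str
    message: str


@dataclass
class StageResult:  # hypothetical stand-in for LazyFrameResult
    value: object
    errors: list = field(default_factory=list)


def run_stages(stages, initial):
    """Run stages in order, collecting non-blocking errors; stop on an exception."""
    all_errors = []
    value = initial
    for name, fn in stages:
        try:
            result = fn(value)
        except Exception as exc:
            all_errors.append(StageError(name, str(exc)))
            return None, all_errors  # blocking failure: return what was collected
        all_errors.extend(result.errors)
        value = result.value
    return value, all_errors


def load(_):
    return StageResult([1, 2, -3])


def drop_negatives(values):
    bad = [v for v in values if v < 0]
    errors = [StageError("filter", f"dropped {len(bad)} negative value(s)")] if bad else []
    return StageResult([v for v in values if v >= 0], errors)


def total(values):
    return StageResult(sum(values))


value, errors = run_stages([("load", load), ("filter", drop_negatives), ("total", total)], None)
print(value, [e.message for e in errors])
```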

Next Steps