
Data Flow

This document describes how data flows through the RWA calculator, including schemas, transformations, and validation points.

Data Flow Overview

flowchart TD
    subgraph External
        A[Parquet Files]
        B[CSV Files]
    end

    subgraph Loading
        C[Schema Validation]
        D[RawDataBundle]
    end

    subgraph Processing
        E[Hierarchy Resolution]
        F[Classification]
        G[CRM Application]
    end

    subgraph Calculation
        H[SA/IRB/Slotting/Equity]
    end

    subgraph Output
        I[Aggregation]
        J[Results]
    end

    A & B --> C --> D
    D --> E --> F --> G --> H --> I --> J

Input Data

Required Files

| File | Description | Required |
|------|-------------|----------|
| counterparties.parquet | Counterparty master data | Yes |
| facilities.parquet | Credit facilities | Yes |
| loans.parquet | Individual loans/draws | Yes |

Optional Files

| File | Description |
|------|-------------|
| contingents.parquet | Off-balance sheet items |
| collateral.parquet | Collateral details |
| guarantees.parquet | Guarantee information |
| provisions.parquet | Provision allocations |
| ratings.parquet | Credit ratings |
| org_mapping.parquet | Organization hierarchy |
| lending_mapping.parquet | Retail lending groups |
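The split between required and optional inputs implies a presence check before loading. A minimal sketch, assuming all inputs sit in a single directory under exactly the file names listed above; `discover_input_files` is a hypothetical helper, not the calculator's actual loader:

```python
from __future__ import annotations

from pathlib import Path

# File names taken from the two tables above
REQUIRED_FILES = ("counterparties.parquet", "facilities.parquet", "loans.parquet")
OPTIONAL_FILES = (
    "contingents.parquet",
    "collateral.parquet",
    "guarantees.parquet",
    "provisions.parquet",
    "ratings.parquet",
    "org_mapping.parquet",
    "lending_mapping.parquet",
)


def discover_input_files(data_dir: Path) -> dict[str, Path | None]:
    """Map each expected file name to its path; absent optional files map to None.

    Raises FileNotFoundError naming every missing required file.
    """
    missing = [name for name in REQUIRED_FILES if not (data_dir / name).is_file()]
    if missing:
        raise FileNotFoundError(f"Missing required input files: {', '.join(missing)}")
    found: dict[str, Path | None] = {name: data_dir / name for name in REQUIRED_FILES}
    for name in OPTIONAL_FILES:
        path = data_dir / name
        found[name] = path if path.is_file() else None
    return found
```

Optional entries come back as `None`, so later stages can skip collateral, guarantees and the like when they are absent instead of failing.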

Schema Definitions

Schemas are defined in data/schemas.py. The key column names use _reference suffixes (e.g., counterparty_reference, facility_reference) rather than _id.

Counterparty Schema

import polars as pl

COUNTERPARTY_SCHEMA = {
    "counterparty_reference": pl.String,  # Unique identifier
    "counterparty_name": pl.String,       # Legal name
    "entity_type": pl.String,             # Single source of truth: sovereign, institution, corporate, etc.
    "country_code": pl.String,            # ISO country code
    "annual_revenue": pl.Float64,         # For SME classification (EUR 50m threshold)
    "total_assets": pl.Float64,           # For large FSE threshold (EUR 70bn)
    "default_status": pl.Boolean,         # Default indicator
    "sector_code": pl.String,             # Based on SIC
    "apply_fi_scalar": pl.Boolean,        # User flag: True = apply 1.25x FI correlation scalar
    "is_managed_as_retail": pl.Boolean,   # SME managed on pooled retail basis - 75% RW
    "scra_grade": pl.String,              # SCRA grade for unrated institutions: A, B, C
    "is_investment_grade": pl.Boolean,    # Publicly traded + investment grade → 65% SA RW
}

Facility Schema

FACILITY_SCHEMA = {
    "facility_reference": pl.String,      # Unique identifier
    "product_type": pl.String,            # Product classification
    "book_code": pl.String,               # Book identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "value_date": pl.Date,                # Facility start
    "maturity_date": pl.Date,             # Final maturity
    "currency": pl.String,                # ISO currency code
    "limit": pl.Float64,                  # Total commitment
    "committed": pl.Boolean,              # Committed flag
    "lgd": pl.Float64,                    # A-IRB modelled LGD
    "beel": pl.Float64,                   # Best estimate expected loss
    "is_revolving": pl.Boolean,           # Revolving facility flag
    "seniority": pl.String,              # senior/subordinated - affects F-IRB LGD
    "risk_type": pl.String,              # FR/MR/MLR/LR - determines CCF
    "ccf_modelled": pl.Float64,          # A-IRB modelled CCF (0.0-1.5)
    "is_short_term_trade_lc": pl.Boolean, # 20% CCF under F-IRB (Art. 166(9))
    "is_buy_to_let": pl.Boolean,         # BTL - excluded from SME supporting factor
    "is_qrre_transactor": pl.Boolean,    # QRRE transactor flag (CRE30.55)
}

Loan Schema

LOAN_SCHEMA = {
    "loan_reference": pl.String,          # Unique identifier
    "product_type": pl.String,            # Product classification
    "book_code": pl.String,               # Book identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "value_date": pl.Date,                # Loan start
    "maturity_date": pl.Date,             # Final maturity
    "currency": pl.String,                # ISO currency code
    "drawn_amount": pl.Float64,           # Outstanding principal
    "interest": pl.Float64,              # Accrued interest (adds to EAD)
    "lgd": pl.Float64,                   # A-IRB modelled LGD
    "beel": pl.Float64,                  # Best estimate expected loss
    "seniority": pl.String,             # senior/subordinated
    "is_buy_to_let": pl.Boolean,        # BTL property lending
    "has_netting_agreement": pl.Boolean, # Netting agreement flag
    "netting_facility_reference": pl.String, # Netting facility reference
}

Rating Schema

RATINGS_SCHEMA = {
    "rating_reference": pl.String,        # Unique identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "rating_type": pl.String,             # internal/external
    "rating_agency": pl.String,           # internal, S&P, Moodys, Fitch, DBRS, etc.
    "rating_value": pl.String,            # AAA, AA+, Aa1, etc.
    "cqs": pl.Int8,                       # Credit Quality Step 1-6
    "pd": pl.Float64,                     # Probability of Default (for internal ratings)
    "rating_date": pl.Date,               # Rating as-of date
    "is_solicited": pl.Boolean,           # Whether rating was solicited
    "model_id": pl.String,                # IRB model identifier
}
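For external ratings, `cqs` follows from `rating_value` through a mapping table for each agency. The sketch below uses the commonly cited long-term mapping for S&P/Fitch-style and Moody's symbols; official mappings are set per ECAI and should be checked, and whether the calculator derives `cqs` this way or expects it pre-populated is not stated here. `cqs_from_rating` is hypothetical:

```python
from __future__ import annotations

# Long-term rating grade -> Credit Quality Step (modifiers +/- or 1/2/3 are stripped first)
SP_FITCH_CQS = {"AAA": 1, "AA": 1, "A": 2, "BBB": 3, "BB": 4, "B": 5, "CCC": 6, "CC": 6, "C": 6}
MOODYS_CQS = {"Aaa": 1, "Aa": 1, "A": 2, "Baa": 3, "Ba": 4, "B": 5, "Caa": 6, "Ca": 6, "C": 6}


def cqs_from_rating(rating_value: str, rating_agency: str) -> int | None:
    """Return the CQS for an external long-term rating, or None if it cannot be mapped."""
    agency = rating_agency.strip().lower().replace("'", "")
    value = rating_value.strip()
    if agency in ("s&p", "fitch"):
        return SP_FITCH_CQS.get(value.rstrip("+-"))
    if agency == "moodys":
        return MOODYS_CQS.get(value.rstrip("123"))
    # Other agencies (e.g. DBRS) use different symbols and are not covered by this sketch
    return None
```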

Collateral Schema

COLLATERAL_SCHEMA = {
    "collateral_reference": pl.String,    # Unique identifier
    "collateral_type": pl.String,         # cash, gold, equity, bond, real_estate, etc.
    "currency": pl.String,                # ISO currency code
    "maturity_date": pl.Date,             # Collateral maturity
    "market_value": pl.Float64,           # Current market value
    "nominal_value": pl.Float64,          # Nominal value
    "pledge_percentage": pl.Float64,      # Fraction of beneficiary EAD (0.5 = 50%)
    "beneficiary_type": pl.String,        # counterparty/loan/facility/contingent
    "beneficiary_reference": pl.String,   # Reference to linked entity
    "issuer_cqs": pl.Int8,               # CQS of issuer (1-6) for haircut lookup
    "issuer_type": pl.String,            # sovereign/pse/corporate/securitisation
    "residual_maturity_years": pl.Float64, # For haircut bands
    "property_type": pl.String,          # residential/commercial (RE collateral)
    "property_ltv": pl.Float64,          # Loan-to-value ratio
    "is_income_producing": pl.Boolean,   # Material income dependence
    "is_adc": pl.Boolean,               # Acquisition/Development/Construction
    "is_eligible_financial_collateral": pl.Boolean, # SA eligibility (CRR Art 197)
    "is_eligible_irb_collateral": pl.Boolean,       # IRB eligibility (CRR Art 199)
    "valuation_date": pl.Date,           # Date of last valuation
    "valuation_type": pl.String,         # market, indexed, independent
    "is_presold": pl.Boolean,            # ADC pre-sold to qualifying buyer
}
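`pledge_percentage` states collateral as a share of the beneficiary's EAD rather than as an amount, so its value depends on the level (`beneficiary_type`) it is pledged against. A sketch, assuming `market_value` takes precedence when both are present and that loan and contingent beneficiaries resolve against exposure-level EAD (neither rule is stated in this document); the function and the `ead_by_level` layout are hypothetical:

```python
from __future__ import annotations


def resolve_collateral_value(
    market_value: float | None,
    pledge_percentage: float | None,
    beneficiary_type: str,
    beneficiary_reference: str,
    ead_by_level: dict[str, dict[str, float]],
) -> float:
    """Collateral amount before haircuts.

    ead_by_level maps "direct", "facility" and "counterparty" to {reference: EAD}.
    """
    if market_value is not None:
        return market_value  # assumed precedence over pledge_percentage
    if pledge_percentage is None:
        return 0.0
    level = beneficiary_type.strip().lower()
    # Loans and contingents are exposure-level ("direct") beneficiaries in this sketch
    key = "direct" if level in ("loan", "contingent") else level
    beneficiary_ead = ead_by_level.get(key, {}).get(beneficiary_reference, 0.0)
    return pledge_percentage * beneficiary_ead
```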

Data Transformations

Stage 1: Loading

Input: Raw files
Output: RawDataBundle

Transformations:
- Load files as LazyFrames
- Validate against schemas
- Convert data types
- Add metadata columns

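# Example transformation
from datetime import datetime
from pathlib import Path

import polars as pl

path = Path("data")  # directory holding the input files (illustrative)

counterparties = (
    pl.scan_parquet(path / "counterparties.parquet")
    .with_columns(
        _load_timestamp=pl.lit(datetime.now()),
        _source_file=pl.lit("counterparties.parquet"),
    )
)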

Stage 2: Hierarchy Resolution

Input: RawDataBundle
Output: ResolvedHierarchyBundle

See hierarchy.py for implementation.

Transformations:
- Resolve parent-child relationships
- Calculate aggregate exposures
- Inherit ratings
- Resolve lending groups

# Hierarchy resolution adds columns
resolved = (
    exposures
    .with_columns(
        ultimate_parent_reference=...,
        group_total_exposure=...,
        inherited_rating=...,
        inherited_cqs=...,
    )
)
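Per the docstring of `_build_ultimate_parent_lazy` in hierarchy.py, the small `org_mapping` edge list is collected eagerly and walked as a dict; the `_resolve_graph_eager` helper itself is not shown. The following plain-Python sketch shows that kind of traversal (with a depth cap and a cycle guard) plus a simple "own rating first, else ultimate parent's" inheritance rule. Both function names and the inheritance rule are assumptions for illustration:

```python
from __future__ import annotations


def resolve_ultimate_parents(
    edges: list[tuple[str, str]], max_depth: int = 10
) -> dict[str, tuple[str, int]]:
    """Return {entity: (ultimate_parent, depth)} from (child, parent) edges.

    Walking stops at max_depth or on a cycle, keeping the last node reached.
    """
    parent_of = dict(edges)  # assumes each child has at most one parent
    entities = set(parent_of) | set(parent_of.values())
    result: dict[str, tuple[str, int]] = {}
    for entity in entities:
        node, depth, seen = entity, 0, {entity}
        while node in parent_of and depth < max_depth:
            nxt = parent_of[node]
            if nxt in seen:  # cycle guard
                break
            seen.add(nxt)
            node, depth = nxt, depth + 1
        result[entity] = (node, depth)
    return result


def inherit_ratings(
    counterparties: list[str],
    own_cqs: dict[str, int],
    ultimate: dict[str, tuple[str, int]],
) -> dict[str, tuple[int | None, str]]:
    """Use a counterparty's own CQS when present, otherwise its ultimate parent's."""
    out: dict[str, tuple[int | None, str]] = {}
    for cp in counterparties:
        if cp in own_cqs:
            out[cp] = (own_cqs[cp], "own")
            continue
        parent = ultimate.get(cp, (cp, 0))[0]
        cqs = own_cqs.get(parent)
        out[cp] = (cqs, "parent" if cqs is not None else "unrated")
    return out
```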
Hierarchy Resolution (hierarchy.py)
        # Enrich counterparties with hierarchy info
        enriched_counterparties = self._enrich_counterparties_with_hierarchy(
            counterparties,
            org_mappings,
            ratings,
            ultimate_parents,
            rating_info,
        )

        return CounterpartyLookup(
            counterparties=enriched_counterparties,
            parent_mappings=org_mappings.select(
                [
                    "child_counterparty_reference",
                    "parent_counterparty_reference",
                ]
            ),
            ultimate_parent_mappings=ultimate_parents,
            rating_inheritance=rating_info,
        ), errors

    def _build_ultimate_parent_lazy(
        self,
        org_mappings: pl.LazyFrame,
        max_depth: int = 10,
    ) -> pl.LazyFrame:
        """
        Build ultimate parent mapping using eager graph traversal.

        Collects the small edge data eagerly, resolves the full graph via dict
        traversal, and returns the result as a LazyFrame for downstream joins.

        Returns LazyFrame with columns:
        - counterparty_reference: The entity
        - ultimate_parent_reference: Its ultimate parent
        - hierarchy_depth: Number of levels traversed
        """
        edges = (
            org_mappings.select(
                [
                    "child_counterparty_reference",
                    "parent_counterparty_reference",
                ]
            )
            .unique()
            .collect()
        )

        resolved = _resolve_graph_eager(
            edges,
            child_col="child_counterparty_reference",
            parent_col="parent_counterparty_reference",
            max_depth=max_depth,
        )

        return resolved.rename(
            {
                "entity": "counterparty_reference",
                "root": "ultimate_parent_reference",
                "depth": "hierarchy_depth",
            }
        ).lazy()

    def _build_rating_inheritance_lazy(
        self,
        counterparties: pl.LazyFrame,
        ratings: pl.LazyFrame,
        ultimate_parents: pl.LazyFrame,
    ) -> pl.LazyFrame:
        """

Stage 3: Classification

Input: ResolvedHierarchyBundle
Output: ClassifiedExposuresBundle

See classifier.py for implementation.

Transformations:
- Assign exposure class
- Determine calculation approach
- Calculate CCFs
- Calculate EAD

# Classification adds columns
classified = (
    resolved
    .with_columns(
        exposure_class=...,
        approach_type=...,
        ccf=...,
    )
    # ccf is created above, so it can only be referenced in a later with_columns
    .with_columns(
        ead=pl.col("drawn_amount") + pl.col("undrawn_amount") * pl.col("ccf"),
    )
)
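The exposure-class step can be sketched in plain Python as an `entity_type` lookup refined by the EUR 50m turnover threshold from the counterparty schema. The class labels, `ENTITY_TO_CLASS` and `assign_exposure_class` are hypothetical, treating the threshold as inclusive is an assumption, and the real classifier covers many more entity types:

```python
from __future__ import annotations

# EUR 50m turnover threshold from the counterparty schema comment (inclusive here by assumption)
SME_TURNOVER_THRESHOLD_EUR = 50_000_000

# Hypothetical entity_type -> exposure class mapping; the real vocabulary is wider
ENTITY_TO_CLASS = {
    "sovereign": "CENTRAL_GOVT_CENTRAL_BANK",
    "institution": "INSTITUTION",
    "corporate": "CORPORATE",
    "individual": "RETAIL_OTHER",
}


def assign_exposure_class(
    entity_type: str,
    annual_revenue: float | None,
    is_managed_as_retail: bool = False,
) -> str:
    """Map a counterparty to an exposure class, splitting out SMEs by turnover."""
    base_class = ENTITY_TO_CLASS.get(entity_type.strip().lower(), "OTHER")
    if base_class != "CORPORATE":
        return base_class
    # Missing revenue is treated as non-SME (the conservative choice)
    is_sme = annual_revenue is not None and annual_revenue <= SME_TURNOVER_THRESHOLD_EUR
    if is_sme and is_managed_as_retail:
        # Pooled retail treatment; other retail conditions (e.g. size limits) are not checked
        return "RETAIL_OTHER"
    return "CORPORATE_SME" if is_sme else "CORPORATE"
```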
Classification Logic (classifier.py)
            pl.col("approach").is_in([ApproachType.FIRB.value, ApproachType.AIRB.value])
        )
        slotting_exposures = classified.filter(pl.col("approach") == ApproachType.SLOTTING.value)
        classification_audit = self._build_audit_trail(classified)

        return ClassifiedExposuresBundle(
            all_exposures=classified,
            sa_exposures=sa_exposures,
            irb_exposures=irb_exposures,
            slotting_exposures=slotting_exposures,
            equity_exposures=data.equity_exposures,
            ciu_holdings=data.ciu_holdings,
            collateral=data.collateral,
            guarantees=data.guarantees,
            provisions=data.provisions,
            counterparty_lookup=data.counterparty_lookup,
            classification_audit=classification_audit,
            classification_errors=[],
        )

    # =========================================================================
    # Phase 1: Counterparty join (retained unchanged)
    # =========================================================================

    def _add_counterparty_attributes(
        self,
        exposures: pl.LazyFrame,
        counterparties: pl.LazyFrame,
    ) -> pl.LazyFrame:
        """
        Add counterparty attributes needed for classification.

        Joins exposures with counterparty data to get:
        - entity_type (single source of truth for exposure class)
        - annual_revenue (for SME check)
        - total_assets (for large financial sector entity threshold)
        - default_status
        - country_code
        - apply_fi_scalar (for FI scalar - LFSE/unregulated FSE)
        - is_managed_as_retail (for SME retail treatment)
        """
        cp_schema = counterparties.collect_schema()
        cp_col_names = cp_schema.names()

        select_cols = [
            pl.col("counterparty_reference"),
            pl.col("entity_type").str.to_lowercase().alias("cp_entity_type"),
            pl.col("country_code").alias("cp_country_code"),
            pl.col("annual_revenue").alias("cp_annual_revenue"),
            pl.col("total_assets").alias("cp_total_assets"),

Stage 4: CRM Processing

Input: ClassifiedExposuresBundle
Output: CRMAdjustedBundle

See crm/processor.py for implementation.

Transformations (Art. 111(2) compliant order):

  1. Resolve provisions — drawn-first deduction (SA), tracking only (IRB/Slotting)
  2. Apply CCFs — uses nominal_after_provision for off-balance sheet conversion
  3. Initialize EAD — set ead_pre_crm from drawn + interest + CCF contribution
  4. Apply collateral — haircuts, overcollateralisation, multi-level allocation
  5. Apply guarantees — substitution, cross-approach CCF substitution
  6. Finalize EAD — floor at zero, no further provision subtraction
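Steps 1 to 3 can be sketched per exposure in plain Python. This assumes a single provision amount per exposure, deducted from drawn principal (not accrued interest) before the undrawn nominal, and a CCF that has already been looked up; `initial_ead` and `EadBreakdown` are hypothetical and only mirror column names used in the pseudocode that follows:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class EadBreakdown:
    provision_on_drawn: float
    provision_on_nominal: float
    nominal_after_provision: float
    ead_pre_crm: float


def initial_ead(
    drawn: float,
    interest: float,
    undrawn_nominal: float,
    ccf: float,
    provision: float,
    approach: str,
) -> EadBreakdown:
    """Resolve provisions, apply the CCF and initialise EAD for one exposure."""
    if approach == "SA":
        # Drawn-first: the provision reduces drawn principal, any remainder the undrawn nominal
        on_drawn = min(provision, max(drawn, 0.0))
        on_nominal = min(provision - on_drawn, max(undrawn_nominal, 0.0))
    else:
        # IRB / slotting: provisions are tracked, not deducted from the exposure
        on_drawn = on_nominal = 0.0
    nominal_after = undrawn_nominal - on_nominal
    # The CCF is applied to the nominal left after provisions (step 2)
    ead_pre_crm = (drawn - on_drawn) + interest + nominal_after * ccf
    return EadBreakdown(on_drawn, on_nominal, nominal_after, ead_pre_crm)
```

Deducting provisions before the CCF means a provision that exceeds the drawn balance reduces the off-balance sheet nominal at full value rather than at CCF-weighted value.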
# CRM processing adds columns
crm_adjusted = (
    classified
    .with_columns(
        # Provision resolution (before CCF)
        provision_allocated=...,
        provision_on_drawn=...,
        provision_on_nominal=...,
        nominal_after_provision=...,
        provision_deducted=...,
        # Collateral
        collateral_value=...,
        collateral_haircut=...,
        # Guarantees
        guaranteed_amount=...,
        guarantor_rw=...,
        # Final
        net_ead=...,
    )
)
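Steps 4 and 5 rest on two standard formulas: the financial collateral comprehensive method, which haircuts both the exposure and the collateral, and substitution, where the guaranteed portion takes the guarantor's risk weight. A simplified sketch; haircut lookup (Hc from the issuer type, CQS and residual-maturity bands in the collateral schema), holding-period scaling, maturity-mismatch adjustments and multi-level allocation are omitted, and the function names are hypothetical:

```python
def fully_adjusted_exposure(
    exposure: float,
    collateral: float,
    h_e: float,
    h_c: float,
    currency_mismatch: bool,
    h_fx: float = 0.08,
) -> float:
    """Comprehensive method, CRR Art. 223(5): E* = max(0, E(1 + He) - C(1 - Hc - Hfx))."""
    # 8% supervisory FX haircut (CRR Art. 224, 10-day holding period) only on a currency mismatch
    hfx = h_fx if currency_mismatch else 0.0
    return max(0.0, exposure * (1 + h_e) - collateral * (1 - h_c - hfx))


def substitution_rwa(
    ead: float,
    obligor_rw: float,
    guaranteed_amount: float,
    guarantor_rw: float,
) -> float:
    """Covered part at the guarantor's risk weight, remainder at the obligor's."""
    covered = min(max(guaranteed_amount, 0.0), ead)
    return covered * guarantor_rw + (ead - covered) * obligor_rw
```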
CRM Processing (processor.py)
                "_ben_ref_facility": pl.String,
                "_ead_facility": pl.Float64,
                "_currency_facility": pl.String,
                "_maturity_facility": pl.Date,
            }
        )

    # Counterparty: aggregated per counterparty_reference
    cp_lookup = (
        exposures.group_by("counterparty_reference")
        .agg(
            [
                pl.col("ead_gross").sum().alias("_ead_cp"),
                pl.col("currency").first().alias("_currency_cp"),
                pl.col("maturity_date").first().alias("_maturity_cp"),
            ]
        )
        .rename({"counterparty_reference": "_ben_ref_cp"})
    )

    return direct_lookup, facility_lookup, cp_lookup


def _join_collateral_to_lookups(
    collateral: pl.LazyFrame,
    direct_lookup: pl.LazyFrame,
    facility_lookup: pl.LazyFrame,
    cp_lookup: pl.LazyFrame,
) -> pl.LazyFrame:
    """
    Join all lookup columns (EAD, currency, maturity) onto collateral in one pass.

    Replaces the separate _resolve_pledge_percentages and join_exposure_currency
    join passes — each used 3 left joins, so 6 total. Now we do 3 joins total,
    halving the plan size from lookup subtree duplication.

    When beneficiary_type is absent, falls back to a single direct join.
    """
    from rwa_calc.engine.crm.constants import DIRECT_BENEFICIARY_TYPES

    coll_schema = collateral.collect_schema()

    if "beneficiary_type" not in coll_schema.names():
        # Direct-only join — single pass with all columns
        return collateral.join(
            direct_lookup.select(
                pl.col("_ben_ref_direct"),
                pl.col("_ead_direct").alias("_beneficiary_ead"),
                pl.col("_currency_direct").alias("exposure_currency"),
                pl.col("_maturity_direct").alias("exposure_maturity"),
            ),
            left_on="beneficiary_reference",
            right_on="_ben_ref_direct",
            how="left",
        )

    bt_lower = pl.col("beneficiary_type").str.to_lowercase()

    # 3 left joins — each adds level-specific EAD, currency, maturity columns
    collateral = (
        collateral.join(
            direct_lookup,
            left_on="beneficiary_reference",
            right_on="_ben_ref_direct",
            how="left",
        )
        .join(
            facility_lookup,
            left_on="beneficiary_reference",
            right_on="_ben_ref_facility",
            how="left",
        )
        .join(
            cp_lookup,

Stage 5: RWA Calculation

Input: CRMAdjustedBundle
Output: Result bundles

See sa/calculator.py and irb/formulas.py for implementations.

Transformations:
- Look up risk weights (SA)
- Calculate K formula (IRB)
- Apply maturity adjustment
- Calculate RWA
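For a rated corporate, the SA step is a table lookup followed by `rwa = ead * risk_weight * supporting_factor`. A sketch using the CRR Art. 122 corporate table and the CRR Art. 501 SME supporting factor (0.7619 up to EUR 2.5m owed, 0.85 on the excess); Basel 3.1 uses a different corporate table, the facility schema excludes buy-to-let from the factor, and using the exposure's own EAD when no group total is supplied is a simplification. All names are hypothetical:

```python
from __future__ import annotations

# CRR Art. 122 corporate risk weights by credit quality step
CORPORATE_RW_BY_CQS = {1: 0.20, 2: 0.50, 3: 1.00, 4: 1.00, 5: 1.50, 6: 1.50}
UNRATED_CORPORATE_RW = 1.00

# CRR Art. 501 SME supporting factor bands
SME_THRESHOLD_EUR = 2_500_000
SME_FACTOR_LOWER = 0.7619
SME_FACTOR_UPPER = 0.85


def sme_supporting_factor(total_owed: float) -> float:
    """Blended factor for the total amount owed by the SME to the lender."""
    if total_owed <= 0:
        return 1.0
    lower = min(total_owed, SME_THRESHOLD_EUR)
    upper = max(total_owed - SME_THRESHOLD_EUR, 0.0)
    return (lower * SME_FACTOR_LOWER + upper * SME_FACTOR_UPPER) / total_owed


def corporate_sa_rwa(
    ead: float,
    cqs: int | None,
    is_sme: bool,
    total_owed: float | None = None,
) -> float:
    """ead * risk_weight * supporting_factor for a corporate exposure under CRR."""
    # Unrated (cqs None) falls through to the unrated weight
    risk_weight = CORPORATE_RW_BY_CQS.get(cqs, UNRATED_CORPORATE_RW)
    owed = total_owed if total_owed is not None else ead
    supporting_factor = sme_supporting_factor(owed) if is_sme else 1.0
    return ead * risk_weight * supporting_factor
```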

# SA calculation
sa_result = (
    sa_exposures
    .with_columns(
        risk_weight=...,
        supporting_factor=...,
        rwa=pl.col("ead") * pl.col("risk_weight") * pl.col("supporting_factor"),
    )
)

# IRB calculation
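irb_result = (
    irb_exposures
    .with_columns(
        pd_floored=...,
        lgd_floored=...,
    )
    # Each stage reads columns created by the previous with_columns
    .with_columns(
        correlation=...,
        maturity_adjustment=...,
    )
    .with_columns(k=...)
    .with_columns(
        # scaling: the 1.06 scaling factor under CRR; not applied under Basel 3.1
        rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("maturity_adjustment") * scaling,
        expected_loss=pl.col("pd_floored") * pl.col("lgd_floored") * pl.col("ead"),
    )
)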
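For a non-defaulted corporate exposure, the K and maturity-adjustment steps follow the standard IRB risk-weight function. A stdlib sketch; the SME correlation adjustment, the 1.25 FI scalar (`apply_fi_scalar`) and LGD floors are left out, the 0.03% PD floor is the CRR Art. 160(1) value, and `corporate_irb_rwa` is hypothetical. Unlike the pseudocode above, K here already includes the maturity adjustment:

```python
import math
from statistics import NormalDist

_N = NormalDist()  # standard normal: cdf = N(.), inv_cdf = G(.)


def corporate_correlation(pd: float) -> float:
    """R interpolates between 0.24 (low PD) and 0.12 (high PD)."""
    weight = (1 - math.exp(-50 * pd)) / (1 - math.exp(-50))
    return 0.12 * weight + 0.24 * (1 - weight)


def maturity_factor(pd: float, m: float) -> float:
    """(1 + (M - 2.5) b) / (1 - 1.5 b) with b = (0.11852 - 0.05478 ln PD)^2."""
    b = (0.11852 - 0.05478 * math.log(pd)) ** 2
    return (1 + (m - 2.5) * b) / (1 - 1.5 * b)


def corporate_irb_rwa(
    pd: float,
    lgd: float,
    ead: float,
    m: float = 2.5,
    pd_floor: float = 0.0003,
    scaling: float = 1.06,
) -> float:
    """RWA = K * 12.5 * EAD * scaling for a non-defaulted corporate (PD < 1)."""
    pd = max(pd, pd_floor)
    r = corporate_correlation(pd)
    # Conditional PD at the 99.9th percentile of the systematic factor
    conditional_pd = _N.cdf((_N.inv_cdf(pd) + math.sqrt(r) * _N.inv_cdf(0.999)) / math.sqrt(1 - r))
    k = (lgd * conditional_pd - pd * lgd) * maturity_factor(pd, m)
    return k * 12.5 * ead * scaling
```

With PD 1%, LGD 45% and M 2.5 this gives a risk weight of roughly 92% before the 1.06 scaling.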
IRB Formula Application (formulas.py)
    # Optimisation: if all floors are the same (CRR case), return a scalar
    all_values = {
        floors.corporate,
        floors.corporate_sme,
        floors.retail_mortgage,
        floors.retail_other,
        floors.retail_qrre_transactor,
        floors.retail_qrre_revolver,
    }
    if len(all_values) == 1:
        return pl.lit(float(all_values.pop()))

    # Basel 3.1: differentiated floors by exposure class
    exp_class = pl.col("exposure_class").cast(pl.String).fill_null("CORPORATE").str.to_uppercase()

    # QRRE transactor/revolver distinction (CRE30.55):
    # Transactors (repay in full each period) get 0.03% floor;
    # revolvers (carry balance) get 0.10% floor.
    if has_transactor_col:
        qrre_floor = (
            pl.when(pl.col("is_qrre_transactor").fill_null(False))
            .then(pl.lit(float(floors.retail_qrre_transactor)))
            .otherwise(pl.lit(float(floors.retail_qrre_revolver)))
        )
    else:
        # Conservative default: revolver floor (0.10% under Basel 3.1)
        qrre_floor = pl.lit(float(floors.retail_qrre_revolver))

    return (
        pl.when(exp_class.str.contains("QRRE"))
        .then(qrre_floor)
        .when(exp_class.str.contains("MORTGAGE") | exp_class.str.contains("RESIDENTIAL"))
        .then(pl.lit(float(floors.retail_mortgage)))
        .when(exp_class.str.contains("RETAIL"))
        .then(pl.lit(float(floors.retail_other)))
        .when(exp_class == "CORPORATE_SME")
        .then(pl.lit(float(floors.corporate_sme)))
        .otherwise(pl.lit(float(floors.corporate)))
    )


def _lgd_floor_expression(
    config: CalculationConfig,
    *,
    has_seniority: bool = False,
) -> pl.Expr:
    """
    Build Polars expression for LGD floor (no collateral_type column).

    Under CRR: No LGD floors (returns 0.0).
    Under Basel 3.1 (CRE30.41): Differentiated floors for A-IRB:
        - Unsecured (senior): 25%
        - Unsecured (subordinated): 50%
        - Financial collateral: 0%
        - Receivables: 10%
        - CRE: 10%, RRE: 5%
        - Other physical: 15%

    Without a collateral_type column, defaults to unsecured floor (25%/50%).
    When has_seniority=True, checks seniority column for subordinated (50%).

    Returns a Polars expression evaluating to the per-row LGD floor value.
    """
    if config.is_crr:
        return pl.lit(0.0)

    floors = config.lgd_floors

    if has_seniority:
        is_subordinated = (
            pl.col("seniority").fill_null("senior").str.to_lowercase().str.contains("sub")
        )
        return (
            pl.when(is_subordinated)
            .then(pl.lit(float(floors.subordinated_unsecured)))
            .otherwise(pl.lit(float(floors.unsecured)))
        )

    # Default to unsecured floor (25%) — most conservative for senior
    return pl.lit(float(floors.unsecured))


def _lgd_floor_expression_with_collateral(
    config: CalculationConfig,
    *,
    has_seniority: bool = False,
) -> pl.Expr:
    """
    Build Polars expression for per-collateral-type LGD floor when collateral_type
    column is available.

    This is used when the dataframe has a collateral_type column, allowing
    precise per-row LGD floors based on the primary collateral type.

    When has_seniority=True, subordinated unsecured exposures get the higher
    50% floor instead of 25% (CRE30.41).
    """
    if config.is_crr:
        return pl.lit(0.0)

    floors = config.lgd_floors
    coll = pl.col("collateral_type").fill_null("unsecured").str.to_lowercase()

    # Determine unsecured floor: 50% for subordinated, 25% for senior (CRE30.41)

Stage 6: Aggregation

Input: Result bundles
Output: AggregatedResultBundle

See aggregator.py for implementation.

Transformations:
- Combine results from all approaches
- Apply output floor (Basel 3.1)
- Calculate totals and breakdowns

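# Aggregation
# "diagonal" aligns the differing column sets of each approach, filling gaps with nulls
combined = pl.concat([sa_result, irb_result, slotting_result], how="diagonal")

if framework == "BASEL_3_1":
    # Basel 3.1 output floor (row-level view; the regulatory test is on total RWA)
    aggregated = combined.with_columns(
        rwa_floored=pl.max_horizontal(pl.col("rwa"), pl.col("sa_equivalent_rwa") * floor)
    )
else:
    aggregated = combined.with_columns(rwa_floored=pl.col("rwa"))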
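Under Basel 3.1 the output floor is defined on totals: total modelled RWA may not fall below the floor percentage (72.5% when fully phased in) of total RWA recomputed under SA. A sketch of that comparison, plus a pro-rata attribution of any add-on to exposures by SA-equivalent RWA; the attribution rule is one possible way to populate a per-exposure `floor_impact` and is an assumption, as is the function name:

```python
from __future__ import annotations


def apply_output_floor(
    modelled_rwa: list[float],
    sa_equivalent_rwa: list[float],
    floor: float = 0.725,
) -> tuple[float, list[float]]:
    """Return (floored total RWA, per-exposure share of the floor add-on)."""
    u_total = sum(modelled_rwa)
    s_total = sum(sa_equivalent_rwa)
    add_on = max(0.0, floor * s_total - u_total)
    if add_on == 0.0 or s_total == 0.0:
        impacts = [0.0] * len(modelled_rwa)
    else:
        # Hypothetical attribution: split the add-on in proportion to SA-equivalent RWA
        impacts = [add_on * s / s_total for s in sa_equivalent_rwa]
    return u_total + add_on, impacts
```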

Data Validation

Schema Validation

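from dataclasses import dataclass

import polars as pl


@dataclass
class ValidationError:
    # Minimal stand-in so the example runs; the project defines its own error type
    field: str
    message: str


def validate_schema(
    df: pl.LazyFrame,
    expected_schema: dict[str, pl.DataType],
) -> list[ValidationError]:
    """Validate a LazyFrame's schema against the expected column types."""
    errors = []
    # collect_schema() resolves the schema without materialising any data
    schema = df.collect_schema()

    for column, expected_type in expected_schema.items():
        if column not in schema:
            errors.append(ValidationError(
                field=column,
                message=f"Missing required column: {column}",
            ))
        elif schema[column] != expected_type:
            errors.append(ValidationError(
                field=column,
                message=f"Type mismatch: expected {expected_type}, got {schema[column]}",
            ))

    return errors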

Business Rule Validation

def validate_exposure(exposure: dict) -> list[ValidationError]:
    """Validate exposure against business rules."""
    errors = []

    # EAD must be positive
    if exposure["ead"] <= 0:
        errors.append(ValidationError(
            field="ead",
            message="EAD must be positive"
        ))

    # PD must be in valid range (PD is IRB-only, so skip the check when absent)
    pd_value = exposure.get("pd")
    if pd_value is not None and not (0 <= pd_value <= 1):
        errors.append(ValidationError(
            field="pd",
            message="PD must be between 0 and 1"
        ))

    return errors

Output Data

Result Schema

CALCULATION_OUTPUT_SCHEMA = {
    "exposure_reference": pl.String,
    "counterparty_reference": pl.String,
    "exposure_class": pl.String,
    "approach_applied": pl.String,
    "final_ead": pl.Float64,
    "sa_final_risk_weight": pl.Float64,
    "irb_risk_weight": pl.Float64,
    "final_rwa": pl.Float64,
    "irb_expected_loss": pl.Float64,  # IRB only
    "rwa_before_floor": pl.Float64,   # Basel 3.1
    "floor_impact": pl.Float64,       # Basel 3.1
}

Export Formats

# Export to various formats
result.to_parquet("results.parquet")
result.to_csv("results.csv")
result.to_json("results.json")

# Get as DataFrame
df = result.to_dataframe()

Data Lineage

Each calculator produces an audit trail LazyFrame alongside results. The audit trail captures the full calculation reasoning per exposure, including classification decisions, CRM adjustments, and formula parameters. Access audit data via the result bundles:

# Access audit trails from result bundles
result = pipeline.run_with_data(raw_data, config)

# SA audit
sa_audit = result.sa_result.calculation_audit

# IRB audit
irb_audit = result.irb_result.calculation_audit

# Materialize results with full detail
df = result.to_dataframe()

Next Steps