Data Flow¶
This document describes how data flows through the RWA calculator, including schemas, transformations, and validation points.
Data Flow Overview¶
```mermaid
flowchart TD
    subgraph External
        A[Parquet Files]
        B[CSV Files]
    end
    subgraph Loading
        C[Schema Validation]
        D[RawDataBundle]
    end
    subgraph Processing
        E[Hierarchy Resolution]
        F[Classification]
        G[CRM Application]
    end
    subgraph Calculation
        H[SA/IRB/Slotting/Equity]
    end
    subgraph Output
        I[Aggregation]
        J[Results]
    end
    A & B --> C --> D
    D --> E --> F --> G --> H --> I --> J
```
Input Data¶
Required Files¶
| File | Description | Required |
|---|---|---|
| `counterparties.parquet` | Counterparty master data | Yes |
| `facilities.parquet` | Credit facilities | Yes |
| `loans.parquet` | Individual loans/draws | Yes |
Optional Files¶
| File | Description |
|---|---|
| `contingents.parquet` | Off-balance sheet items |
| `collateral.parquet` | Collateral details |
| `guarantees.parquet` | Guarantee information |
| `provisions.parquet` | Provision allocations |
| `ratings.parquet` | Credit ratings |
| `org_mapping.parquet` | Organization hierarchy |
| `lending_mapping.parquet` | Retail lending groups |
Schema Definitions¶
Schemas are defined in `data/schemas.py`. The key column names use `_reference` suffixes (e.g., `counterparty_reference`, `facility_reference`) rather than `_id`.
Counterparty Schema¶
```python
COUNTERPARTY_SCHEMA = {
    "counterparty_reference": pl.String,  # Unique identifier
    "counterparty_name": pl.String,  # Legal name
    "entity_type": pl.String,  # Single source of truth: sovereign, institution, corporate, etc.
    "country_code": pl.String,  # ISO country code
    "annual_revenue": pl.Float64,  # For SME classification (EUR 50m threshold)
    "total_assets": pl.Float64,  # For large FSE threshold (EUR 70bn)
    "default_status": pl.Boolean,  # Default indicator
    "sector_code": pl.String,  # Based on SIC
    "apply_fi_scalar": pl.Boolean,  # User flag: True = apply 1.25x FI correlation scalar
    "is_managed_as_retail": pl.Boolean,  # SME managed on pooled retail basis - 75% RW
    "scra_grade": pl.String,  # SCRA grade for unrated institutions: A, B, C
    "is_investment_grade": pl.Boolean,  # Publicly traded + investment grade → 65% SA RW
}
```
Facility Schema¶
```python
FACILITY_SCHEMA = {
    "facility_reference": pl.String,  # Unique identifier
    "product_type": pl.String,  # Product classification
    "book_code": pl.String,  # Book identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "value_date": pl.Date,  # Facility start
    "maturity_date": pl.Date,  # Final maturity
    "currency": pl.String,  # ISO currency code
    "limit": pl.Float64,  # Total commitment
    "committed": pl.Boolean,  # Committed flag
    "lgd": pl.Float64,  # A-IRB modelled LGD
    "beel": pl.Float64,  # Best estimate expected loss
    "is_revolving": pl.Boolean,  # Revolving facility flag
    "seniority": pl.String,  # senior/subordinated - affects F-IRB LGD
    "risk_type": pl.String,  # FR/MR/MLR/LR - determines CCF
    "ccf_modelled": pl.Float64,  # A-IRB modelled CCF (0.0-1.5)
    "is_short_term_trade_lc": pl.Boolean,  # 20% CCF under F-IRB (Art. 166(9))
    "is_buy_to_let": pl.Boolean,  # BTL - excluded from SME supporting factor
    "is_qrre_transactor": pl.Boolean,  # QRRE transactor flag (CRE30.55)
}
```
Loan Schema¶
```python
LOAN_SCHEMA = {
    "loan_reference": pl.String,  # Unique identifier
    "product_type": pl.String,  # Product classification
    "book_code": pl.String,  # Book identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "value_date": pl.Date,  # Loan start
    "maturity_date": pl.Date,  # Final maturity
    "currency": pl.String,  # ISO currency code
    "drawn_amount": pl.Float64,  # Outstanding principal
    "interest": pl.Float64,  # Accrued interest (adds to EAD)
    "lgd": pl.Float64,  # A-IRB modelled LGD
    "beel": pl.Float64,  # Best estimate expected loss
    "seniority": pl.String,  # senior/subordinated
    "is_buy_to_let": pl.Boolean,  # BTL property lending
    "has_netting_agreement": pl.Boolean,  # Netting agreement flag
    "netting_facility_reference": pl.String,  # Netting facility reference
}
```
Rating Schema¶
```python
RATINGS_SCHEMA = {
    "rating_reference": pl.String,  # Unique identifier
    "counterparty_reference": pl.String,  # Link to counterparty
    "rating_type": pl.String,  # internal/external
    "rating_agency": pl.String,  # internal, S&P, Moodys, Fitch, DBRS, etc.
    "rating_value": pl.String,  # AAA, AA+, Aa1, etc.
    "cqs": pl.Int8,  # Credit Quality Step 1-6
    "pd": pl.Float64,  # Probability of Default (for internal ratings)
    "rating_date": pl.Date,  # Rating as-of date
    "is_solicited": pl.Boolean,  # Whether rating was solicited
    "model_id": pl.String,  # IRB model identifier
}
```
Collateral Schema¶
```python
COLLATERAL_SCHEMA = {
    "collateral_reference": pl.String,  # Unique identifier
    "collateral_type": pl.String,  # cash, gold, equity, bond, real_estate, etc.
    "currency": pl.String,  # ISO currency code
    "maturity_date": pl.Date,  # Collateral maturity
    "market_value": pl.Float64,  # Current market value
    "nominal_value": pl.Float64,  # Nominal value
    "pledge_percentage": pl.Float64,  # Fraction of beneficiary EAD (0.5 = 50%)
    "beneficiary_type": pl.String,  # counterparty/loan/facility/contingent
    "beneficiary_reference": pl.String,  # Reference to linked entity
    "issuer_cqs": pl.Int8,  # CQS of issuer (1-6) for haircut lookup
    "issuer_type": pl.String,  # sovereign/pse/corporate/securitisation
    "residual_maturity_years": pl.Float64,  # For haircut bands
    "property_type": pl.String,  # residential/commercial (RE collateral)
    "property_ltv": pl.Float64,  # Loan-to-value ratio
    "is_income_producing": pl.Boolean,  # Material income dependence
    "is_adc": pl.Boolean,  # Acquisition/Development/Construction
    "is_eligible_financial_collateral": pl.Boolean,  # SA eligibility (CRR Art 197)
    "is_eligible_irb_collateral": pl.Boolean,  # IRB eligibility (CRR Art 199)
    "valuation_date": pl.Date,  # Date of last valuation
    "valuation_type": pl.String,  # market, indexed, independent
    "is_presold": pl.Boolean,  # ADC pre-sold to qualifying buyer
}
```
Data Transformations¶
Stage 1: Loading¶
Input: Raw files
Output: RawDataBundle
Transformations:

- Load files as LazyFrames
- Validate against schemas
- Convert data types
- Add metadata columns
```python
# Example transformation
from datetime import datetime

import polars as pl

counterparties = (
    pl.scan_parquet(path / "counterparties.parquet")
    .with_columns(
        _load_timestamp=pl.lit(datetime.now()),
        _source_file=pl.lit("counterparties.parquet"),
    )
)
```
Stage 2: Hierarchy Resolution¶
Input: RawDataBundle
Output: ResolvedHierarchyBundle
See `hierarchy.py` for implementation.

Transformations:

- Resolve parent-child relationships
- Calculate aggregate exposures
- Inherit ratings
- Resolve lending groups
```python
# Hierarchy resolution adds columns
resolved = (
    exposures
    .with_columns(
        ultimate_parent_id=...,
        group_total_exposure=...,
        inherited_rating=...,
        inherited_cqs=...,
    )
)
```
Hierarchy Resolution (`hierarchy.py`)

```python
# Enrich counterparties with hierarchy info
enriched_counterparties = self._enrich_counterparties_with_hierarchy(
    counterparties,
    org_mappings,
    ratings,
    ultimate_parents,
    rating_info,
)
return CounterpartyLookup(
    counterparties=enriched_counterparties,
    parent_mappings=org_mappings.select(
        [
            "child_counterparty_reference",
            "parent_counterparty_reference",
        ]
    ),
    ultimate_parent_mappings=ultimate_parents,
    rating_inheritance=rating_info,
), errors

def _build_ultimate_parent_lazy(
    self,
    org_mappings: pl.LazyFrame,
    max_depth: int = 10,
) -> pl.LazyFrame:
    """
    Build ultimate parent mapping using eager graph traversal.

    Collects the small edge data eagerly, resolves the full graph via dict
    traversal, and returns the result as a LazyFrame for downstream joins.

    Returns LazyFrame with columns:
    - counterparty_reference: The entity
    - ultimate_parent_reference: Its ultimate parent
    - hierarchy_depth: Number of levels traversed
    """
    edges = (
        org_mappings.select(
            [
                "child_counterparty_reference",
                "parent_counterparty_reference",
            ]
        )
        .unique()
        .collect()
    )
    resolved = _resolve_graph_eager(
        edges,
        child_col="child_counterparty_reference",
        parent_col="parent_counterparty_reference",
        max_depth=max_depth,
    )
    return resolved.rename(
        {
            "entity": "counterparty_reference",
            "root": "ultimate_parent_reference",
            "depth": "hierarchy_depth",
        }
    ).lazy()

def _build_rating_inheritance_lazy(
    self,
    counterparties: pl.LazyFrame,
    ratings: pl.LazyFrame,
    ultimate_parents: pl.LazyFrame,
) -> pl.LazyFrame:
    ...  # docstring and body truncated in this excerpt
```
Stage 3: Classification¶
Input: ResolvedHierarchyBundle
Output: ClassifiedExposuresBundle
See `classifier.py` for implementation.

Transformations:

- Assign exposure class
- Determine calculation approach
- Calculate CCFs
- Calculate EAD
```python
# Classification adds columns
classified = (
    resolved
    .with_columns(
        exposure_class=...,
        approach_type=...,
        ccf=...,
        ead=pl.col("drawn_amount") + pl.col("undrawn_amount") * pl.col("ccf"),
    )
)
```
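The EAD expression above can be checked with a small worked example (illustrative numbers, not source data):

```python
# EAD for a partially drawn facility, invented figures
drawn_amount = 80.0
limit = 100.0
undrawn_amount = limit - drawn_amount  # 20.0
ccf = 0.4  # e.g. a medium-risk credit conversion factor

# EAD = drawn + undrawn * CCF
ead = drawn_amount + undrawn_amount * ccf  # 88.0
```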
Classification Logic (`classifier.py`)

```python
irb_exposures = classified.filter(
    pl.col("approach").is_in([ApproachType.FIRB.value, ApproachType.AIRB.value])
)
slotting_exposures = classified.filter(pl.col("approach") == ApproachType.SLOTTING.value)
classification_audit = self._build_audit_trail(classified)
return ClassifiedExposuresBundle(
    all_exposures=classified,
    sa_exposures=sa_exposures,
    irb_exposures=irb_exposures,
    slotting_exposures=slotting_exposures,
    equity_exposures=data.equity_exposures,
    ciu_holdings=data.ciu_holdings,
    collateral=data.collateral,
    guarantees=data.guarantees,
    provisions=data.provisions,
    counterparty_lookup=data.counterparty_lookup,
    classification_audit=classification_audit,
    classification_errors=[],
)

# =========================================================================
# Phase 1: Counterparty join (retained unchanged)
# =========================================================================
def _add_counterparty_attributes(
    self,
    exposures: pl.LazyFrame,
    counterparties: pl.LazyFrame,
) -> pl.LazyFrame:
    """
    Add counterparty attributes needed for classification.

    Joins exposures with counterparty data to get:
    - entity_type (single source of truth for exposure class)
    - annual_revenue (for SME check)
    - total_assets (for large financial sector entity threshold)
    - default_status
    - country_code
    - apply_fi_scalar (for FI scalar - LFSE/unregulated FSE)
    - is_managed_as_retail (for SME retail treatment)
    """
    cp_schema = counterparties.collect_schema()
    cp_col_names = cp_schema.names()
    select_cols = [
        pl.col("counterparty_reference"),
        pl.col("entity_type").str.to_lowercase().alias("cp_entity_type"),
        pl.col("country_code").alias("cp_country_code"),
        pl.col("annual_revenue").alias("cp_annual_revenue"),
        pl.col("total_assets").alias("cp_total_assets"),
        # ... (excerpt truncated)
    ]
```
Stage 4: CRM Processing¶
Input: ClassifiedExposuresBundle
Output: CRMAdjustedBundle
See `crm/processor.py` for implementation.

Transformations (Art. 111(2) compliant order):

1. Resolve provisions — drawn-first deduction (SA), tracking only (IRB/Slotting)
2. Apply CCFs — uses `nominal_after_provision` for off-balance sheet conversion
3. Initialize EAD — set `ead_pre_crm` from drawn + interest + CCF contribution
4. Apply collateral — haircuts, overcollateralisation, multi-level allocation
5. Apply guarantees — substitution, cross-approach CCF substitution
6. Finalize EAD — floor at zero, no further provision subtraction
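Step 1's drawn-first deduction can be illustrated numerically. This is a sketch with invented figures; the processor's actual column logic may differ:

```python
# Drawn-first provision deduction (SA), invented figures
drawn = 80.0
nominal = 40.0    # off-balance sheet nominal
provision = 90.0  # exceeds the drawn amount, so it spills over

# Deduct from drawn first, then from the off-balance nominal
provision_on_drawn = min(provision, drawn)
provision_on_nominal = min(provision - provision_on_drawn, nominal)
drawn_after_provision = drawn - provision_on_drawn        # 0.0
nominal_after_provision = nominal - provision_on_nominal  # 30.0
```

The CCF in step 2 is then applied to `nominal_after_provision` (30.0 here), not the original nominal.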
```python
# CRM processing adds columns
crm_adjusted = (
    classified
    .with_columns(
        # Provision resolution (before CCF)
        provision_allocated=...,
        provision_on_drawn=...,
        provision_on_nominal=...,
        nominal_after_provision=...,
        provision_deducted=...,
        # Collateral
        collateral_value=...,
        collateral_haircut=...,
        # Guarantees
        guaranteed_amount=...,
        guarantor_rw=...,
        # Final
        net_ead=...,
    )
)
```
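The collateral step can be sketched with the comprehensive-method haircut formula (a simplified, single-collateral illustration in the style of CRR Art. 223; figures and haircut values are invented):

```python
# Financial collateral comprehensive method, single collateral (sketch)
ead = 100.0
collateral_mv = 60.0
h_c = 0.08   # collateral haircut, e.g. main-index equity
h_fx = 0.08  # currency-mismatch haircut
h_e = 0.0    # exposure haircut (zero for on-balance-sheet loans)

# Haircut the collateral down, scale the exposure up, floor at zero
collateral_adjusted = collateral_mv * (1 - h_c - h_fx)     # ≈ 50.4
net_ead = max(0.0, ead * (1 + h_e) - collateral_adjusted)  # ≈ 49.6
```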
CRM Processing (`processor.py`)

```python
# (excerpt — construction of the direct and facility lookups truncated)
        "_ben_ref_facility": pl.String,
        "_ead_facility": pl.Float64,
        "_currency_facility": pl.String,
        "_maturity_facility": pl.Date,
    }
)
# Counterparty: aggregated per counterparty_reference
cp_lookup = (
    exposures.group_by("counterparty_reference")
    .agg(
        [
            pl.col("ead_gross").sum().alias("_ead_cp"),
            pl.col("currency").first().alias("_currency_cp"),
            pl.col("maturity_date").first().alias("_maturity_cp"),
        ]
    )
    .rename({"counterparty_reference": "_ben_ref_cp"})
)
return direct_lookup, facility_lookup, cp_lookup

def _join_collateral_to_lookups(
    collateral: pl.LazyFrame,
    direct_lookup: pl.LazyFrame,
    facility_lookup: pl.LazyFrame,
    cp_lookup: pl.LazyFrame,
) -> pl.LazyFrame:
    """
    Join all lookup columns (EAD, currency, maturity) onto collateral in one pass.

    Replaces the separate _resolve_pledge_percentages and join_exposure_currency
    join passes — each used 3 left joins, so 6 total. Now we do 3 joins total,
    halving the plan size from lookup subtree duplication.

    When beneficiary_type is absent, falls back to a single direct join.
    """
    from rwa_calc.engine.crm.constants import DIRECT_BENEFICIARY_TYPES

    coll_schema = collateral.collect_schema()
    if "beneficiary_type" not in coll_schema.names():
        # Direct-only join — single pass with all columns
        return collateral.join(
            direct_lookup.select(
                pl.col("_ben_ref_direct"),
                pl.col("_ead_direct").alias("_beneficiary_ead"),
                pl.col("_currency_direct").alias("exposure_currency"),
                pl.col("_maturity_direct").alias("exposure_maturity"),
            ),
            left_on="beneficiary_reference",
            right_on="_ben_ref_direct",
            how="left",
        )

    bt_lower = pl.col("beneficiary_type").str.to_lowercase()
    # 3 left joins — each adds level-specific EAD, currency, maturity columns
    collateral = (
        collateral.join(
            direct_lookup,
            left_on="beneficiary_reference",
            right_on="_ben_ref_direct",
            how="left",
        )
        .join(
            facility_lookup,
            left_on="beneficiary_reference",
            right_on="_ben_ref_facility",
            how="left",
        )
        .join(
            cp_lookup,
            # ... (excerpt truncated)
        )
```
Stage 5: RWA Calculation¶
Input: CRMAdjustedBundle
Output: Result bundles
See `sa/calculator.py` and `irb/formulas.py` for implementations.

Transformations:

- Look up risk weights (SA)
- Calculate K formula (IRB)
- Apply maturity adjustment
- Calculate RWA
```python
# SA calculation
sa_result = (
    sa_exposures
    .with_columns(
        risk_weight=...,
        supporting_factor=...,
        rwa=pl.col("ead") * pl.col("risk_weight") * pl.col("supporting_factor"),
    )
)

# IRB calculation
irb_result = (
    irb_exposures
    .with_columns(
        pd_floored=...,
        lgd_floored=...,
        correlation=...,
        k=...,
        maturity_adjustment=...,
        rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("maturity_adjustment") * scaling,
        expected_loss=pl.col("pd") * pl.col("lgd") * pl.col("ead"),
    )
)
```
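The K formula referenced above is the standard supervisory corporate formula. The sketch below uses only the standard library (`statistics.NormalDist` in place of the library's Polars expressions) and omits the PD/LGD floors and the `scaling` factor shown in the snippet; `corporate_k` is an illustrative name, not the module's API:

```python
from math import exp, log, sqrt
from statistics import NormalDist

N = NormalDist()  # standard normal: cdf and inverse cdf

def corporate_k(pd_: float, lgd: float, maturity: float) -> float:
    """Capital requirement K per unit of EAD (corporate ASRF formula)."""
    # Asset correlation R interpolates between 0.12 and 0.24 by PD
    w = (1 - exp(-50 * pd_)) / (1 - exp(-50))
    r = 0.12 * w + 0.24 * (1 - w)
    # Conditional expected loss at 99.9% minus expected loss
    k = lgd * N.cdf((N.inv_cdf(pd_) + sqrt(r) * N.inv_cdf(0.999)) / sqrt(1 - r)) - pd_ * lgd
    # Maturity adjustment
    b = (0.11852 - 0.05478 * log(pd_)) ** 2
    return k * (1 + (maturity - 2.5) * b) / (1 - 1.5 * b)

# PD 1%, LGD 45%, M 2.5y gives a risk weight (K * 12.5) of roughly 92%
rwa = corporate_k(0.01, 0.45, 2.5) * 12.5 * 1_000_000  # EAD of 1m
```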
IRB Formula Application (`formulas.py`)

```python
# (excerpt — inside the PD floor expression builder)
# Optimisation: if all floors are the same (CRR case), return a scalar
all_values = {
    floors.corporate,
    floors.corporate_sme,
    floors.retail_mortgage,
    floors.retail_other,
    floors.retail_qrre_transactor,
    floors.retail_qrre_revolver,
}
if len(all_values) == 1:
    return pl.lit(float(all_values.pop()))

# Basel 3.1: differentiated floors by exposure class
exp_class = pl.col("exposure_class").cast(pl.String).fill_null("CORPORATE").str.to_uppercase()

# QRRE transactor/revolver distinction (CRE30.55):
# Transactors (repay in full each period) get 0.03% floor;
# revolvers (carry balance) get 0.10% floor.
if has_transactor_col:
    qrre_floor = (
        pl.when(pl.col("is_qrre_transactor").fill_null(False))
        .then(pl.lit(float(floors.retail_qrre_transactor)))
        .otherwise(pl.lit(float(floors.retail_qrre_revolver)))
    )
else:
    # Conservative default: revolver floor (0.10% under Basel 3.1)
    qrre_floor = pl.lit(float(floors.retail_qrre_revolver))

return (
    pl.when(exp_class.str.contains("QRRE"))
    .then(qrre_floor)
    .when(exp_class.str.contains("MORTGAGE") | exp_class.str.contains("RESIDENTIAL"))
    .then(pl.lit(float(floors.retail_mortgage)))
    .when(exp_class.str.contains("RETAIL"))
    .then(pl.lit(float(floors.retail_other)))
    .when(exp_class == "CORPORATE_SME")
    .then(pl.lit(float(floors.corporate_sme)))
    .otherwise(pl.lit(float(floors.corporate)))
)

def _lgd_floor_expression(
    config: CalculationConfig,
    *,
    has_seniority: bool = False,
) -> pl.Expr:
    """
    Build Polars expression for LGD floor (no collateral_type column).

    Under CRR: No LGD floors (returns 0.0).

    Under Basel 3.1 (CRE30.41): Differentiated floors for A-IRB:
    - Unsecured (senior): 25%
    - Unsecured (subordinated): 50%
    - Financial collateral: 0%
    - Receivables: 10%
    - CRE: 10%, RRE: 5%
    - Other physical: 15%

    Without a collateral_type column, defaults to unsecured floor (25%/50%).
    When has_seniority=True, checks seniority column for subordinated (50%).

    Returns a Polars expression evaluating to the per-row LGD floor value.
    """
    if config.is_crr:
        return pl.lit(0.0)

    floors = config.lgd_floors
    if has_seniority:
        is_subordinated = (
            pl.col("seniority").fill_null("senior").str.to_lowercase().str.contains("sub")
        )
        return (
            pl.when(is_subordinated)
            .then(pl.lit(float(floors.subordinated_unsecured)))
            .otherwise(pl.lit(float(floors.unsecured)))
        )

    # Default to unsecured floor (25%) — most conservative for senior
    return pl.lit(float(floors.unsecured))

def _lgd_floor_expression_with_collateral(
    config: CalculationConfig,
    *,
    has_seniority: bool = False,
) -> pl.Expr:
    """
    Build Polars expression for per-collateral-type LGD floor when collateral_type
    column is available.

    This is used when the dataframe has a collateral_type column, allowing
    precise per-row LGD floors based on the primary collateral type.

    When has_seniority=True, subordinated unsecured exposures get the higher
    50% floor instead of 25% (CRE30.41).
    """
    if config.is_crr:
        return pl.lit(0.0)

    floors = config.lgd_floors
    coll = pl.col("collateral_type").fill_null("unsecured").str.to_lowercase()
    # Determine unsecured floor: 50% for subordinated, 25% for senior (CRE30.41)
    # ... (excerpt truncated)
```
Stage 6: Aggregation¶
Input: Result bundles
Output: AggregatedResultBundle
See `aggregator.py` for implementation.

Transformations:

- Combine results from all approaches
- Apply output floor (Basel 3.1)
- Calculate totals and breakdowns
```python
# Aggregation
aggregated = (
    pl.concat([sa_result, irb_result, slotting_result])
    .with_columns(
        # Basel 3.1 output floor
        rwa_floored=pl.when(framework == "BASEL_3_1")
        .then(pl.max_horizontal(pl.col("rwa"), pl.col("sa_equivalent_rwa") * floor))
        .otherwise(pl.col("rwa"))
    )
)
```
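The output floor logic can be checked with a worked example (illustrative figures; 72.5% is the fully phased-in Basel 3.1 floor level):

```python
# Basel 3.1 output floor, invented figures
floor = 0.725  # fully phased-in floor
internal_rwa = 60.0        # modelled (IRB) RWA
sa_equivalent_rwa = 100.0  # RWA with everything on the standardised approach

# Floored RWA is the higher of modelled RWA and floor * SA-equivalent RWA
rwa_floored = max(internal_rwa, sa_equivalent_rwa * floor)  # ≈ 72.5
floor_impact = rwa_floored - internal_rwa                   # ≈ 12.5
```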
Data Validation¶
Schema Validation¶
```python
def validate_schema(
    df: pl.LazyFrame,
    expected_schema: dict[str, pl.DataType],
) -> list[ValidationError]:
    """Validate a LazyFrame against the expected schema."""
    errors = []
    actual_schema = df.collect_schema()
    for column, expected_type in expected_schema.items():
        if column not in actual_schema:
            errors.append(ValidationError(
                field=column,
                message=f"Missing required column: {column}",
            ))
        elif actual_schema[column] != expected_type:
            errors.append(ValidationError(
                field=column,
                message=f"Type mismatch: expected {expected_type}, got {actual_schema[column]}",
            ))
    return errors
```
Business Rule Validation¶
```python
def validate_exposure(exposure: dict) -> list[ValidationError]:
    """Validate exposure against business rules."""
    errors = []

    # EAD must be positive
    if exposure["ead"] <= 0:
        errors.append(ValidationError(
            field="ead",
            message="EAD must be positive",
        ))

    # PD must be in valid range
    if not (0 <= exposure["pd"] <= 1):
        errors.append(ValidationError(
            field="pd",
            message="PD must be between 0 and 1",
        ))

    return errors
```
Output Data¶
Result Schema¶
```python
CALCULATION_OUTPUT_SCHEMA = {
    "exposure_reference": pl.String,
    "counterparty_reference": pl.String,
    "exposure_class": pl.String,
    "approach_applied": pl.String,
    "final_ead": pl.Float64,
    "sa_final_risk_weight": pl.Float64,
    "irb_risk_weight": pl.Float64,
    "final_rwa": pl.Float64,
    "irb_expected_loss": pl.Float64,  # IRB only
    "rwa_before_floor": pl.Float64,  # Basel 3.1
    "floor_impact": pl.Float64,  # Basel 3.1
}
```
Export Formats¶
```python
# Export to various formats
result.to_parquet("results.parquet")
result.to_csv("results.csv")
result.to_json("results.json")

# Get as DataFrame
df = result.to_dataframe()
```
Data Lineage¶
Each calculator produces an audit trail LazyFrame alongside results. The audit trail captures the full calculation reasoning per exposure, including classification decisions, CRM adjustments, and formula parameters. Access audit data via the result bundles:
```python
# Access audit trails from result bundles
result = pipeline.run_with_data(raw_data, config)

# SA audit
sa_audit = result.sa_result.calculation_audit

# IRB audit
irb_audit = result.irb_result.calculation_audit

# Materialize results with full detail
df = result.to_dataframe()
```
Next Steps¶
- Component Overview - Individual components
- API Reference - API documentation
- Data Model - Complete schema reference