Pipeline Architecture¶
The RWA calculator processes exposures through a well-defined pipeline with discrete stages. This document details each stage and how they interact.
Pipeline Overview¶
flowchart TD
subgraph Input
A[Raw Data Files]
end
subgraph Stage1[Stage 1: Data Loading]
B[Loader]
B1[RawDataBundle]
end
subgraph Stage2[Stage 2: Hierarchy Resolution]
C[Hierarchy Resolver]
C1[ResolvedHierarchyBundle]
end
subgraph Stage3[Stage 3: Classification]
D[Classifier]
D1[ClassifiedExposuresBundle]
end
subgraph Stage4[Stage 4: CRM Processing]
E[CRM Processor]
E1[CRMAdjustedBundle]
end
subgraph Stage5[Stage 5: Split-Once + Parallel Calculate]
M["Materialise barrier<br/>(collect → lazy)"]
U["SA calculate_unified<br/>(Basel 3.1 floor only)"]
SP[Split once by approach]
F["SA branch<br/>calculate_branch()"]
G["IRB branch<br/>calculate_branch()"]
H["Slotting branch<br/>calculate_branch()"]
CA["pl.collect_all()<br/>(parallel collection)"]
end
subgraph Equity[Equity — Separate Path]
L[Equity Calculator]
L1[EquityResultBundle]
end
subgraph Stage6[Stage 6: Aggregation]
I["_aggregate_single_pass()"]
I1[AggregatedResultBundle]
end
A --> B --> B1
B1 --> C --> C1
C1 --> D --> D1
D1 --> E --> E1
E1 --> M --> U --> SP
SP --> F & G & H
F & G & H --> CA
E1 --> L --> L1
CA & L1 --> I --> I1
Pipeline Orchestration¶
The pipeline is orchestrated by PipelineOrchestrator (see pipeline.py:80-594):
from rwa_calc.engine.pipeline import create_pipeline
# Create pipeline with default components
pipeline = create_pipeline()
# Run with configuration
result = pipeline.run(config)
# Or with pre-loaded data
result = pipeline.run_with_data(raw_data, config)
Pipeline Implementation¶
For full API documentation, see Pipeline API Reference.
Full Implementation (pipeline.py)
@dataclass
class PipelineError:
    """Structured error recorded by a pipeline stage."""

    stage: str
    error_type: str
    message: str
    context: dict = field(default_factory=dict)
# =============================================================================
# Pipeline Orchestrator Implementation
# =============================================================================
class PipelineOrchestrator:
"""
Orchestrate the complete RWA calculation pipeline.
Implements PipelineProtocol for:
- Full pipeline execution from data loading to final aggregation
- Pre-loaded data execution (bypassing loader)
- Component dependency management
- Error accumulation across stages
Pipeline stages:
1. Loader: Load raw data from files/databases
2. HierarchyResolver: Resolve counterparty and facility hierarchies
3. Classifier: Classify exposures and assign approaches
4. CRMProcessor: Apply credit risk mitigation
5. SACalculator: Calculate SA RWA
6. IRBCalculator: Calculate IRB RWA
7. SlottingCalculator: Calculate Slotting RWA
8. Aggregation: Combine results, apply floor, generate summaries
Usage:
orchestrator = PipelineOrchestrator(
loader=ParquetLoader(base_path),
hierarchy_resolver=HierarchyResolver(),
classifier=ExposureClassifier(),
crm_processor=CRMProcessor(),
sa_calculator=SACalculator(),
irb_calculator=IRBCalculator(),
slotting_calculator=SlottingCalculator(),
)
result = orchestrator.run(config)
"""
def __init__(
self,
loader: LoaderProtocol | None = None,
hierarchy_resolver: HierarchyResolverProtocol | None = None,
classifier: ClassifierProtocol | None = None,
crm_processor: CRMProcessorProtocol | None = None,
sa_calculator: SACalculatorProtocol | None = None,
irb_calculator: IRBCalculatorProtocol | None = None,
slotting_calculator: SlottingCalculatorProtocol | None = None,
equity_calculator: EquityCalculatorProtocol | None = None,
output_aggregator: OutputAggregatorProtocol | None = None,
) -> None:
"""
Initialize pipeline with components.
Components can be injected for testing or customization.
If not provided, defaults will be created on first use.
Args:
loader: Data loader (optional - required for run())
hierarchy_resolver: Hierarchy resolver
classifier: Exposure classifier
crm_processor: CRM processor
sa_calculator: SA calculator
irb_calculator: IRB calculator
slotting_calculator: Slotting calculator
equity_calculator: Equity calculator
output_aggregator: Output aggregator
"""
self._loader = loader
self._hierarchy_resolver = hierarchy_resolver
self._classifier = classifier
self._crm_processor = crm_processor
self._sa_calculator = sa_calculator
self._irb_calculator = irb_calculator
self._slotting_calculator = slotting_calculator
self._equity_calculator = equity_calculator
self._output_aggregator = output_aggregator
self._errors: list[PipelineError] = []
# =========================================================================
# Public API
# =========================================================================
def run(self, config: CalculationConfig) -> AggregatedResultBundle:
"""
Execute the complete RWA calculation pipeline.
Requires a loader to be configured.
Args:
config: Calculation configuration
Returns:
AggregatedResultBundle with all results and audit trail
Raises:
ValueError: If no loader is configured
"""
if self._loader is None:
raise ValueError("No loader configured. Use run_with_data() or provide a loader.")
# Reset errors for new run
self._errors = []
# Stage 1: Load data
try:
raw_data = self._loader.load()
except Exception as e:
self._errors.append(
PipelineError(
stage="loader",
error_type="load_error",
message=str(e),
)
)
return self._create_error_result()
return self.run_with_data(raw_data, config)
def run_with_data(
self,
data: RawDataBundle,
config: CalculationConfig,
) -> AggregatedResultBundle:
"""
Execute pipeline with pre-loaded data.
Uses single-pass architecture: all approach-specific calculators
run sequentially on one unified LazyFrame, avoiding plan tree
duplication and mid-pipeline materialisation.
Args:
data: Pre-loaded raw data bundle
config: Calculation configuration
Returns:
AggregatedResultBundle with all results and audit trail
"""
# Reset errors for new run
self._errors = []
try:
# Ensure components are initialized (config needed for framework-specific CRM)
self._ensure_components_initialized(config)
# Validate input data values
self._validate_input_data(data)
# IRB mode requires model_permissions data; without it, fall back to SA
if config.permission_mode == PermissionMode.IRB and data.model_permissions is None:
logger.warning(
"IRB permission mode selected but no model_permissions data provided. "
"All exposures will fall back to Standardised Approach."
)
config = dataclasses.replace(config, permission_mode=PermissionMode.STANDARDISED)
# Stage 2: Resolve hierarchies
resolved = self._run_hierarchy_resolver(data, config)
if resolved is None:
return self._create_error_result()
# Stage 3: Classify exposures
classified = self._run_classifier(resolved, config)
if classified is None:
return self._create_error_result()
# Stage 4: Apply CRM (unified — no fan-out split)
crm_adjusted = self._run_crm_processor_unified(classified, config)
if crm_adjusted is None:
return self._create_error_result()
# Stages 5-8: Single-pass calculation and aggregation
result = self._run_single_pass(crm_adjusted, config)
# Add pipeline errors to result
if self._errors:
all_errors = list(result.errors) + [
Stage 1: Data Loading¶
Purpose¶
Load raw data from Parquet/CSV files into LazyFrames.
Input¶
File paths to data files.
Output¶
RawDataBundle containing:
- counterparties: Counterparty master data
- facilities: Credit facility data
- loans: Individual loan/draw data
- contingents: Off-balance sheet items
- collateral: Collateral information
- guarantees: Guarantee data
- provisions: Provision allocations
- ratings: External and internal ratings
- org_mapping: Organization hierarchy
- lending_mapping: Retail lending groups
Implementation¶
class ParquetLoader:
    def __init__(self, base_path: Path) -> None:
        self._base_path = base_path

    def load(self) -> RawDataBundle:
        """Scan all source files lazily; nothing is read until collection."""
        return RawDataBundle(
            counterparties=pl.scan_parquet(self._base_path / "counterparties.parquet"),
            facilities=pl.scan_parquet(self._base_path / "facilities.parquet"),
            loans=pl.scan_parquet(self._base_path / "loans.parquet"),
            # ... other data sources
        )
Validation¶
- Schema validation against defined schemas
- Required field checks
- Data type validation
Stage 2: Hierarchy Resolution¶
Purpose¶
Resolve counterparty and facility hierarchies, inherit ratings, unify exposures, and prepare enriched data for classification.
Input¶
RawDataBundle
Output¶
ResolvedHierarchyBundle with:
- exposures: Unified LazyFrame (loans + contingents + facility undrawn)
- counterparty_lookup: Enriched counterparties with hierarchy metadata and inherited ratings
- collateral, guarantees, provisions: FX-converted CRM data
- lending_group_totals: Aggregated group exposure for retail threshold testing
- hierarchy_errors: Accumulated non-blocking errors
Processing Steps¶
Step 1: Build counterparty hierarchy lookup
├── _build_ultimate_parent_lazy() → traverse org_mappings (up to 10 levels)
├── _build_rating_inheritance_lazy() → inherit ratings (own → parent → unrated)
└── _enrich_counterparties_with_hierarchy() → add hierarchy metadata
Step 2: Unify exposures
├── Standardise loans → exposure_type = "loan"
├── Standardise contingents → exposure_type = "contingent"
├── _build_facility_root_lookup() → traverse facility-to-facility hierarchies
├── _calculate_facility_undrawn() → limit - sum(descendant drawn amounts)
└── pl.concat(all exposure types) → single unified LazyFrame
Step 2a: FX conversion → convert all monetary values to base currency
Step 2b: Add collateral LTV → direct → facility → counterparty priority
Step 3: Residential property coverage
└── Separate residential vs all-property collateral for threshold exclusion
Step 4: Lending group totals
└── Aggregate by group, exclude residential from retail threshold (CRR Art. 123(c))
Step 5: Add lending group totals to exposures
Counterparty Hierarchy¶
The hierarchy resolution collects the small edge data eagerly and resolves the parent graph via dict traversal, returning the result as a LazyFrame for downstream joins. See hierarchy.py:258-327 for the full implementation.
Build ultimate parent mapping using eager graph traversal.
Collects the small edge data eagerly, resolves the full graph via dict traversal, and returns the result as a LazyFrame for downstream joins.
Returns LazyFrame with columns:
- counterparty_reference: The entity
- ultimate_parent_reference: Its ultimate parent
- hierarchy_depth: Number of levels traversed
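The dict-traversal core of this step can be sketched as follows. This is a simplified, hypothetical stand-in for the in-repo `_resolve_graph_eager` helper, which additionally produces a DataFrame with `entity`/`root`/`depth` columns:

```python
def resolve_roots(edges: dict[str, str], max_depth: int = 10) -> dict[str, tuple[str, int]]:
    """Walk child -> parent links eagerly, mapping each entity to (root, depth).

    `edges` maps each child to its immediate parent. Traversal stops at an
    entity with no parent, or at max_depth (which also guards against cycles).
    """
    resolved: dict[str, tuple[str, int]] = {}
    for child in edges:
        current, depth = child, 0
        while current in edges and depth < max_depth:
            current = edges[current]
            depth += 1
        resolved[child] = (current, depth)
    return resolved
```

Because organisational hierarchies are tiny relative to the exposure data, doing this eagerly in Python is cheaper than repeated LazyFrame self-joins, and the result re-enters the lazy plan as a plain lookup join.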
Actual Implementation (hierarchy.py)
child_col="child_counterparty_reference",
parent_col="parent_counterparty_reference",
max_depth=max_depth,
)
return resolved.rename(
{
"entity": "counterparty_reference",
"root": "ultimate_parent_reference",
"depth": "hierarchy_depth",
}
).lazy()
def _build_rating_inheritance_lazy(
self,
counterparties: pl.LazyFrame,
ratings: pl.LazyFrame,
ultimate_parents: pl.LazyFrame,
) -> pl.LazyFrame:
"""
Build rating lookup with dual per-type resolution and inheritance.
Resolves the best internal and best external rating separately per
counterparty, then inherits each type independently from the ultimate
parent when the entity has no own rating of that type.
Returns LazyFrame with columns:
- counterparty_reference: The entity
- internal_pd: Best internal PD (own or inherited from parent)
- internal_model_id: Model ID for the internal rating
- external_cqs: Best external CQS (own or inherited from parent)
- cqs: Alias of external_cqs
- pd: Alias of internal_pd
"""
sort_cols = ["rating_date", "rating_reference"]
# Ensure model_id column exists on ratings (may be absent in legacy data)
if "model_id" not in ratings.collect_schema().names():
ratings = ratings.with_columns(pl.lit(None).cast(pl.String).alias("model_id"))
# Best internal rating per counterparty (no CQS — that's external only)
best_internal = (
ratings.filter(pl.col("rating_type") == "internal")
.sort(sort_cols, descending=[True, True])
.group_by("counterparty_reference")
.first()
.select(
[
pl.col("counterparty_reference").alias("_int_cp"),
pl.col("pd").alias("internal_pd"),
pl.col("model_id").alias("internal_model_id"),
]
)
)
# Best external rating per counterparty
best_external = (
ratings.filter(pl.col("rating_type") == "external")
.sort(sort_cols, descending=[True, True])
.group_by("counterparty_reference")
.first()
.select(
[
pl.col("counterparty_reference").alias("_ext_cp"),
pl.col("cqs").alias("external_cqs"),
]
)
)
# Materialise the per-counterparty best-rating aggregates before joining.
Rating Inheritance¶
Ratings are inherited from parent entities when not directly available. See hierarchy.py:329-431.
The inheritance priority is:
1. Entity's own rating
2. Ultimate parent's rating
3. Mark as unrated
Actual Implementation (hierarchy.py)
# Polars re-evaluates the filter→sort→group_by chain per reference.
best_int_df, best_ext_df = pl.collect_all([best_internal, best_external])
best_internal = best_int_df.lazy()
best_external = best_ext_df.lazy()
# Start with all counterparties, join own ratings per type
result = counterparties.select("counterparty_reference")
result = result.join(
best_internal, left_on="counterparty_reference", right_on="_int_cp", how="left"
)
result = result.join(
best_external, left_on="counterparty_reference", right_on="_ext_cp", how="left"
)
# Join with ultimate parents for inheritance
result = result.join(
ultimate_parents.select(
[
pl.col("counterparty_reference").alias("_cp"),
pl.col("ultimate_parent_reference"),
]
),
left_on="counterparty_reference",
right_on="_cp",
how="left",
)
# Parent's best internal
parent_internal = best_internal.select(
[
pl.col("_int_cp").alias("_p_int_cp"),
pl.col("internal_pd").alias("parent_internal_pd"),
pl.col("internal_model_id").alias("parent_internal_model_id"),
]
)
result = result.join(
parent_internal,
left_on="ultimate_parent_reference",
right_on="_p_int_cp",
how="left",
)
# Parent's best external
parent_external = best_external.select(
[
pl.col("_ext_cp").alias("_p_ext_cp"),
pl.col("external_cqs").alias("parent_external_cqs"),
]
)
result = result.join(
parent_external,
left_on="ultimate_parent_reference",
right_on="_p_ext_cp",
how="left",
)
# Per-type inheritance: coalesce own → parent for each type
result = result.with_columns(
[
pl.coalesce(pl.col("internal_pd"), pl.col("parent_internal_pd")).alias(
"internal_pd"
),
pl.coalesce(pl.col("internal_model_id"), pl.col("parent_internal_model_id")).alias(
"internal_model_id"
),
pl.coalesce(pl.col("external_cqs"), pl.col("parent_external_cqs")).alias(
"external_cqs"
),
]
)
# Derive convenience aliases
result = result.with_columns(
[
pl.col("external_cqs").alias("cqs"),
pl.col("internal_pd").alias("pd"),
]
)
return result.select(
[
"counterparty_reference",
"internal_pd",
"internal_model_id",
"external_cqs",
"cqs",
"pd",
]
)
def _build_facility_root_lookup(
self,
facility_mappings: pl.LazyFrame,
max_depth: int = 10,
) -> pl.LazyFrame:
"""
Build root facility lookup using eager graph traversal.
Collects the small facility edge data eagerly, resolves the full graph
via dict traversal, and returns the result as a LazyFrame.
Args:
facility_mappings: Facility mappings with parent_facility_reference,
Facility Hierarchy¶
Facilities can form multi-level hierarchies (e.g., master facility → sub-facilities). The resolver traverses these with the same eager graph-traversal pattern as counterparty hierarchies. See hierarchy.py:433-537.
Key behaviour:
- Facility-to-facility edges identified from facility_mappings where child_type = "facility"
- Traverses up to 10 levels to find the root facility for each sub-facility
- Drawn amounts from all descendant loans are aggregated to the root facility
- Sub-facilities are excluded from producing their own undrawn exposure records
- Only root/standalone facilities with undrawn_amount > 0 generate facility_undrawn exposures
Actual Implementation (hierarchy.py)
max_depth: Maximum hierarchy depth to traverse
Returns:
LazyFrame with columns:
- child_facility_reference: The sub-facility
- root_facility_reference: Its ultimate root facility
- facility_hierarchy_depth: Number of levels traversed
"""
empty_result = pl.LazyFrame(
schema={
"child_facility_reference": pl.String,
"root_facility_reference": pl.String,
"facility_hierarchy_depth": pl.Int32,
}
)
type_col = _detect_type_column(set(facility_mappings.collect_schema().names()))
if type_col is None:
return empty_result
# Filter to facility→facility relationships and collect (small data)
facility_edges = (
facility_mappings.filter(
pl.col(type_col).fill_null("").str.to_lowercase() == "facility"
)
.select(
[
pl.col("child_reference").alias("child_facility_reference"),
pl.col("parent_facility_reference"),
]
)
.unique()
.collect()
)
if facility_edges.height == 0:
return empty_result
resolved = _resolve_graph_eager(
facility_edges,
child_col="child_facility_reference",
parent_col="parent_facility_reference",
max_depth=max_depth,
)
return resolved.rename(
{
"entity": "child_facility_reference",
"root": "root_facility_reference",
"depth": "facility_hierarchy_depth",
}
).lazy()
def _enrich_counterparties_with_hierarchy(
self,
counterparties: pl.LazyFrame,
org_mappings: pl.LazyFrame,
ratings: pl.LazyFrame,
ultimate_parents: pl.LazyFrame,
rating_inheritance: pl.LazyFrame,
) -> pl.LazyFrame:
"""
Enrich counterparties with hierarchy and rating information.
Adds columns:
- counterparty_has_parent: bool
- parent_counterparty_reference: str | null
- ultimate_parent_reference: str | null
- counterparty_hierarchy_depth: int
- cqs, pd, internal_pd, external_cqs, internal_model_id: from ratings
"""
# Join with org_mappings to get parent
enriched = counterparties.join(
org_mappings.select(
[
pl.col("child_counterparty_reference"),
pl.col("parent_counterparty_reference"),
]
),
left_on="counterparty_reference",
right_on="child_counterparty_reference",
how="left",
)
# Join with ultimate parents and rating inheritance in sequence,
# then derive flags in a single with_columns batch.
enriched = (
enriched.join(
ultimate_parents.select(
[
pl.col("counterparty_reference").alias("_up_cp"),
pl.col("ultimate_parent_reference"),
pl.col("hierarchy_depth").alias("counterparty_hierarchy_depth"),
]
),
left_on="counterparty_reference",
right_on="_up_cp",
how="left",
)
.join(
rating_inheritance.select(
[
pl.col("counterparty_reference").alias("_ri_cp"),
pl.col("cqs"),
pl.col("pd"),
Stage 3: Classification¶
Purpose¶
Assign exposure classes and determine calculation approach.
Input¶
ResolvedHierarchyBundle
Output¶
ClassifiedExposuresBundle with:
- exposure_class: Regulatory exposure class
- approach: SA, F-IRB, A-IRB, or Slotting
- Grouped exposures by approach
Classification Logic¶
The classifier assigns exposure classes and calculation approaches. See classifier.py:195-244 for the core classification logic.
rwa_calc.engine.classifier.ExposureClassifier¶
Classify exposures by exposure class and approach.
Implements ClassifierProtocol for:
- Mapping counterparty types to exposure classes
- Checking SME criteria (turnover thresholds)
- Checking retail criteria (aggregate exposure thresholds)
- Determining IRB eligibility based on permissions
- Identifying specialised lending for slotting
- Splitting exposures by calculation approach
All operations use Polars LazyFrames for deferred execution. The classifier batches expressions into 4 .with_columns() calls to keep the query plan shallow (5 nodes instead of 21).
classify(data, config)¶
Classify exposures and split by approach.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `ResolvedHierarchyBundle` | Hierarchy-resolved data from HierarchyResolver | *required* |
| `config` | `CalculationConfig` | Calculation configuration | *required* |

Returns:

| Type | Description |
|---|---|
| `ClassifiedExposuresBundle` | ClassifiedExposuresBundle with exposures split by approach |
Classification Priority Order
1. Central Govt / Central Bank: Government entities, central banks
2. RGLA: Regional governments and local authorities
3. PSE: Public sector entities
4. MDB: Multilateral development banks
5. Institution: Banks, regulated financial institutions, CCPs
6. Retail: Individuals, small businesses meeting retail criteria
7. Corporate: Non-financial corporates
8. Specialised Lending: Project finance, object finance, etc.
Actual Implementation (classifier.py)
pl.col("approach").is_in([ApproachType.FIRB.value, ApproachType.AIRB.value])
)
slotting_exposures = classified.filter(pl.col("approach") == ApproachType.SLOTTING.value)
classification_audit = self._build_audit_trail(classified)
return ClassifiedExposuresBundle(
all_exposures=classified,
sa_exposures=sa_exposures,
irb_exposures=irb_exposures,
slotting_exposures=slotting_exposures,
equity_exposures=data.equity_exposures,
ciu_holdings=data.ciu_holdings,
collateral=data.collateral,
guarantees=data.guarantees,
provisions=data.provisions,
counterparty_lookup=data.counterparty_lookup,
classification_audit=classification_audit,
classification_errors=[],
)
# =========================================================================
# Phase 1: Counterparty join (retained unchanged)
# =========================================================================
def _add_counterparty_attributes(
self,
exposures: pl.LazyFrame,
counterparties: pl.LazyFrame,
) -> pl.LazyFrame:
"""
Add counterparty attributes needed for classification.
Joins exposures with counterparty data to get:
- entity_type (single source of truth for exposure class)
- annual_revenue (for SME check)
- total_assets (for large financial sector entity threshold)
- default_status
- country_code
- apply_fi_scalar (for FI scalar - LFSE/unregulated FSE)
- is_managed_as_retail (for SME retail treatment)
"""
cp_schema = counterparties.collect_schema()
cp_col_names = cp_schema.names()
select_cols = [
pl.col("counterparty_reference"),
pl.col("entity_type").str.to_lowercase().alias("cp_entity_type"),
pl.col("country_code").alias("cp_country_code"),
pl.col("annual_revenue").alias("cp_annual_revenue"),
pl.col("total_assets").alias("cp_total_assets"),
Stage 4: CRM Processing¶
Purpose¶
Apply credit risk mitigation: collateral, guarantees, provisions.
Input¶
ClassifiedExposuresBundle plus CRM data
Output¶
CRMAdjustedBundle with:
- Adjusted EAD values
- Applied haircuts
- Substituted risk weights (guarantees)
- Provision adjustments
Processing Order¶
Provisions are resolved before CCF application so that the nominal amount is adjusted before credit conversion. The full CRM pipeline order is:
1. Resolve provisions — drawn-first deduction (SA only); IRB tracks but does not deduct
2. Apply CCFs — uses `nominal_after_provision` for off-balance sheet conversion
3. Initialize EAD waterfall — set `ead_pre_crm` from drawn + interest + CCF contribution
4. Apply collateral with haircuts and overcollateralisation
5. Apply guarantees (substitution approach, cross-approach CCF substitution)
6. Finalize EAD — floor at zero; provisions already baked into `ead_pre_crm`
class CRMProcessor:
def get_crm_adjusted_bundle(
self,
data: ClassifiedExposuresBundle,
config: CalculationConfig,
) -> CRMAdjustedBundle:
"""Apply CRM in correct order (Art. 111(2) compliant).
Returns CRMAdjustedBundle with exposures split by approach."""
# Unpack the classified bundle (fields shown in the Stage 3 output)
exposures = data.all_exposures
provisions = data.provisions
collateral = data.collateral
guarantees = data.guarantees
counterparty_lookup = data.counterparty_lookup
# Step 1: Resolve provisions (before CCF)
# SA: drawn-first deduction, remainder reduces nominal
# IRB/Slotting: tracked but not deducted from EAD
after_provisions = self._resolve_provisions(exposures, provisions, config)
# Step 2: Apply CCFs (uses nominal_after_provision)
after_ccf = self._apply_ccf(after_provisions, config)
# Step 3: Initialize EAD waterfall — includes collect barrier
# (flattens deep lazy plan to prevent 3× re-evaluation downstream)
after_init = self._initialize_ead(after_ccf)
# Step 4: Collateral (3 lookup collects: direct/facility/counterparty)
after_collateral = self._apply_collateral(after_init, collateral, config)
# Step 5: Guarantees (cross-approach CCF substitution)
after_guarantees = self._apply_guarantees(
after_collateral, guarantees, counterparty_lookup, config
)
# Step 6: Finalize (no provision subtraction — already in ead_pre_crm)
return self._finalize_ead(after_guarantees)
def get_crm_unified_bundle(
self,
data: ClassifiedExposuresBundle,
config: CalculationConfig,
) -> CRMAdjustedBundle:
"""Unified CRM — does not split by approach.
Used for Basel 3.1 output floor: SA-equivalent RW needed on all rows."""
Stage 5: RWA Calculation¶
Purpose¶
Calculate RWA using appropriate approach for each exposure.
SA Calculator¶
class SACalculator:
def calculate(
self,
exposures: pl.LazyFrame,
config: CalculationConfig
) -> SAResultBundle:
"""Calculate SA RWA."""
result = (
exposures
.with_columns(
risk_weight=self._lookup_risk_weight(
pl.col("exposure_class"),
pl.col("cqs")
)
)
.with_columns(
rwa=pl.col("ead") * pl.col("risk_weight")
)
)
# Apply supporting factors (CRR only)
if config.framework == RegulatoryFramework.CRR:
result = self._apply_supporting_factors(result, config)
return SAResultBundle(data=result)
IRB Calculator¶
class IRBCalculator:
def calculate(
self,
exposures: pl.LazyFrame,
config: CalculationConfig
) -> IRBResultBundle:
"""Calculate IRB RWA."""
result = (
exposures
.with_columns(
# Apply PD floor
pd_floored=pl.max_horizontal(
pl.col("pd"),
pl.lit(config.pd_floors.get_floor(pl.col("exposure_class")))
)
)
.with_columns(
# Calculate correlation
correlation=self._calculate_correlation(
pl.col("exposure_class"),
pl.col("pd_floored"),
pl.col("turnover")
),
# Calculate maturity adjustment
ma=self._calculate_maturity_adjustment(
pl.col("pd_floored"),
pl.col("effective_maturity")
)
)
.with_columns(
# Calculate K
k=self._calculate_k(
pl.col("pd_floored"),
pl.col("lgd"),
pl.col("correlation")
)
)
.with_columns(
# Calculate RWA
rwa=pl.col("k") * 12.5 * pl.col("ead") * pl.col("ma") *
config.scaling_factor
)
)
return IRBResultBundle(data=result)
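For reference, `_calculate_k` corresponds to the Basel supervisory formula for the unexpected-loss capital requirement K, shown here as a scalar Python sketch (maturity adjustment and scaling are applied separately, as in the snippet above):

```python
from statistics import NormalDist

_N = NormalDist()  # standard normal: cdf is Phi, inv_cdf is Phi^-1


def capital_requirement_k(pd_: float, lgd: float, correlation: float) -> float:
    """K = LGD * Phi((Phi^-1(PD) + sqrt(R) * Phi^-1(0.999)) / sqrt(1 - R)) - LGD * PD."""
    conditional_pd = _N.cdf(
        (_N.inv_cdf(pd_) + correlation**0.5 * _N.inv_cdf(0.999))
        / (1.0 - correlation) ** 0.5
    )
    # Unexpected loss only: expected loss (LGD * PD) is subtracted out.
    return lgd * conditional_pd - lgd * pd_
```

RWA then follows as `k * 12.5 * ead * ma * scaling_factor`, matching the expression in the snippet above.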
Slotting Calculator¶
class SlottingCalculator:
def calculate(
self,
exposures: pl.LazyFrame,
config: CalculationConfig
) -> SlottingResultBundle:
"""Calculate Slotting RWA."""
result = (
exposures
.with_columns(
risk_weight=self._lookup_slotting_weight(
pl.col("lending_type"),
pl.col("slotting_category"),
config.framework
)
)
.with_columns(
rwa=pl.col("ead") * pl.col("risk_weight")
)
)
return SlottingResultBundle(data=result)
Stage 6: Aggregation¶
Purpose¶
Combine results, apply output floor, produce final output.
Input¶
Result bundles from all calculators
Output¶
AggregatedResultBundle with:
- Total RWA
- RWA by approach
- RWA by exposure class
- Floor impact (Basel 3.1)
- Detailed breakdown
Output Floor (Basel 3.1)¶
def apply_output_floor(
irb_rwa: pl.LazyFrame,
sa_rwa: pl.LazyFrame,
floor_percentage: float
) -> pl.LazyFrame:
"""Apply output floor to IRB results."""
return (
irb_rwa
.join(sa_rwa, on="exposure_reference", suffix="_sa")
.with_columns(
floor=pl.col("rwa_sa") * floor_percentage,
rwa_floored=pl.max_horizontal(
pl.col("rwa"),
pl.col("rwa_sa") * floor_percentage
)
)
)
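The floor arithmetic itself is a per-row maximum; a scalar sketch with illustrative numbers (the transitional Basel 3.1 floor percentage steps up to 72.5%):

```python
def floored_rwa(rwa_irb: float, rwa_sa: float, floor_percentage: float) -> float:
    """Final RWA is the higher of own-models RWA and the floored SA-equivalent RWA."""
    return max(rwa_irb, rwa_sa * floor_percentage)
```

For example, with IRB RWA of 60, SA-equivalent RWA of 100, and a 72.5% floor, the floor binds and the reported RWA is 72.5.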
Error Handling¶
Each stage accumulates errors:
@dataclass
class LazyFrameResult:
frame: pl.LazyFrame
errors: list[CalculationError] = field(default_factory=list)
# Pipeline accumulates across stages
all_errors = []
all_errors.extend(loader_result.errors)
all_errors.extend(hierarchy_result.errors)
# ... etc.
Next Steps¶
- Data Flow - Detailed data flow documentation
- Component Overview - Individual component details
- API Reference - Pipeline API documentation