Pipeline API
The pipeline module provides the main entry points for RWA calculation. See pipeline.py for the full implementation.
Module: rwa_calc.engine.pipeline
create_pipeline
Factory function that creates a pipeline orchestrator with default components.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_path | str \| Path \| None | Path to data directory (creates ParquetLoader) | None |
| loader | LoaderProtocol \| None | Pre-configured loader (overrides data_path) | None |
Returns:
| Type | Description |
|---|---|
| PipelineOrchestrator | PipelineOrchestrator ready for use |
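
The parameter table implies a simple precedence rule: an explicit loader wins over data_path, and data_path is only used to build a ParquetLoader when no loader is given. The following is a minimal sketch of that rule using stand-in classes. StubParquetLoader, StubOrchestrator, and create_pipeline_sketch are hypothetical names for illustration only, not the library's implementation.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass
class StubParquetLoader:
    """Hypothetical stand-in for ParquetLoader; it only records its base path."""

    base_path: Path


@dataclass
class StubOrchestrator:
    """Hypothetical stand-in for PipelineOrchestrator; it only holds a loader."""

    loader: object | None = None


def create_pipeline_sketch(
    data_path: str | Path | None = None,
    loader: object | None = None,
) -> StubOrchestrator:
    """Mirror the documented rule: a pre-configured loader overrides data_path."""
    if loader is None and data_path is not None:
        # Only build a Parquet-style loader when no loader was supplied.
        loader = StubParquetLoader(Path(data_path))
    return StubOrchestrator(loader=loader)


from_path = create_pipeline_sketch(data_path="data/")
explicit = create_pipeline_sketch(data_path="data/", loader="custom-loader")
no_loader = create_pipeline_sketch()
```

In this sketch, calling the factory with neither argument yields an orchestrator without a loader, which matches the documented case where run() cannot be used but run_with_data() can.
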
PipelineOrchestrator
The main pipeline class (note that the class is named PipelineOrchestrator, not RWAPipeline):
rwa_calc.engine.pipeline.PipelineOrchestrator
Orchestrate the complete RWA calculation pipeline.
Implements PipelineProtocol for:
- Full pipeline execution from data loading to final aggregation
- Pre-loaded data execution (bypassing loader)
- Component dependency management
- Error accumulation across stages
Pipeline stages:
1. Loader: Load raw data from files/databases
2. HierarchyResolver: Resolve counterparty and facility hierarchies
3. Classifier: Classify exposures and assign approaches
4. CRMProcessor: Apply credit risk mitigation
5. SACalculator: Calculate SA RWA
6. IRBCalculator: Calculate IRB RWA
7. SlottingCalculator: Calculate Slotting RWA
8. Aggregation: Combine results, apply floor, generate summaries
Usage:
orchestrator = PipelineOrchestrator(
    loader=ParquetLoader(base_path),
    hierarchy_resolver=HierarchyResolver(),
    classifier=ExposureClassifier(),
    crm_processor=CRMProcessor(),
    sa_calculator=SACalculator(),
    irb_calculator=IRBCalculator(),
    slotting_calculator=SlottingCalculator(),
)
result = orchestrator.run(config)
run(config)
Execute the complete RWA calculation pipeline.
Requires a loader to be configured.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | CalculationConfig | Calculation configuration | required |
Returns:
| Type | Description |
|---|---|
| AggregatedResultBundle | AggregatedResultBundle with all results and audit trail |
Raises:
| Type | Description |
|---|---|
| ValueError | If no loader is configured |
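
One plausible reading of this contract is that run() checks for a loader, loads the raw data, and then hands off to run_with_data(). The sketch below illustrates that control flow with hypothetical stand-ins (LoaderLike, DictLoader, SketchOrchestrator). The real method bodies are not shown in this section, so the delegation step is an assumption.

```python
from __future__ import annotations

from typing import Any, Protocol


class LoaderLike(Protocol):
    """Hypothetical minimal loader interface used only in this sketch."""

    def load(self) -> dict[str, Any]: ...


class DictLoader:
    """Hypothetical loader that returns a small in-memory bundle."""

    def load(self) -> dict[str, Any]:
        return {"exposures": [{"id": "E1", "ead": 100.0}]}


class SketchOrchestrator:
    def __init__(self, loader: LoaderLike | None = None) -> None:
        self._loader = loader

    def run(self, config: dict[str, Any]) -> dict[str, Any]:
        # Documented behaviour: run() raises ValueError without a loader.
        if self._loader is None:
            raise ValueError("No loader configured; use run_with_data() instead")
        # Assumed behaviour: load once, then reuse the pre-loaded code path.
        return self.run_with_data(self._loader.load(), config)

    def run_with_data(
        self, data: dict[str, Any], config: dict[str, Any]
    ) -> dict[str, Any]:
        # Placeholder for the real stages (hierarchy, classification, CRM, ...).
        return {
            "n_exposures": len(data["exposures"]),
            "framework": config["framework"],
        }


config = {"framework": "CRR"}
result = SketchOrchestrator(loader=DictLoader()).run(config)

try:
    SketchOrchestrator().run(config)
    raised = False
except ValueError:
    raised = True
```
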
run_with_data(data, config)
Execute pipeline with pre-loaded data.
Uses single-pass architecture: all approach-specific calculators run sequentially on one unified LazyFrame, avoiding plan tree duplication and mid-pipeline materialisation.
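
To make the single-pass idea concrete without depending on the engine, here is a plain-Python analogy. Each calculator is a row-level step that only touches rows assigned to its approach, the steps are composed into one plan, and the data is traversed once at the end. The real pipeline operates on a LazyFrame rather than Python dictionaries; the names sa_step, irb_step, and build_plan, and the risk weights, are hypothetical and purely illustrative.

```python
from __future__ import annotations

from functools import reduce
from typing import Callable

Row = dict
Step = Callable[[Row], Row]


def sa_step(row: Row) -> Row:
    # Only rows routed to the SA approach are touched; others pass through.
    if row["approach"] == "SA":
        return {**row, "rwa": row["ead"] * 1.00}  # illustrative 100% weight
    return row


def irb_step(row: Row) -> Row:
    # Only rows routed to the IRB approach are touched; others pass through.
    if row["approach"] == "IRB":
        return {**row, "rwa": row["ead"] * 0.50}  # illustrative 50% weight
    return row


def build_plan(steps: list[Step]) -> Step:
    """Compose steps into one function; nothing runs until it is called."""
    return lambda row: reduce(lambda acc, step: step(acc), steps, row)


exposures = [
    {"id": "E1", "approach": "SA", "ead": 100.0},
    {"id": "E2", "approach": "IRB", "ead": 200.0},
]

plan = build_plan([sa_step, irb_step])
# Single traversal: each row flows through every calculator exactly once.
results = [plan(row) for row in exposures]
total_rwa = sum(row["rwa"] for row in results)
```

The design point this analogy tries to capture is that the data is not split into per-approach copies that are processed separately and re-joined; every step sees the same unified set of rows.
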
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | RawDataBundle | Pre-loaded raw data bundle | required |
| config | CalculationConfig | Calculation configuration | required |
Returns:
| Type | Description |
|---|---|
| AggregatedResultBundle | AggregatedResultBundle with all results and audit trail |
Pipeline Implementation (pipeline.py)
stage: str
error_type: str
message: str
context: dict = field(default_factory=dict)
# =============================================================================
# Pipeline Orchestrator Implementation
# =============================================================================
class PipelineOrchestrator:
    """
    Orchestrate the complete RWA calculation pipeline.

    Implements PipelineProtocol for:
    - Full pipeline execution from data loading to final aggregation
    - Pre-loaded data execution (bypassing loader)
    - Component dependency management
    - Error accumulation across stages

    Pipeline stages:
    1. Loader: Load raw data from files/databases
    2. HierarchyResolver: Resolve counterparty and facility hierarchies
    3. Classifier: Classify exposures and assign approaches
    4. CRMProcessor: Apply credit risk mitigation
    5. SACalculator: Calculate SA RWA
    6. IRBCalculator: Calculate IRB RWA
    7. SlottingCalculator: Calculate Slotting RWA
    8. Aggregation: Combine results, apply floor, generate summaries

    Usage:
        orchestrator = PipelineOrchestrator(
            loader=ParquetLoader(base_path),
            hierarchy_resolver=HierarchyResolver(),
            classifier=ExposureClassifier(),
            crm_processor=CRMProcessor(),
            sa_calculator=SACalculator(),
            irb_calculator=IRBCalculator(),
            slotting_calculator=SlottingCalculator(),
        )
        result = orchestrator.run(config)
    """
    def __init__(
        self,
        loader: LoaderProtocol | None = None,
        hierarchy_resolver: HierarchyResolverProtocol | None = None,
        classifier: ClassifierProtocol | None = None,
        crm_processor: CRMProcessorProtocol | None = None,
        sa_calculator: SACalculatorProtocol | None = None,
        irb_calculator: IRBCalculatorProtocol | None = None,
        slotting_calculator: SlottingCalculatorProtocol | None = None,
        equity_calculator: EquityCalculatorProtocol | None = None,
        output_aggregator: OutputAggregatorProtocol | None = None,
    ) -> None:
        """
        Initialize pipeline with components.

        Components can be injected for testing or customization.
        If not provided, defaults will be created on first use.

        Args:
            loader: Data loader (optional - required for run())
            hierarchy_resolver: Hierarchy resolver
            classifier: Exposure classifier
            crm_processor: CRM processor
            sa_calculator: SA calculator
            irb_calculator: IRB calculator
            slotting_calculator: Slotting calculator
            equity_calculator: Equity calculator
            output_aggregator: Output aggregator
        """
        self._loader = loader
        self._hierarchy_resolver = hierarchy_resolver
        self._classifier = classifier
        self._crm_processor = crm_processor
        self._sa_calculator = sa_calculator
        self._irb_calculator = irb_calculator
        self._slotting_calculator = slotting_calculator
        self._equity_calculator = equity_calculator
        self._output_aggregator = output_aggregator
        self._errors: list[PipelineError] = []
    # =========================================================================
    # Public API
    # =========================================================================
    def run(self, config: CalculationConfig) -> AggregatedResultBundle:
        """
        Execute the complete RWA calculation pipeline.

        Requires a loader to be configured.

        Args:
            config: Calculation configuration

        Returns:
            AggregatedResultBundle with all results and audit trail

        Raises:
Usage Examples
Basic Usage
from datetime import date
from rwa_calc.engine.pipeline import create_pipeline
from rwa_calc.contracts.config import CalculationConfig
# Create pipeline
pipeline = create_pipeline()
# Configure for CRR
config = CalculationConfig.crr(
reporting_date=date(2026, 12, 31),
)
# Run calculation
result = pipeline.run(config)
# Access results
print(f"Total RWA: {result.total_rwa:,.0f}")
print(f"SA RWA: {result.sa_rwa:,.0f}")
print(f"IRB RWA: {result.irb_rwa:,.0f}")
With Custom Data Path
from pathlib import Path
from rwa_calc.engine.loader import ParquetLoader
# Build a loader for the custom data directory and load the raw data
loader = ParquetLoader(Path("/path/to/data"))
raw_data = loader.load()
# Run with pre-loaded data (reuses pipeline and config from Basic Usage)
result = pipeline.run_with_data(raw_data, config)
Framework Comparison
# Run under both frameworks
config_crr = CalculationConfig.crr(date(2026, 12, 31))
config_b31 = CalculationConfig.basel_3_1(date(2027, 1, 1))
result_crr = pipeline.run(config_crr)
result_b31 = pipeline.run(config_b31)
# Compare
impact = (result_b31.total_rwa / result_crr.total_rwa - 1) * 100
print(f"Basel 3.1 impact: {impact:+.1f}%")
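
The comparison above reduces to a relative change between two totals. The helper below is a hypothetical convenience function, not part of rwa_calc; it adds a guard for a zero baseline and is shown with made-up totals.

```python
def rwa_impact_pct(baseline_rwa: float, revised_rwa: float) -> float:
    """Return the percentage change from baseline_rwa to revised_rwa."""
    if baseline_rwa == 0:
        raise ValueError("Baseline RWA is zero; relative impact is undefined")
    return (revised_rwa / baseline_rwa - 1) * 100


# Made-up totals purely for illustration.
impact = rwa_impact_pct(baseline_rwa=1_000_000.0, revised_rwa=1_080_000.0)
label = f"Basel 3.1 impact: {impact:+.1f}%"
print(label)
```
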
Error Handling
result = pipeline.run(config)
# Check for errors
if result.has_errors:
    print("Calculation completed with errors:")
    for error in result.errors:
        print(f" - {error.exposure_id}: {error.message}")
# Check for warnings
if result.has_warnings:
    print("Warnings:")
    for warning in result.warnings:
        print(f" - {warning.message}")
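
The implementation excerpt shows error records with stage, error_type, message, and context fields. Assuming records of that shape, a report can group them by the stage that produced them. The SketchError class and summarise_by_stage helper below are hypothetical stand-ins, and the sample records are invented.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class SketchError:
    """Stand-in mirroring the four fields shown in the pipeline.py excerpt."""

    stage: str
    error_type: str
    message: str
    context: dict = field(default_factory=dict)


def summarise_by_stage(errors: list[SketchError]) -> dict[str, list[str]]:
    """Group error messages by the pipeline stage that produced them."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for err in errors:
        grouped[err.stage].append(f"{err.error_type}: {err.message}")
    return dict(grouped)


# Invented sample records for illustration only.
errors = [
    SketchError("classifier", "validation", "Missing exposure class", {"row": 12}),
    SketchError("crm_processor", "data", "Collateral value is negative"),
    SketchError("classifier", "validation", "Unknown approach code"),
]

summary = summarise_by_stage(errors)
for stage, messages in summary.items():
    print(f"{stage}: {len(messages)} error(s)")
```

Grouping by stage keeps the report aligned with the pipeline's stage order, which can make it easier to see where in the calculation a problem first appeared.
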