# Pipeline Collect Barriers — Full Analysis

## Current State
The pipeline has `.collect()` calls across several engine files. The hot-path collects (those executed on every calculation run) are concentrated in CRM processing and in the pipeline's split-once + `collect_all` pattern. The pipeline cannot currently run as a single `LazyFrame` operation due to three categories of blockers.
## Category 1: Polars Optimizer Limitations (Hardest to Fix)
These are platform-level constraints in Polars itself:
| Location | Barrier | Root Cause |
|---|---|---|
| `pipeline.py:631` | Pre-branch materialisation | CRM output is a deep plan tree. Without collecting, `collect_all` re-optimizes it 3× (once per SA/IRB/Slotting branch). |
| `pipeline.py:665` | `collect_all()` for 3 branches | Must use the CPU engine (not streaming) because streaming doesn't support CSE. Without CSE, each branch re-executes the full CRM plan (~9× slower). |
| `crm/processor.py:379` | Post-`init_ead` materialisation (fan-out path) | Flattens the deep plan to prevent re-evaluation when 3 lookup collects reference the upstream. |
| `crm/processor.py:424` | Post-`init_ead` materialisation (unified path) | Same as above, but for the Basel 3.1 output-floor path (all rows, no approach split). |
| `crm/processor.py:480` | Pre-collateral materialisation (unified path) | Without this, the 3 downstream lookup collects each re-execute provisions → CCF → `init_ead` (4× total). |
| `crm/processor.py:816-818` | 3 lookup table collects | Each lookup is referenced in 5+ downstream joins. Without materialisation, the `group_by().agg()` expressions re-evaluate at each reference. |
**Why they exist:** Polars' lazy engine lacks robust CSE (Common Subexpression Elimination) for deep plan trees. When the same `LazyFrame` is referenced by multiple downstream consumers (a fan-out pattern), the optimizer either re-executes the shared upstream once per consumer or segfaults on very deep plans.

**What would fix them upstream:** Polars would need:

- Reliable CSE that handles deep plan trees without re-optimization per branch
- CSE support in the streaming engine
- Optimizer stability with very deep plans (no segfaults)
## Category 2: Graph Traversal Algorithm (Design Choice)
| Location | Barrier | Root Cause |
|---|---|---|
| `hierarchy.py:265` | Ultimate parent resolution | Collects edge data into a Python dict for an iterative graph walk (cycle detection, depth tracking) |
| `hierarchy.py:473` | Facility root lookup | Same pattern — facility hierarchy edges collected for dict traversal |
**Why they exist:** The hierarchy resolver needs to walk parent→child chains of arbitrary depth to find ultimate roots (counterparty and facility). This is an inherently iterative/recursive algorithm that Polars expressions can't express natively.

**Data size:** Small (unique org/facility edges — typically <1,000 rows). Performance impact is negligible.
**Alternatives:**

- **Iterative self-join** — Polars `join` in a Python loop up to `max_depth` times. Stays "lazier" but still requires a fixed iteration count and builds a very wide plan.
- **Accept as-is** — The data is small and early in the pipeline. This is the lowest-impact collect in the entire chain.
## Category 3: Validation & Edge Cases (Non-blocking)
| Location | Barrier | Impact |
|---|---|---|
| `contracts/validation.py:574,701` | Column value validation | Collects invalid rows to build error messages. Not on the hot path. |
| `engine/utils.py` (5 calls) | `has_rows()` checks | `.head(1).collect()` — minimal, checking whether optional data exists. |
| `sa/calculator.py:884` | Single-scenario mode | Non-vectorized path — only for one-off testing. |
| `irb/formulas.py:560` | Scalar formula wrapper | 1-row collect for scalar IRB calculations. |
These are all either off the hot path or trivially small.
## Architecture Diagram — Where Laziness Breaks
```
LAZY ──────────────────────────────────────────────────────────
  Loader (scan_parquet)
  │
  HierarchyResolver
  │ ├── COLLECT: graph edges (~1K rows) ← Category 2
  │ └── all joins/enrichments stay lazy
  │
  Classifier (fully lazy, schema checks only)
  │
  CRM: provisions → CCF → init_ead
  │
EAGER ─── COLLECT #1: post-init_ead flatten ── Category 1 ────
  │
LAZY ──────────────────────────────────────────────────────────
  CRM: build 3 lookup tables (group_by)
  │
EAGER ─── COLLECT #2: 3 small lookups ──── Category 1 ────────
  │
LAZY ──────────────────────────────────────────────────────────
  CRM: collateral allocation, guarantees, finalize_ead
  │ (no final CRM collect — plan tree is shallow post-collateral)
  │
  Pipeline: _run_single_pass()
  │
EAGER ─── COLLECT #3: pre-branch flatten ── Category 1 ───────
  │ (pipeline.py:631 — materialise CRM output before split)
  │
LAZY ──────────────────────────────────────────────────────────
  SA calculate_unified() (Basel 3.1 only — SA-equiv RW on all rows)
  Split once by approach
  ├── SA calculator (lazy)
  ├── IRB calculator (lazy)
  └── Slotting calculator (lazy)
  │
EAGER ─── COLLECT #4: collect_all(3 branches) ─────────────────
  │ (pipeline.py:665 — CPU engine, not streaming)
  │
  Aggregator (re-lazify for summaries, then final output)
```
## Path to Fewer Collects
| Priority | Action | Collects Removed | Difficulty | Dependency |
|---|---|---|---|---|
| 1 | Wait for Polars CSE improvements | Up to 5 (CRM + pipeline) | None (upstream) | Polars roadmap — CSE for deep plans and streaming |
| 3 | Flatten the CRM plan tree — restructure provisions/CCF/collateral to produce a shallower plan | Potentially 1-2 | High | Requires a significant CRM refactor; risk of correctness regressions |
| 4 | Move lookup collects into the CRM plan — if Polars adds `cache()` / explicit CSE hints | 3 (lookups) | Low (if available) | Polars `LazyFrame.cache()` API (proposed but not yet stable) |
| 5 | Single-collect architecture — eliminate the pre-branch collect by moving to a single `collect_all` with CSE | 1 (`pipeline.py:631`) | Low-Medium | Requires the Polars optimizer to handle deep plans without segfaults |
## Out-of-Core Support (Implemented)
All hot-path collects now go through `engine/materialise.py`, which selects a strategy based on `config.collect_engine`:
| Engine | `materialise_barrier()` | `materialise_branches()` |
|---|---|---|
| `"cpu"` | `.collect().lazy()` (original behavior) | `pl.collect_all()` with CSE |
| `"streaming"` (default) | `sink_parquet` → `scan_parquet` (disk spill) | Sink each branch sequentially → read back |
Streaming mode caps peak memory at roughly one column batch at a time by spilling intermediate results to temporary parquet files. This lets the pipeline process datasets larger than available RAM.
Config options:

- `collect_engine: "streaming"` (default) — disk spill for out-of-core support
- `collect_engine: "cpu"` — in-memory collect for backward compatibility
- `spill_dir: Path | None` — directory for temp files (default: system temp)
**Fallback:** If `sink_parquet` fails for a particular expression (unsupported in the streaming engine), the barrier falls back to an in-memory `.collect().lazy()`.

**Cleanup:** Temp files are cleaned up via `cleanup_spill_files()` in `pipeline.run_with_data()`'s `finally` block, plus an `atexit` safety net.
## Key Takeaway
Most hot-path collects exist because of Polars optimizer limitations (deep-plan re-execution, no streaming CSE). The remaining two are algorithmic (graph traversal on small data). The materialisation barriers are now strategy-aware via `engine/materialise.py`, supporting both in-memory and disk-spill modes. The realistic path forward is:
- **Short term (done):** Disk-spill materialisation via `materialise_barrier`/`materialise_branches` enables out-of-core processing for any dataset size.
- **Medium term:** Monitor Polars' `LazyFrame.cache()` and CSE roadmap. When available, the 3 lookup collects and the pre-branch collect can likely be replaced.
- **Long term:** A single `collect_all()` at the output boundary becomes feasible only when Polars can handle deep plan trees with fan-out without re-execution or segfaults.