pipeline
Amendment pipeline orchestrator.
Coordinates the full amendment analysis workflow:
IDENTIFY → PDF_DOWNLOAD_PARSE → STENO_DOWNLOAD_PARSE → MERGE → RESOLVE_IDS → RESOLVE_SUBMITTERS → LLM_SUMMARIZE → CACHE
PDF text is the primary source for amendment structure (letters, submitter names, per-amendment text). Steno records provide vote linkage (vote numbers, results, stances). The MERGE stage combines both data sources.
Follows the same async pattern as TiskPipelineService.
AmendmentPipelineService(cache_dir, _progress=dict(), _tasks=dict())
dataclass
Orchestrates the amendment analysis pipeline for all periods.
Follows the same pattern as TiskPipelineService:
- Background async tasks
- Per-period progress tracking
- Cancellation support
Attributes:
| Name | Type | Description |
|---|---|---|
| cache_dir | Path | Base cache directory. |
| _progress | dict[int, AmendmentProgress] | Per-period progress tracking. |
| _tasks | dict[int, Task] | Per-period asyncio tasks. |
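The signature's `_progress=dict()` and `_tasks=dict()` defaults are presumably rendered from `field(default_factory=dict)`, since a dataclass rejects a shared mutable `{}` default. The sketch below shows that shape under that assumption; `AmendmentPipelineServiceSketch` is a hypothetical name and `AmendmentProgress` here is a stand-in whose fields are invented, because the real class's fields are not documented on this page.

```python
import asyncio
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class AmendmentProgress:
    """Hypothetical stand-in; the real fields are not documented here."""

    stage: str = "IDENTIFY"
    done: int = 0
    total: int = 0


@dataclass
class AmendmentPipelineServiceSketch:
    cache_dir: Path
    # default_factory builds a fresh dict per instance, so two services never
    # share progress or task state.
    _progress: dict[int, AmendmentProgress] = field(default_factory=dict)
    _tasks: dict[int, asyncio.Task] = field(default_factory=dict)
```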
progress
property
Current progress for all periods.
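One way to expose per-period progress without letting callers mutate the service's internal tracking dict is a property that returns a shallow copy. This is a minimal sketch; whether the real `progress` property copies is not stated here, and `ProgressHolder` plus the string stage values are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ProgressHolder:
    """Hypothetical holder; values are plain stage names for illustration."""

    _progress: dict[int, str] = field(default_factory=dict)

    @property
    def progress(self) -> dict[int, str]:
        # Shallow copy: callers can read the snapshot, but adding or removing
        # keys does not affect the service's own dict.
        return dict(self._progress)
```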
start_period(period, period_data, on_complete=None, on_progress=None, mode=AmendmentMode.FULL)
Start the amendment pipeline for a single period.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| period | int | Electoral period number. | required |
| period_data | PeriodData | Loaded period data with tisk_lookup populated. | required |
| on_complete | Callable \| None | Optional callback(period, bills) on completion. | None |
| on_progress | Callable[[int, list[BillAmendmentData]], None] \| None | Optional callback(period, bills) for incremental UI refresh. | None |
| mode | AmendmentMode | Pipeline execution mode. | AmendmentMode.FULL |
Source code in pspcz_analyzer/services/amendments/pipeline.py
is_running(period)
Check if the pipeline is currently running for a period.
get_task(period)
Get the running asyncio.Task for a period, or None if not running.
cancel_period(period)
Cancel the amendment pipeline for a single period.
Each period runs as an independent asyncio.Task, so that period's task can be cancelled directly.
Returns True if a task was found and cancelled.
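Because each period has its own task, cancelling one period is a lookup plus `Task.cancel()`, leaving other periods untouched. This is a minimal sketch; `CancelSketch` is hypothetical, and removing the task from the dict with `pop` is an assumed design choice, not confirmed by the source.

```python
import asyncio


class CancelSketch:
    """Hypothetical; returns True only when a live task was cancelled."""

    def __init__(self) -> None:
        self._tasks: dict[int, asyncio.Task] = {}

    def cancel_period(self, period: int) -> bool:
        # pop() stops tracking this period; other periods' tasks stay in the dict.
        task = self._tasks.pop(period, None)
        if task is None or task.done():
            return False  # nothing found, or it already finished
        task.cancel()
        return True
```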
cancel_all()
Cancel all running pipeline tasks.
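`cancel_all` can be expressed as cancelling every live task and then clearing the registry. This is a sketch: `CancelAllSketch` is hypothetical, and returning the number of cancelled tasks is an illustrative addition; the source does not say what `cancel_all` returns.

```python
import asyncio


class CancelAllSketch:
    """Hypothetical; the int return value is for illustration only."""

    def __init__(self) -> None:
        self._tasks: dict[int, asyncio.Task] = {}

    def cancel_all(self) -> int:
        cancelled = 0
        # Iterate over a snapshot because the dict is cleared afterwards.
        for task in list(self._tasks.values()):
            if not task.done():
                task.cancel()
                cancelled += 1
        self._tasks.clear()
        return cancelled
```

`Task.cancel()` only requests cancellation. A caller that must wait until the tasks have actually stopped can keep references to them and `await asyncio.gather(*tasks, return_exceptions=True)`, which collects each `CancelledError` as a result instead of raising it.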