pipeline

Amendment pipeline orchestrator.

Coordinates the full amendment analysis workflow:

IDENTIFY → PDF_DOWNLOAD_PARSE → STENO_DOWNLOAD_PARSE → MERGE → RESOLVE_IDS → RESOLVE_SUBMITTERS → LLM_SUMMARIZE → CACHE

PDF text is the primary source for amendment structure (letters, submitter names, per-amendment text). Steno records provide vote linkage (vote numbers, results, stances). The MERGE stage combines both data sources.
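
To make the MERGE stage concrete, here is a minimal sketch that joins PDF-derived amendments with steno-derived vote records by amendment letter. The record types `PdfAmendment`, `StenoVote`, and `MergedAmendment`, the `merge` helper, and the choice of the letter as join key are all assumptions made for illustration; the project's actual data model is not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class PdfAmendment:  # hypothetical: amendment structure parsed from the PDF
    letter: str
    submitter: str
    text: str


@dataclass
class StenoVote:  # hypothetical: vote linkage parsed from the steno record
    letter: str
    vote_number: int
    result: str


@dataclass
class MergedAmendment:  # hypothetical: combined record
    letter: str
    submitter: str
    text: str
    vote_number: int | None = None
    result: str | None = None


def merge(pdf_items: list[PdfAmendment], steno_items: list[StenoVote]) -> list[MergedAmendment]:
    """Attach vote linkage to each PDF amendment; PDF order is preserved."""
    votes = {v.letter: v for v in steno_items}
    merged = []
    for item in pdf_items:
        vote = votes.get(item.letter)  # None when no vote was recorded
        merged.append(
            MergedAmendment(
                letter=item.letter,
                submitter=item.submitter,
                text=item.text,
                vote_number=vote.vote_number if vote else None,
                result=vote.result if vote else None,
            )
        )
    return merged
```

Because the PDF is treated as the primary source, the sketch iterates over the PDF records and only looks up votes, so an amendment without a matching vote still appears in the output.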

Follows the same async pattern as TiskPipelineService.

AmendmentPipelineService(cache_dir, _progress=dict(), _tasks=dict()) dataclass

Orchestrates the amendment analysis pipeline for all periods.

Follows the same pattern as TiskPipelineService:

- Background async tasks
- Per-period progress tracking
- Cancellation support

Attributes:

Name       Type                          Description
cache_dir  Path                          Base cache directory.
_progress  dict[int, AmendmentProgress]  Per-period progress tracking.
_tasks     dict[int, Task]               Per-period asyncio tasks.

progress property

Current progress for all periods.

start_period(period, period_data, on_complete=None, on_progress=None, mode=AmendmentMode.FULL)

Start the amendment pipeline for a single period.

Parameters:

Name         Type                                                   Description                                                   Default
period       int                                                    Electoral period number.                                      required
period_data  PeriodData                                             Loaded period data with tisk_lookup populated.                required
on_complete  Callable | None                                        Optional callback(period, bills) on completion.               None
on_progress  Callable[[int, list[BillAmendmentData]], None] | None  Optional callback(period, bills) for incremental UI refresh.  None
mode         AmendmentMode                                          Pipeline execution mode.                                      FULL

Source code in pspcz_analyzer/services/amendments/pipeline.py
def start_period(
    self,
    period: int,
    period_data: PeriodData,
    on_complete: Callable | None = None,
    on_progress: Callable[[int, list[BillAmendmentData]], None] | None = None,
    mode: AmendmentMode = AmendmentMode.FULL,
) -> None:
    """Start the amendment pipeline for a single period.

    Args:
        period: Electoral period number.
        period_data: Loaded period data with tisk_lookup populated.
        on_complete: Optional callback(period, bills) on completion.
        on_progress: Optional callback(period, bills) for incremental UI refresh.
        mode: Pipeline execution mode.
    """
    if period in self._tasks and not self._tasks[period].done():
        logger.info("[amendment pipeline] Already running for period {}", period)
        return

    prog = AmendmentProgress(status=AmendmentStatus.RUNNING)
    self._progress[period] = prog

    async def _run() -> None:
        try:
            bills = await asyncio.to_thread(
                _run_pipeline_sync,
                period,
                period_data,
                self.cache_dir,
                prog,
                on_progress,
                mode,
            )
            prog.status = AmendmentStatus.COMPLETED
            prog.stage = AmendmentStage.COMPLETED
            if on_complete:
                on_complete(period, bills)
        except asyncio.CancelledError:
            logger.info("[amendment pipeline] Cancelled for period {}", period)
            raise
        except Exception:
            prog.status = AmendmentStatus.FAILED
            prog.stage = AmendmentStage.FAILED
            logger.opt(exception=True).error(
                "[amendment pipeline] Failed for period {}", period
            )

    self._tasks[period] = asyncio.create_task(_run())
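
The start pattern above can be sketched in isolation. `MiniPipeline`, `_blocking_work`, and `demo` are hypothetical stand-ins, not part of pspcz_analyzer; they only mirror the shape of `start_period`: skip the call if the period's task is still in flight, otherwise run the blocking work in a worker thread via `asyncio.to_thread` and invoke the callback when it finishes. Note that `asyncio.create_task` requires a running event loop, so a method written this way has to be called from async code.

```python
import asyncio
import time


class MiniPipeline:
    """Hypothetical stand-in that mirrors the start_period pattern."""

    def __init__(self) -> None:
        self._tasks: dict[int, asyncio.Task] = {}

    def start(self, period: int, on_complete=None) -> bool:
        # Skip if a task for this period is still in flight.
        if period in self._tasks and not self._tasks[period].done():
            return False

        async def _run() -> None:
            # Blocking work runs in a worker thread so the event loop stays responsive.
            result = await asyncio.to_thread(_blocking_work, period)
            if on_complete:
                on_complete(period, result)

        # create_task needs a running event loop.
        self._tasks[period] = asyncio.create_task(_run())
        return True


def _blocking_work(period: int) -> list[str]:
    time.sleep(0.05)  # placeholder for download, parse, and LLM work
    return [f"bill-{period}"]


async def demo() -> tuple[bool, bool, list]:
    pipeline = MiniPipeline()
    done: list = []
    first = pipeline.start(9, on_complete=lambda p, bills: done.append((p, bills)))
    second = pipeline.start(9)  # ignored: period 9 is already running
    await pipeline._tasks[9]
    return first, second, done
```

Running `asyncio.run(demo())` starts period 9 once; the second call is ignored while the first task is pending, and the callback fires on the event loop thread after the worker thread returns.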

is_running(period)

Check if the pipeline is currently running for a period.

Source code in pspcz_analyzer/services/amendments/pipeline.py
def is_running(self, period: int) -> bool:
    """Check if the pipeline is currently running for a period."""
    task = self._tasks.get(period)
    return task is not None and not task.done()

get_task(period)

Get the running asyncio.Task for a period, or None if not running.

Source code in pspcz_analyzer/services/amendments/pipeline.py
def get_task(self, period: int) -> asyncio.Task | None:
    """Get the running asyncio.Task for a period, or None if not running."""
    task = self._tasks.get(period)
    if task is not None and not task.done():
        return task
    return None

cancel_period(period)

Cancel the amendment pipeline for a single period.

Each period runs as an independent asyncio.Task, so we can just cancel it directly.

Returns True if a task was found and cancelled.

Source code in pspcz_analyzer/services/amendments/pipeline.py
def cancel_period(self, period: int) -> bool:
    """Cancel the amendment pipeline for a single period.

    Each period runs as an independent asyncio.Task, so we can just
    cancel it directly.

    Returns True if a task was found and cancelled.
    """
    task = self._tasks.get(period)
    if task is None or task.done():
        return False
    task.cancel()
    logger.info("[amendment pipeline] Cancelling for period {}", period)
    return True
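
One caveat when cancelling a task that awaits `asyncio.to_thread`: cancellation stops the awaiting coroutine, but a worker thread that has already started keeps running until its function returns. Whether `_run_pipeline_sync` checks for cancellation on its own is not shown on this page. The sketch below, with a hypothetical `blocking_work` function, demonstrates the general asyncio behaviour.

```python
import asyncio
import threading
import time


async def demo() -> tuple[bool, bool]:
    finished = threading.Event()

    def blocking_work() -> None:
        time.sleep(0.2)  # placeholder for a long synchronous stage
        finished.set()   # still reached after the task is cancelled

    task = asyncio.create_task(asyncio.to_thread(blocking_work))
    await asyncio.sleep(0.05)  # give the worker thread time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    cancelled = task.cancelled()

    # Wait for the worker thread without blocking the event loop.
    thread_finished = await asyncio.to_thread(finished.wait, 2.0)
    return cancelled, thread_finished
```

Here the task reports itself as cancelled while the thread still completes its work. If prompt shutdown matters, one option is for the synchronous code to poll a flag such as a `threading.Event` between stages.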

cancel_all()

Cancel all running pipeline tasks.

Source code in pspcz_analyzer/services/amendments/pipeline.py
def cancel_all(self) -> None:
    """Cancel all running pipeline tasks."""
    for period, task in self._tasks.items():
        if not task.done():
            task.cancel()
            logger.info("[amendment pipeline] Cancelling for period {}", period)
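
`cancel_all` only requests cancellation; it does not wait for the tasks to finish. A caller that needs to know the tasks have actually ended, for example during application shutdown, could await them afterwards. The helper `cancel_and_wait` below is a hypothetical sketch of that step, operating on a plain dict of tasks rather than on the service itself.

```python
import asyncio


async def cancel_and_wait(tasks: dict[int, asyncio.Task]) -> dict[int, bool]:
    """Cancel unfinished tasks, wait for them, and report which ended cancelled."""
    pending = {p: t for p, t in tasks.items() if not t.done()}
    for task in pending.values():
        task.cancel()
    # return_exceptions=True keeps CancelledError from propagating out of gather.
    await asyncio.gather(*pending.values(), return_exceptions=True)
    return {p: t.cancelled() for p, t in pending.items()}


async def demo() -> dict[int, bool]:
    tasks = {
        8: asyncio.create_task(asyncio.sleep(10)),
        9: asyncio.create_task(asyncio.sleep(10)),
    }
    await asyncio.sleep(0)  # let both tasks start
    return await cancel_and_wait(tasks)
```

The returned mapping shows, per period, whether the task ended in the cancelled state.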