
pipeline_lock


Single-pipeline-at-a-time lock for protecting local LLM resources.

PipelineInfo(pipeline_id, pipeline_type, period) dataclass

Metadata about the currently running pipeline.
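As a rough illustration of the record described above, the sketch below defines a stand-in dataclass with the three documented fields. The field types are assumptions inferred from how acquire() builds the object, and make_info is a hypothetical helper, not part of the module.

```python
from dataclasses import dataclass


@dataclass
class PipelineInfo:
    """Stand-in for the documented dataclass; field types are assumed."""

    pipeline_id: str    # built as "<pipeline_type>:<period>" by acquire()
    pipeline_type: str  # e.g. "tisk" or "amendment"
    period: int         # electoral period number


def make_info(pipeline_type: str, period: int) -> PipelineInfo:
    """Hypothetical helper that builds the record the same way acquire() does."""
    return PipelineInfo(
        pipeline_id=f"{pipeline_type}:{period}",
        pipeline_type=pipeline_type,
        period=period,
    )


if __name__ == "__main__":
    print(make_info("tisk", 10))
```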

PipelineLock()

Ensures only one pipeline runs at a time within a backend process (the lock wraps an asyncio.Lock, so it does not coordinate separate processes).

Usage:

lock = PipelineLock()
if not await lock.acquire("tisk", 10):
    raise RuntimeError("Another pipeline is running")
try:
    ...  # run pipeline
finally:
    lock.release()
Source code in pspcz_analyzer/services/pipeline_lock.py
def __init__(self) -> None:
    self._lock = asyncio.Lock()
    self._current: PipelineInfo | None = None

is_locked property

Whether a pipeline is currently running.

current property

Info about the currently running pipeline, or None.
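One possible use of the two properties is reporting lock state to a caller, for example from a status endpoint. The sketch below is duck-typed: describe_status is a hypothetical helper, and the idle and busy objects are stand-ins that only mimic the is_locked and current attributes.

```python
from types import SimpleNamespace
from typing import Any, Dict


def describe_status(lock: Any) -> Dict[str, Any]:
    """Hypothetical helper: summarise lock state from is_locked and current."""
    info = lock.current
    if not lock.is_locked or info is None:
        return {"running": False, "pipeline_id": None}
    return {
        "running": True,
        "pipeline_id": info.pipeline_id,
        "pipeline_type": info.pipeline_type,
        "period": info.period,
    }


# Stand-ins shaped like an idle and a busy PipelineLock (illustrative only).
idle = SimpleNamespace(is_locked=False, current=None)
busy = SimpleNamespace(
    is_locked=True,
    current=SimpleNamespace(pipeline_id="tisk:10", pipeline_type="tisk", period=10),
)

if __name__ == "__main__":
    print(describe_status(idle))
    print(describe_status(busy))
```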

acquire(pipeline_type, period) async

Try to acquire the lock for a pipeline. The call does not wait: if another pipeline already holds the lock, it returns False immediately.

Parameters:

Name           Type  Description                                    Default
pipeline_type  str   Type of pipeline (e.g. "tisk", "amendment").   required
period         int   Electoral period number.                       required

Returns:

Type  Description
bool  True if lock acquired, False if another pipeline is running.

Source code in pspcz_analyzer/services/pipeline_lock.py
async def acquire(self, pipeline_type: str, period: int) -> bool:
    """Try to acquire the lock for a pipeline.

    Args:
        pipeline_type: Type of pipeline (e.g. "tisk", "amendment").
        period: Electoral period number.

    Returns:
        True if lock acquired, False if another pipeline is running.
    """
    if self._lock.locked():
        logger.warning(
            "[pipeline-lock] Rejected {}/{}: already running {}",
            pipeline_type,
            period,
            self._current,
        )
        return False

    await self._lock.acquire()
    pipeline_id = f"{pipeline_type}:{period}"
    self._current = PipelineInfo(
        pipeline_id=pipeline_id,
        pipeline_type=pipeline_type,
        period=period,
    )
    logger.info("[pipeline-lock] Acquired: {}", pipeline_id)
    return True
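The reject-instead-of-wait behaviour can be seen in a small run. The sketch below uses a stand-in class that mirrors the listed source with logging omitted, so it runs without the pspcz_analyzer package; demo is a hypothetical driver.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PipelineInfo:
    pipeline_id: str
    pipeline_type: str
    period: int


class PipelineLock:
    """Stand-in mirroring the documented source, with logging omitted."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._current: Optional[PipelineInfo] = None

    async def acquire(self, pipeline_type: str, period: int) -> bool:
        if self._lock.locked():
            return False  # reject immediately instead of queueing behind the holder
        await self._lock.acquire()
        self._current = PipelineInfo(f"{pipeline_type}:{period}", pipeline_type, period)
        return True

    def release(self) -> None:
        if not self._lock.locked():
            return  # nothing to release
        self._current = None
        self._lock.release()


async def demo() -> List[bool]:
    lock = PipelineLock()
    first = await lock.acquire("tisk", 10)        # lock is free, so this succeeds
    second = await lock.acquire("amendment", 10)  # rejected while "tisk:10" holds it
    lock.release()
    third = await lock.acquire("amendment", 10)   # succeeds once the lock is released
    lock.release()
    return [first, second, third]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [True, False, True]
```

Because a rejected caller gets False rather than waiting, any retry or queueing policy is left to the caller.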

release()

Release the pipeline lock. Does nothing if the lock is not currently held.

Source code in pspcz_analyzer/services/pipeline_lock.py
def release(self) -> None:
    """Release the pipeline lock."""
    if not self._lock.locked():
        return
    pipeline_id = self._current.pipeline_id if self._current else "unknown"
    self._current = None
    self._lock.release()
    logger.info("[pipeline-lock] Released: {}", pipeline_id)
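The listed release() does not check who holds the lock, so a caller whose acquire() returned False should not call it. One way to keep acquire and release paired is an async context manager. This is a minimal sketch, not part of the module: hold_pipeline and PipelineBusyError are hypothetical names, and _DemoLock is a tiny stand-in exposing the same acquire()/release() surface.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, List


class PipelineBusyError(RuntimeError):
    """Hypothetical error raised when another pipeline holds the lock."""


@asynccontextmanager
async def hold_pipeline(lock: Any, pipeline_type: str, period: int):
    # Acquire outside the try block: a rejected caller must never reach release().
    if not await lock.acquire(pipeline_type, period):
        raise PipelineBusyError(f"Cannot start {pipeline_type}:{period}: lock is held")
    try:
        yield
    finally:
        lock.release()  # runs even if the pipeline body raises


class _DemoLock:
    """Stand-in with the same method names as PipelineLock (illustrative only)."""

    def __init__(self) -> None:
        self.held = False

    async def acquire(self, pipeline_type: str, period: int) -> bool:
        if self.held:
            return False
        self.held = True
        return True

    def release(self) -> None:
        self.held = False


async def demo() -> List[Any]:
    lock = _DemoLock()
    events: List[Any] = []
    async with hold_pipeline(lock, "tisk", 10):
        events.append(lock.held)  # lock is held inside the block
        try:
            async with hold_pipeline(lock, "amendment", 10):
                events.append("unexpected")
        except PipelineBusyError:
            events.append("rejected")  # second pipeline is turned away
        events.append(lock.held)  # the rejection did not release the outer hold
    events.append(lock.held)  # released on exit
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```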