Skip to content

log_stream

log_stream

SSE log broadcasting for real-time pipeline log streaming.

LogBroadcaster()

Captures loguru messages and streams them to SSE subscribers.

Source code in pspcz_analyzer/admin/log_stream.py
def __init__(self) -> None:
    """Create a broadcaster with no sink installed and no subscribers."""
    # Handle returned by logger.add(); None while no sink is installed.
    self._sink_id: int | None = None
    # One queue per active subscribe() generator.
    self._subscribers: list[asyncio.Queue[str]] = []
    # Bounded history (at most _MAX_BUFFER lines) replayed to new subscribers.
    self._buffer: deque[str] = deque(maxlen=_MAX_BUFFER)

start()

Install loguru sink to capture log lines.

Source code in pspcz_analyzer/admin/log_stream.py
def start(self) -> None:
    """Register the loguru sink that feeds this broadcaster.

    Calling this while a sink is already installed is a no-op. Only records
    at level INFO or above whose message contains one of the pipeline tags
    below are passed on to ``self._handle_log``.
    """
    if self._sink_id is not None:
        return

    pipeline_tags = (
        "[tisk pipeline]",
        "[amendment pipeline]",
        "[daily-refresh]",
        "[pipeline-lock]",
        "[runtime-config]",
        "[file-watcher]",
    )

    def _is_pipeline_record(record) -> bool:
        # Keep the record when its message mentions any known pipeline tag.
        message = record["message"]
        return any(tag in message for tag in pipeline_tags)

    sink_id = logger.add(
        self._handle_log,
        format="{time:HH:mm:ss} | {level: <8} | {message}",
        level="INFO",
        filter=_is_pipeline_record,
    )
    self._sink_id = sink_id
    logger.info("[log-stream] Broadcaster started")

stop()

Remove loguru sink.

Source code in pspcz_analyzer/admin/log_stream.py
def stop(self) -> None:
    """Detach the loguru sink if one is installed; otherwise do nothing."""
    sink_id = self._sink_id
    if sink_id is None:
        return
    logger.remove(sink_id)
    # Clear the handle so that start() will install a fresh sink next time.
    self._sink_id = None

subscribe() async

Yield log lines as SSE events wrapped in divs. Sends buffered history first.

Source code in pspcz_analyzer/admin/log_stream.py
async def subscribe(self) -> AsyncGenerator[str, None]:
    """Yield log lines as SSE events wrapped in divs. Sends buffered history first.

    Every event carries one HTML-escaped ``<div class="log-line">`` fragment.
    A per-subscriber queue (bounded to 200 entries) is registered before the
    history is replayed and is unregistered again when the generator is
    closed or raises.

    Yields:
        A complete Server-Sent Event string terminated by a blank line.
    """

    def _as_event(text: str) -> str:
        # An SSE event ends at the first blank line and every payload line
        # needs its own "data: " prefix, so a log line containing CR/LF must
        # be re-framed rather than written out verbatim.
        fragment = f'<div class="log-line">{html.escape(text)}</div>'
        parts = fragment.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        return "".join(f"data: {part}\n" for part in parts) + "\n"

    q: asyncio.Queue[str] = asyncio.Queue(maxsize=200)
    self._subscribers.append(q)
    try:
        # Replay a snapshot of the history. This generator suspends at every
        # yield, and iterating the live deque would raise RuntimeError as
        # soon as a new line is appended in the meantime.
        for line in list(self._buffer):
            yield _as_event(line)
        # Stream new lines as they arrive on this subscriber's queue.
        while True:
            yield _as_event(await q.get())
    finally:
        if q in self._subscribers:
            self._subscribers.remove(q)