Skip to content

API Reference

Auto-generated documentation from source code docstrings.

Scraper

Core

The core scraper handles fetching Reddit JSON data via a stealth browser.

python_reddit_scraper.scraper.core

Camoufox-based Reddit JSON scraper with pagination.

Navigates old.reddit.com JSON API using a stealth Firefox browser, follows pagination tokens, and returns raw post data.

scrape_subreddit(browser, subreddit, max_pages=50, delay=1.5, quiet=False)

Scrape a subreddit's posts via Reddit's old JSON API.

Parameters:

Name Type Description Default
browser Browser

A camoufox Browser instance (from sync context manager).

required
subreddit str

Subreddit name (without r/ prefix).

required
max_pages int

Maximum number of pages to fetch (100 posts per page).

50
delay float

Seconds to wait between page requests.

1.5
quiet bool

If True, suppress all progress output.

False

Returns:

Type Description
list[dict]

List of post data dicts (the 'data' field of each child).

Source code in src/python_reddit_scraper/scraper/core.py
def scrape_subreddit(
    browser: Browser,
    subreddit: str,
    max_pages: int = 50,
    delay: float = 1.5,
    quiet: bool = False,
) -> list[dict]:
    """
    Scrape a subreddit's posts via Reddit's old JSON API.

    Pages are requested one after another from
    ``old.reddit.com/r/<subreddit>.json`` with ``limit=100``. The listing's
    ``after`` token is appended to the next request. The loop stops when
    ``_fetch_json_page`` returns ``None``, when a page has no children, when
    no ``after`` token is returned, or after ``max_pages`` pages.

    Args:
        browser: A camoufox Browser instance (from sync context manager).
        subreddit: Subreddit name (without r/ prefix).
        max_pages: Maximum number of pages to fetch (100 posts per page).
        delay: Seconds to wait between page requests.
        quiet: If True, suppress all progress output.

    Returns:
        List of post data dicts (the 'data' field of each child).
    """
    posts: list[dict] = []
    after: str | None = None
    base_url = f"https://old.reddit.com/r/{subreddit}.json?limit=100&raw_json=1"

    page = browser.new_page()
    progress = None
    task_id = None

    # All setup after the page is opened lives inside the try block so that
    # the page (and the progress display, once created) is always released,
    # even when route registration or progress start-up raises.
    try:
        page.context.route("**/*", _only_reddit)

        if not quiet:
            progress = Progress(
                SpinnerColumn(),
                TextColumn(f"r/{subreddit}"),
                BarColumn(),
                MofNCompleteColumn(),
                TextColumn("pages"),
                TimeElapsedColumn(),
                TextColumn("{task.fields[postfix]}"),
            )
            progress.start()
            task_id = progress.add_task(f"r/{subreddit}", total=max_pages, postfix="")

        for page_num in range(max_pages):
            url = f"{base_url}&after={after}" if after else base_url

            data = _fetch_json_page(page, url, subreddit)
            if data is None:
                break

            # Tolerate `"data": null` / `"children": null` in the response.
            listing = data.get("data") or {}
            children = listing.get("children") or []
            if not children:
                break

            posts.extend(
                child["data"]
                for child in children
                if isinstance(child, dict) and "data" in child
            )

            if progress is not None and task_id is not None:
                progress.update(task_id, advance=1, postfix=f"{len(posts)} posts")

            # An empty token would make the next request identical to the
            # first one, so treat it like a missing token.
            after = listing.get("after")
            if not after:
                break

            # No need to wait after the final permitted page.
            if page_num < max_pages - 1:
                time.sleep(delay)
    finally:
        if progress is not None:
            progress.stop()
        page.close()

    return posts

Parallel

Multi-process parallel scraping of multiple subreddits.

python_reddit_scraper.scraper.parallel

Parallel multi-process scraping of multiple subreddits.

scrape_worker(subreddit, max_pages=50, delay=1.5, quiet=False, proxy=None)

Standalone scrape function for use with ProcessPoolExecutor.

Each call creates its own Camoufox browser instance (Playwright sync API is not thread-safe, so each process must have its own browser).

Parameters:

Name Type Description Default
subreddit str

Subreddit name (without r/ prefix).

required
max_pages int

Maximum pages to fetch.

50
delay float

Seconds between page requests.

1.5
quiet bool

Suppress per-subreddit progress output.

False
proxy dict | None

Optional proxy dict with keys server, username, password. When provided, geoip=True is also set so Camoufox auto-matches locale/timezone to the proxy's exit IP.

None

Returns:

Type Description
tuple[str, list[dict]]

Tuple of (subreddit_name, list_of_post_dicts).

Source code in src/python_reddit_scraper/scraper/parallel.py
def scrape_worker(
    subreddit: str,
    max_pages: int = 50,
    delay: float = 1.5,
    quiet: bool = False,
    proxy: dict | None = None,
) -> tuple[str, list[dict]]:
    """
    Scrape one subreddit inside a freshly launched Camoufox browser.

    Intended as the callable submitted to a ProcessPoolExecutor: every call
    opens its own headless Camoufox browser (Playwright sync API is not
    thread-safe, so each process must have its own browser) and closes it
    when the scrape finishes.

    Args:
        subreddit: Subreddit name (without r/ prefix).
        max_pages: Maximum pages to fetch.
        delay: Seconds between page requests.
        quiet: Suppress per-subreddit progress output.
        proxy: Optional proxy dict with keys ``server``, ``username``, ``password``.
               When provided, ``geoip=True`` is also set so Camoufox auto-matches
               locale/timezone to the proxy's exit IP.

    Returns:
        Tuple of (subreddit_name, list_of_post_dicts).
    """
    from camoufox import DefaultAddons
    from camoufox.sync_api import Camoufox

    from python_reddit_scraper.scraper.core import scrape_subreddit

    # Proxy settings are only added when a proxy was actually supplied.
    proxy_options: dict = {"proxy": proxy, "geoip": True} if proxy else {}
    launch_options: dict = {
        "headless": True,
        "exclude_addons": [DefaultAddons.UBO],
        **proxy_options,
    }

    with Camoufox(**launch_options) as browser:
        collected = scrape_subreddit(
            browser,
            subreddit,
            max_pages=max_pages,
            delay=delay,
            quiet=quiet,
        )

    return subreddit, collected

scrape_parallel(subreddits, max_pages=50, delay=1.5, max_workers=4, on_complete=None, progress=None, proxies=None)

Scrape multiple subreddits in parallel using separate processes.

Each process gets its own Camoufox browser instance. Results are returned as a dict keyed by subreddit name. An optional callback is invoked as each subreddit finishes (useful for queueing downloads).

Parameters:

Name Type Description Default
subreddits list[str]

List of subreddit names.

required
max_pages int

Max pages per subreddit.

50
delay float

Seconds between page requests per scraper.

1.5
max_workers int

Maximum concurrent scraper processes.

4
on_complete OnCompleteCallback | None

Optional callback (sub: str, posts: list[dict]) -> None invoked as each subreddit finishes scraping.

None
progress ProgressDisplay | None

Optional shared ProgressDisplay instance used for the scraping bar.

None
proxies list[dict] | None

Optional list of proxy dicts from proxy_handler.fetch_proxies(). Assigned round-robin across workers. When None, no proxy is used.

None

Returns:

Type Description
dict[str, list[dict]]

Dict mapping subreddit name to its list of post dicts.

Source code in src/python_reddit_scraper/scraper/parallel.py
def scrape_parallel(
    subreddits: list[str],
    max_pages: int = 50,
    delay: float = 1.5,
    max_workers: int = 4,
    on_complete: OnCompleteCallback | None = None,
    progress: ProgressDisplay | None = None,
    proxies: list[dict] | None = None,
) -> dict[str, list[dict]]:
    """
    Scrape multiple subreddits in parallel using separate processes.

    Each process gets its own Camoufox browser instance. Results are returned
    as a dict keyed by subreddit name. An optional callback is invoked as each
    subreddit finishes (useful for queueing downloads).

    A subreddit whose worker raises is logged and mapped to an empty list.
    An exception raised by ``on_complete`` is logged separately and does not
    discard the posts that were scraped.

    Args:
        subreddits: List of subreddit names. An empty list returns ``{}``
            without starting any process.
        max_pages: Max pages per subreddit.
        delay: Seconds between page requests per scraper.
        max_workers: Maximum concurrent scraper processes.
        on_complete: Optional callback ``(sub: str, posts: list[dict]) -> None``
            invoked as each subreddit finishes scraping successfully.
        progress: Optional shared :class:`ProgressDisplay` for the scraping bar.
            When given, the workers' own progress output is suppressed.
        proxies: Optional list of proxy dicts from :func:`proxy_handler.fetch_proxies`.
            Assigned round-robin across workers. When ``None``, no proxy is used.

    Returns:
        Dict mapping subreddit name to its list of post dicts.
    """
    results: dict[str, list[dict]] = {}
    if not subreddits:
        # ProcessPoolExecutor raises ValueError for max_workers=0, which is
        # what min(len([]), max_workers) would produce.
        return results

    n_workers = min(len(subreddits), max_workers)
    # Per-worker progress bars would fight with the shared display.
    quiet = progress is not None

    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        future_to_sub: dict = {}
        for i, sub in enumerate(subreddits):
            proxy = proxies[i % len(proxies)] if proxies else None
            future = executor.submit(scrape_worker, sub, max_pages, delay, quiet=quiet, proxy=proxy)
            future_to_sub[future] = sub
            if progress:
                progress.mark_scrape_started(sub)

        for future in as_completed(future_to_sub):
            sub = future_to_sub[future]
            scraped_ok = True
            try:
                _, posts = future.result()
            except Exception as exc:
                logger.error("r/{}: scraping failed -- {}", sub, exc)
                posts = []
                scraped_ok = False
            else:
                logger.info("r/{}: {} posts collected", sub, len(posts))

            results[sub] = posts
            if progress:
                progress.mark_scrape_done(sub)

            # The callback runs outside the scrape try-block so that its
            # failures are neither reported as scrape failures nor allowed to
            # wipe the collected posts.
            if scraped_ok and on_complete:
                try:
                    on_complete(sub, posts)
                except Exception as exc:
                    logger.error("r/{}: on_complete callback failed -- {}", sub, exc)

    return results

JSON I/O

JSON file reading and writing for scraped data.

python_reddit_scraper.scraper.json_io

JSON I/O for scraped Reddit data.

save_scraped_json(posts, subreddit, output_dir='./input')

Save scraped posts to a JSON file compatible with the existing parser.

Wraps posts in Reddit's listing format so parse_json_files() can read them. Returns the path to the saved file.

Source code in src/python_reddit_scraper/scraper/json_io.py
def save_scraped_json(
    posts: list[dict],
    subreddit: str,
    output_dir: str = "./input",
) -> str:
    """
    Write scraped posts to ``<output_dir>/<subreddit>/scraped.json``.

    The posts are wrapped in Reddit's listing structure (``kind: Listing``
    with one ``t3`` child per post) so parse_json_files() can read them back.
    The target directory is created when missing and an existing file is
    overwritten.

    Returns the path to the saved file as a string.
    """
    target_dir = Path(output_dir, subreddit)
    target_dir.mkdir(parents=True, exist_ok=True)

    children = []
    for post in posts:
        children.append({"kind": "t3", "data": post})

    payload = {
        "kind": "Listing",
        "data": {"children": children, "after": None},
    }

    destination = target_dir / "scraped.json"
    with destination.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)

    return str(destination)

parse_json_files(input_dir)

Parse all JSON files in input directory and extract posts.

Source code in src/python_reddit_scraper/scraper/json_io.py
def _posts_from_node(node: object) -> list[dict]:
    """Return the post dicts carried by one decoded JSON node.

    A node is either a listing (``{"data": {"children": [...]}}``), in which
    case the ``data`` of every child is returned, or a single wrapped post
    (``{"data": {...}}``). Anything that is not shaped like that, including
    individual malformed children, is skipped.
    """
    if not isinstance(node, dict):
        return []
    payload = node.get("data")
    if not isinstance(payload, dict):
        return []
    if "children" not in payload:
        return [payload]
    children = payload["children"]
    if not isinstance(children, list):
        return []
    return [
        child["data"]
        for child in children
        if isinstance(child, dict) and "data" in child
    ]


def parse_json_files(input_dir: str) -> list[dict]:
    """Parse all JSON files in input directory and extract posts.

    The directory is searched recursively for ``*.json`` files, which are
    processed in sorted path order. Each file may hold a listing, a single
    wrapped post, or a list of either. Files that cannot be read or decoded
    are logged and skipped; malformed entries inside a file are skipped
    without discarding the rest of that file.

    Args:
        input_dir: Directory to search.

    Returns:
        All extracted post dicts. Empty when the directory does not exist.
    """
    posts: list[dict] = []
    input_path = Path(input_dir)

    if not input_path.exists():
        logger.error("Input directory {} does not exist!", input_dir)
        return posts

    # rglob("*.json") already covers the top level and every sub-directory,
    # so one walk replaces the former "*.json" + "**/*.json" pair.
    json_files = sorted(set(input_path.rglob("*.json")))
    logger.info("Found {} JSON files", len(json_files))

    for json_file in json_files:
        try:
            with open(json_file, encoding="utf-8") as f:
                data = json.load(f)

            nodes = data if isinstance(data, list) else [data]
            for node in nodes:
                posts.extend(_posts_from_node(node))

        except Exception as e:
            # Best effort: one unreadable file must not stop the whole import.
            logger.error("Error parsing {}: {}", json_file, e)

    return posts

Downloader

Media

Media URL extraction, type detection, and filtering.

python_reddit_scraper.downloader.media

Media URL extraction, type detection, and filtering.

sanitize_filename(text, max_length=100)

Convert text to a safe filename.

Source code in src/python_reddit_scraper/downloader/media.py
def sanitize_filename(text: str, max_length: int = 100) -> str:
    """Return *text* reduced to a string that is safe to use as a filename.

    Whitespace runs are collapsed to one space and the ends are trimmed.
    Every character other than word characters, spaces and ``-_.()[]{}`` is
    then dropped. Results longer than ``max_length`` are cut so that, with a
    trailing ``...``, they are ``max_length`` characters long. An empty
    result becomes ``"untitled"``.
    """
    collapsed = re.sub(r"[\s\n\r\t]+", " ", text).strip()
    cleaned = re.sub(r"[^\w\-_.()\[\]{} ]", "", collapsed)

    if len(cleaned) > max_length:
        cleaned = f"{cleaned[: max_length - 3]}..."

    if not cleaned:
        return "untitled"
    return cleaned

get_file_extension(url)

Extract file extension from URL.

Source code in src/python_reddit_scraper/downloader/media.py
def get_file_extension(url: str) -> str:
    """Return the dotted, lower-case media extension of *url*'s path.

    Only the path component is inspected, so query strings are ignored.
    When the text after the last ``.`` is not one of the known image/video
    extensions, or the path has no ``.``, ``".bin"`` is returned.
    """
    known = ("jpg", "jpeg", "png", "gif", "webp", "mp4", "webm", "mov")
    url_path = urlparse(url).path.lower()

    # rpartition leaves `dot` empty when the path has no "." at all.
    _, dot, suffix = url_path.rpartition(".")
    if dot and suffix in known:
        return "." + suffix
    return ".bin"

get_media_type(filename)

Determine media type from filename for directory sorting.

Source code in src/python_reddit_scraper/downloader/media.py
def get_media_type(filename: str) -> str:
    """Classify *filename* by its extension for directory sorting.

    Returns ``"images"``, ``"gifs"`` or ``"videos"`` for the known
    extensions (compared case-insensitively) and ``"other"`` for anything
    else, including names without an extension.
    """
    categories = {
        ".jpg": "images",
        ".jpeg": "images",
        ".png": "images",
        ".webp": "images",
        ".gif": "gifs",
        ".mp4": "videos",
        ".webm": "videos",
        ".mov": "videos",
    }
    suffix = Path(filename).suffix.lower()
    return categories.get(suffix, "other")

is_media_url(url)

Check if URL points to a media file.

Source code in src/python_reddit_scraper/downloader/media.py
def is_media_url(url: str) -> bool:
    """Check if URL points to a media file.

    The decision is based on the extension at the end of the URL's *path*
    (case-insensitive, query string and fragment ignored). Matching on the
    path end rather than on any substring keeps ``.gifv`` links and host
    names such as ``www.movies.example.com`` from being mistaken for
    ``.gif`` / ``.mov`` files.

    Args:
        url: URL to inspect.

    Returns:
        True when the path ends in a known image or video extension;
        False otherwise, including for URLs that cannot be parsed.
    """
    from urllib.parse import urlparse

    media_suffixes = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".mp4", ".webm", ".mov")
    try:
        path = urlparse(url).path
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 host).
        return False
    return path.lower().endswith(media_suffixes)

extract_media_urls(post_data)

Extract all media URLs from a Reddit post at highest resolution.

Source code in src/python_reddit_scraper/downloader/media.py
def extract_media_urls(post_data: dict) -> list[dict[str, str]]:
    """Extract all media URLs from a Reddit post at highest resolution.

    Sources are examined in this order:

    1. ``url_overridden_by_dest`` when it is a direct media link. This is
       returned on its own and nothing else is examined.
    2. Gallery items (``gallery_data`` + ``media_metadata``).
    3. Reddit-hosted video (``media``/``secure_media`` -> ``reddit_video``).
    4. ``preview.reddit_video_preview``.
    5. Reddit video of crosspost parents.
    6. Preview GIF / MP4 variant, or the preview source image when no video
       has been collected.
    7. ``.gifv`` links, rewritten to ``.mp4``.
    8. The oEmbed thumbnail, only when nothing else was found.

    ``.gifv`` links are deliberately excluded from step 1 so that step 7
    can rewrite them; otherwise they would be returned as an undownloadable
    ``.bin`` entry.

    Args:
        post_data: The ``data`` dict of one Reddit post.

    Returns:
        List of dicts with ``url`` and ``filename`` keys (entries produced
        by ``_build_audio_entries`` are passed through as returned). Empty
        when ``post_data`` is not a dict or no media is found.
    """
    if not isinstance(post_data, dict):
        return []

    media_urls: list[dict[str, str]] = []
    post_id = post_data.get("id", "unknown")
    # The key may be present with a null value, so .get()'s default alone is
    # not enough to guarantee a string.
    title = post_data.get("title")
    safe_title = sanitize_filename(title if isinstance(title, str) else "")

    direct_url = post_data.get("url_overridden_by_dest")
    if not isinstance(direct_url, str):
        direct_url = None
    is_gifv = bool(direct_url) and direct_url.lower().endswith(".gifv")

    # 1. Direct media URL (highest priority)
    if direct_url and not is_gifv and is_media_url(direct_url):
        media_urls.append(
            {
                "url": direct_url.replace("&amp;", "&"),
                "filename": f"{post_id}_{safe_title}{get_file_extension(direct_url)}",
            }
        )
        return media_urls

    # 2. Gallery posts
    if post_data.get("is_gallery") and post_data.get("media_metadata"):
        gallery_data = post_data.get("gallery_data") or {}
        gallery_items = gallery_data.get("items", [])
        media_metadata = post_data["media_metadata"]

        for i, item in enumerate(gallery_items):
            media_id = item.get("media_id")
            if media_id and media_id in media_metadata:
                meta = media_metadata[media_id]
                # "s" holds the source rendition, "u" its URL.
                if "s" in meta and "u" in meta["s"]:
                    url = meta["s"]["u"].replace("&amp;", "&")
                    media_urls.append(
                        {
                            "url": url,
                            "filename": f"{post_id}_{safe_title}_{i + 1}{get_file_extension(url)}",
                        }
                    )

    # 3. Reddit-hosted videos (media.reddit_video)
    if post_data.get("is_video") or post_data.get("media"):
        media = post_data.get("media") or post_data.get("secure_media")
        if media and isinstance(media, dict) and "reddit_video" in media:
            video = media["reddit_video"]
            if "fallback_url" in video:
                video_url = video["fallback_url"]
                media_urls.append(
                    {"url": video_url, "filename": f"{post_id}_{safe_title}_video.mp4"}
                )
                media_urls.extend(_build_audio_entries(video_url, post_id, safe_title))

    # 4. Reddit video preview (embedded videos from redgifs, external hosts, etc.)
    preview = post_data.get("preview", {})
    if isinstance(preview, dict):
        rvp = preview.get("reddit_video_preview")
        if rvp and isinstance(rvp, dict) and "fallback_url" in rvp:
            video_url = rvp["fallback_url"]
            existing_video_urls = {m["url"] for m in media_urls}
            if video_url not in existing_video_urls:
                media_urls.append(
                    {"url": video_url, "filename": f"{post_id}_{safe_title}_video.mp4"}
                )

    # 5. Crossposted videos -- check parent post for video data
    crosspost_list = post_data.get("crosspost_parent_list")
    if crosspost_list and isinstance(crosspost_list, list):
        for cp in crosspost_list:
            if not isinstance(cp, dict):
                continue
            cp_media = cp.get("media") or cp.get("secure_media")
            if cp_media and isinstance(cp_media, dict) and "reddit_video" in cp_media:
                video = cp_media["reddit_video"]
                if "fallback_url" in video:
                    video_url = video["fallback_url"]
                    existing_video_urls = {m["url"] for m in media_urls}
                    if video_url not in existing_video_urls:
                        media_urls.append(
                            {
                                "url": video_url,
                                "filename": f"{post_id}_{safe_title}_video.mp4",
                            }
                        )
                        media_urls.extend(_build_audio_entries(video_url, post_id, safe_title))

    # 6. Preview images/GIFs
    if isinstance(preview, dict) and "images" in preview and preview["images"]:
        image_data = preview["images"][0]

        variants = image_data.get("variants", {})
        if "gif" in variants and "source" in variants["gif"]:
            gif_url = variants["gif"]["source"]["url"].replace("&amp;", "&")
            media_urls.append({"url": gif_url, "filename": f"{post_id}_{safe_title}_preview.gif"})
        elif "mp4" in variants and "source" in variants["mp4"]:
            mp4_url = variants["mp4"]["source"]["url"].replace("&amp;", "&")
            media_urls.append({"url": mp4_url, "filename": f"{post_id}_{safe_title}_preview.mp4"})
        elif "source" in image_data:
            # A still preview is only worth keeping when no video was found.
            has_video = any(get_media_type(m["filename"]) == "videos" for m in media_urls)
            if not has_video:
                img_url = image_data["source"]["url"].replace("&amp;", "&")
                media_urls.append(
                    {
                        "url": img_url,
                        "filename": f"{post_id}_{safe_title}_preview{get_file_extension(img_url)}",
                    }
                )

    # 7. Handle gifv links (convert to mp4)
    if direct_url and is_gifv:
        mp4_url = direct_url[: -len(".gifv")] + ".mp4"
        media_urls.append({"url": mp4_url, "filename": f"{post_id}_{safe_title}.mp4"})

    # 8. Redgifs/external oembed thumbnail as last resort
    if not media_urls:
        media = post_data.get("media") or post_data.get("secure_media")
        if media and isinstance(media, dict):
            oembed = media.get("oembed")
            if oembed and isinstance(oembed, dict):
                thumb = oembed.get("thumbnail_url")
                if thumb and is_media_url(thumb):
                    media_urls.append(
                        {
                            "url": thumb.replace("&amp;", "&"),
                            "filename": f"{post_id}_{safe_title}_thumb{get_file_extension(thumb)}",
                        }
                    )

    return media_urls

extract_all_media(posts)

Extract all media URLs from a list of posts, deduplicating by URL.

Returns list of dicts with 'url', 'filename', and 'subreddit' keys.

Source code in src/python_reddit_scraper/downloader/media.py
def extract_all_media(posts: list[dict]) -> list[dict[str, str]]:
    """
    Collect the media entries of every post, keeping each URL only once.

    Entries come from extract_media_urls(); the first occurrence of a URL
    wins and is tagged with the ``subreddit`` of the post it came from
    (``"unknown"`` when the post has none). Items in ``posts`` that are not
    dicts are ignored.

    Returns list of dicts with 'url', 'filename', and 'subreddit' keys.
    """
    collected: list[dict[str, str]] = []
    known_urls: set[str] = set()

    for entry in posts:
        if not isinstance(entry, dict):
            continue

        origin = entry.get("subreddit", "unknown")
        for candidate in extract_media_urls(entry):
            link = candidate["url"]
            if link in known_urls:
                continue
            known_urls.add(link)
            candidate["subreddit"] = origin
            collected.append(candidate)

    return collected

filter_by_media_type(downloads, media_types=None)

Filter media list by a set of allowed types.

Parameters:

Name Type Description Default
downloads list[dict[str, str]]

List of dicts with 'url' and 'filename' keys.

required
media_types set[str] | frozenset[str] | None

Allowed values from {"images", "videos", "gifs"}. None or an empty set means "no filter" — all items are kept, including those classified as "other" (preserves historical behavior where only --video-only / --image-only narrowed the set).

None

Returns:

Type Description
list[dict[str, str]]

Filtered list.

Source code in src/python_reddit_scraper/downloader/media.py
def filter_by_media_type(
    downloads: list[dict[str, str]],
    media_types: set[str] | frozenset[str] | None = None,
) -> list[dict[str, str]]:
    """Filter media list by a set of allowed types.

    Args:
        downloads: List of dicts with 'url' and 'filename' keys.
        media_types: Allowed values from ``{"images", "videos", "gifs"}``.
            ``None`` or an empty set means "no filter" — all items are kept,
            including those classified as ``"other"`` (preserves historical
            behavior where only ``--video-only`` / ``--image-only`` narrowed
            the set).

    Returns:
        Filtered list.
    """
    if not media_types:
        return downloads

    allowed = set(media_types)
    return [item for item in downloads if get_media_type(item["filename"]) in allowed]

Engine

Concurrent file downloading with progress tracking.

python_reddit_scraper.downloader.engine

Download engine: concurrent file downloading with progress tracking.

download_file(url, filepath, *, fallback_urls=None)

Download a file from URL to filepath with retries.

Returns:

Type Description
tuple[bool, str]

(True, "") on success, or (False, reason) on failure. reason is a short label like "http_403" or "timeout".

Source code in src/python_reddit_scraper/downloader/engine.py
def download_file(
    url: str,
    filepath: str,
    *,
    fallback_urls: list[str] | None = None,
) -> tuple[bool, str]:
    """Download a file from URL to filepath with retries.

    ``url`` is tried first, then each entry of ``fallback_urls`` in order.
    Every candidate gets up to ``_MAX_RETRIES`` attempts with a pause of
    ``_BACKOFF_BASE ** attempt`` seconds between them. An HTTP status listed
    in ``_PERMANENT_CODES`` or an unexpected exception abandons the current
    candidate immediately and moves on to the next one.

    Args:
        url: Primary URL.
        filepath: Destination path handed to ``_fetch_url``.
        fallback_urls: Alternative URLs tried after ``url`` has failed.

    Returns:
        ``(True, "")`` on success, or ``(False, reason)`` on failure.
        *reason* is a short label like ``"http_403"`` or ``"timeout"`` and
        describes the last failure seen.
    """
    # Defined up front so the final return is valid even when no attempt is
    # made at all (e.g. a retry count of zero).
    reason = "unknown"

    for candidate_url in [url, *(fallback_urls or [])]:
        for attempt in range(_MAX_RETRIES):
            try:
                _fetch_url(candidate_url, filepath)
                return True, ""
            except HTTPError as exc:
                reason = f"http_{exc.code}"
                if exc.code in _PERMANENT_CODES:
                    break  # try next fallback URL, don't retry this one
            except (URLError, TimeoutError, OSError) as exc:
                # Recognise timeouts by type as well as by message; a
                # URLError may merely wrap the timeout in its `reason`.
                timed_out = (
                    isinstance(exc, TimeoutError)
                    or isinstance(getattr(exc, "reason", None), TimeoutError)
                    or "timed out" in str(exc)
                )
                reason = "timeout" if timed_out else "connection_error"
            except Exception as exc:
                reason = f"error_{type(exc).__name__}"
                break

            # Retryable failure: back off unless this was the last attempt.
            if attempt < _MAX_RETRIES - 1:
                time.sleep(_BACKOFF_BASE**attempt)

    return False, reason

download_all(downloads, output_dir, workers=16, on_file_done=None, on_file_failed=None, progress=None)

Download all media files concurrently.

Parameters:

Name Type Description Default
downloads list[dict[str, str]]

List of dicts with 'url', 'filename', optionally 'subreddit', 'optional', and 'audio_fallbacks' keys.

required
output_dir str

Base output directory (files sorted into subdirectories).

required
workers int

Number of parallel download threads.

16
on_file_done OnFileDoneCallback | None

Optional callback (url: str) -> None on success.

None
on_file_failed OnFileFailedCallback | None

Optional callback (url: str, reason: str, permanent: bool) -> None.

None
progress ProgressDisplay | None

Optional shared ProgressDisplay instance. When None (standalone / resume mode), a local rich.progress.Progress bar is used as a fallback.

None

Returns:

Type Description
tuple[int, int, Counter]

Tuple of (successful, failed, error_counts) where error_counts is a collections.Counter mapping reason labels to counts.

Source code in src/python_reddit_scraper/downloader/engine.py
def download_all(
    downloads: list[dict[str, str]],
    output_dir: str,
    workers: int = 16,
    on_file_done: OnFileDoneCallback | None = None,
    on_file_failed: OnFileFailedCallback | None = None,
    progress: ProgressDisplay | None = None,
) -> tuple[int, int, Counter]:
    """Download all media files concurrently.

    Files are written to ``<output_dir>/[<subreddit>/]<media_type>/<filename>``.
    A file that already exists at its target path is counted as successful
    without being downloaded again. An item marked ``optional`` that fails
    with a permanent HTTP status is skipped rather than counted as failed;
    its reason is still recorded in the error counts and passed to
    ``on_file_failed``.

    Args:
        downloads: List of dicts with 'url', 'filename', optionally 'subreddit',
            'optional', and 'audio_fallbacks' keys.
        output_dir: Base output directory (files sorted into subdirectories).
        workers: Number of parallel download threads.
        on_file_done: Optional callback ``(url: str) -> None`` on success.
        on_file_failed: Optional callback ``(url: str, reason: str, permanent: bool) -> None``.
        progress: Optional shared :class:`ProgressDisplay` instance. When *None*
            (standalone / resume mode), a local ``rich.progress.Progress`` bar is
            used as a fallback.

    Returns:
        Tuple of ``(successful, failed, error_counts)`` where *error_counts*
        is a :class:`~collections.Counter` mapping reason labels to counts.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Build (url, filepath, fallback_urls, optional) tuples
    download_items: list[tuple[str, str, list[str], bool]] = []
    for media in downloads:
        media_type = get_media_type(media["filename"])
        subreddit = media.get("subreddit")
        if subreddit:
            filepath = os.path.join(output_dir, subreddit, media_type, media["filename"])
        else:
            filepath = os.path.join(output_dir, media_type, media["filename"])

        # Fallback URLs travel as one "|"-separated string.
        fallbacks = (
            media.get("audio_fallbacks", "").split("|") if media.get("audio_fallbacks") else []
        )
        # The flag is carried as the string "true", not as a bool.
        optional = media.get("optional") == "true"
        download_items.append((media["url"], filepath, fallbacks, optional))

    if not download_items:
        return 0, 0, Counter()

    # Create each target directory once, before any worker starts writing.
    seen_dirs: set[str] = set()
    for _, filepath, _, _ in download_items:
        d = os.path.dirname(filepath)
        if d not in seen_dirs:
            seen_dirs.add(d)
            Path(d).mkdir(parents=True, exist_ok=True)

    successful = 0
    failed = 0
    skipped_optional = 0
    skipped_existing = 0
    error_counts: Counter = Counter()

    # Fallback: local rich progress bar when no shared ProgressDisplay is provided
    local_progress = None
    local_task_id = None
    if progress is None:
        import rich.progress

        local_progress = rich.progress.Progress(
            rich.progress.TextColumn("[bold blue]{task.description}"),
            rich.progress.BarColumn(),
            rich.progress.MofNCompleteColumn(),
            rich.progress.TimeElapsedColumn(),
            rich.progress.TransferSpeedColumn(),
        )
        local_progress.start()
        local_task_id = local_progress.add_task("Downloading", total=len(download_items))

    # From here on the local bar is live, so everything up to the end of the
    # downloads runs under one try/finally: a raising callback in either
    # phase must still stop the bar.
    try:
        # Cross-session dedup: files already on disk are treated as successes.
        # Flat `{output_dir}/{subreddit}/{media_type}/` layout makes this a
        # reliable "I already have this post" signal across runs.
        fresh_items: list[tuple[str, str, list[str], bool]] = []
        for url, filepath, fallbacks, optional in download_items:
            if os.path.exists(filepath):
                skipped_existing += 1
                successful += 1
                if on_file_done:
                    on_file_done(url)
                if progress is not None:
                    progress.advance_download()
                elif local_progress is not None and local_task_id is not None:
                    local_progress.advance(local_task_id)
                continue
            fresh_items.append((url, filepath, fallbacks, optional))

        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_map = {
                executor.submit(
                    download_file,
                    url,
                    filepath,
                    fallback_urls=fallbacks,
                ): (url, filepath, optional)
                for url, filepath, fallbacks, optional in fresh_items
            }
            for future in as_completed(future_map):
                url, filepath, is_optional = future_map[future]
                success, reason = future.result()
                if success:
                    successful += 1
                    if on_file_done:
                        on_file_done(url)
                else:
                    # Reasons look like "http_<code>"; only those can be
                    # permanent.
                    permanent = (
                        reason.startswith("http_") and int(reason.split("_")[1]) in _PERMANENT_CODES
                    )
                    if is_optional and permanent:
                        skipped_optional += 1
                    else:
                        failed += 1
                        if progress is not None:
                            progress.mark_download_failure()
                    error_counts[reason] += 1
                    if on_file_failed:
                        on_file_failed(url, reason, permanent)

                if progress is not None:
                    progress.advance_download()
                elif local_progress is not None and local_task_id is not None:
                    local_progress.advance(local_task_id)
    finally:
        if local_progress is not None:
            local_progress.stop()

    if skipped_existing:
        logger.info(
            "Skipped {} file(s) already on disk (cross-session dedup)",
            skipped_existing,
        )
    if skipped_optional:
        logger.info(
            "Skipped {} optional files (audio tracks blocked by Reddit CDN)",
            skipped_optional,
        )
    if error_counts:
        summary = ", ".join(f"{c}x {r}" for r, c in error_counts.most_common())
        logger.warning("Download errors: {}", summary)

    return successful, failed, error_counts

run_download_queue(download_q, output_dir, workers, media_types, state=None, progress=None)

Consumer thread: pulls (subreddit, posts) from queue, downloads one sub at a time.

Returns cumulative (successful, failed) counts.

Source code in src/python_reddit_scraper/downloader/engine.py
def run_download_queue(
    download_q: queue.Queue[tuple[str, list[dict]] | None],
    output_dir: str,
    workers: int,
    media_types: set[str] | frozenset[str] | None,
    state=None,
    progress: ProgressDisplay | None = None,
) -> tuple[int, int]:
    """Drain the download queue, fetching one subreddit's media at a time.

    Blocks on ``download_q.get()`` until a ``None`` sentinel arrives. Every
    other item is a ``(subreddit, posts)`` pair whose media is extracted,
    filtered by ``media_types`` and handed to ``download_all``.

    Args:
        download_q: Queue of ``(subreddit, posts)`` tuples, terminated by ``None``.
        output_dir: Directory passed through to ``download_all``.
        workers: Worker count passed through to ``download_all``.
        media_types: Filter forwarded to ``filter_by_media_type``.
        state: Optional session state; when truthy its manifest is extended
            and its ``mark_downloaded`` / ``mark_permanently_failed`` methods
            are used as per-file callbacks.
        progress: Optional progress display to update.

    Returns:
        Cumulative ``(successful, failed)`` counts over all processed items.
    """
    ok_total = 0
    fail_total = 0

    # iter() with a sentinel stops as soon as the producer enqueues None.
    for sub, posts in iter(download_q.get, None):
        candidates = extract_all_media(posts)
        media = filter_by_media_type(candidates, media_types=media_types)

        if media:
            if state:
                # Append this subreddit's files to the persisted manifest.
                state.set_media_manifest(state.media + media)

            if progress is not None:
                progress.init_download(
                    total_files=len(media), sub=sub, queued=download_q.qsize()
                )

            logger.info("r/{}: downloading {} files...", sub, len(media))
            ok, fail, _errors = download_all(
                media,
                output_dir,
                workers=workers,
                on_file_done=state.mark_downloaded if state else None,
                on_file_failed=state.mark_permanently_failed if state else None,
                progress=progress,
            )
            ok_total += ok
            fail_total += fail
            logger.info("r/{}: {} downloaded, {} failed", sub, ok, fail)
        else:
            logger.info("r/{}: no media after filtering", sub)

        download_q.task_done()

    return ok_total, fail_total

Session State

The state module manages resume/session persistence.

python_reddit_scraper.downloader.state

Session state management for resume support.

Persists scraping progress and download manifests to .scraper-state/ so interrupted runs can be resumed with --resume.

SessionState

Manages persistent state for a single scrape+download session.

State is saved to a JSON file in .scraper-state/{timestamp}.json. The file tracks which subreddits have been scraped, the full media manifest, and which files have been successfully downloaded.

Parameters:

Name Type Description Default
output_dir str

The download output directory for this session.

required
media_types set[str] | frozenset[str] | None

Allowed media types. None means "all types". Legacy video_only / image_only booleans are accepted for backward compatibility and translated on construction.

None
state_path str | None

Explicit path to a state file (used when resuming).

None
Source code in src/python_reddit_scraper/downloader/state.py
class SessionState:
    """
    Manages persistent state for a single scrape+download session.

    State is saved to a JSON file in ``.scraper-state/{timestamp}.json``.
    The file tracks which subreddits have been scraped, the full media
    manifest, and which files have been successfully downloaded.

    Args:
        output_dir: The download output directory for this session.
        media_types: Allowed media types. ``None`` means "all types".
            Legacy ``video_only`` / ``image_only`` booleans are accepted
            for backward compatibility and translated on construction.
        state_path: Explicit path to a state file (used when resuming).
    """

    def __init__(
        self,
        output_dir: str,
        media_types: set[str] | frozenset[str] | None = None,
        video_only: bool = False,
        image_only: bool = False,
        state_path: str | None = None,
    ):
        self.output_dir = output_dir
        if media_types is not None:
            self.media_types: frozenset[str] = frozenset(media_types) or ALL_MEDIA_TYPES
        else:
            self.media_types = _booleans_to_media_types(video_only, image_only)

        self.subreddits: dict[str, str] = {}
        self.media: list[dict] = []
        self._lock = threading.Lock()
        self._dirty_count = 0

        if state_path:
            self.state_path = state_path
        else:
            Path(STATE_DIR).mkdir(parents=True, exist_ok=True)
            ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            self.state_path = os.path.join(STATE_DIR, f"{ts}.json")

    @property
    def video_only(self) -> bool:
        """Legacy boolean view — true when filter is videos + gifs only."""
        return self.media_types == frozenset({"videos", "gifs"})

    @property
    def image_only(self) -> bool:
        """Legacy boolean view — true when filter is images only."""
        return self.media_types == frozenset({"images"})

    def _to_dict(self) -> dict:
        return {
            "output_dir": self.output_dir,
            "filters": {
                "media_types": sorted(self.media_types),
                # Legacy keys kept for forward-compat with older readers.
                "video_only": self.video_only,
                "image_only": self.image_only,
            },
            "subreddits": self.subreddits,
            "media": self.media,
        }

    def save(self) -> None:
        """Write current state to disk atomically."""
        tmp = self.state_path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self._to_dict(), f, ensure_ascii=False)
        os.replace(tmp, self.state_path)
        self._dirty_count = 0

    @classmethod
    def load(cls, path: str) -> "SessionState":
        """
        Load a session state from a JSON file.

        Args:
            path: Path to the state JSON file.

        Returns:
            A populated SessionState instance.
        """
        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        filters = data.get("filters", {})
        raw_mt = filters.get("media_types")
        if raw_mt is not None:
            media_types: frozenset[str] | None = frozenset(raw_mt) & ALL_MEDIA_TYPES or None
        else:
            media_types = _booleans_to_media_types(
                filters.get("video_only", False),
                filters.get("image_only", False),
            )

        state = cls(
            output_dir=data["output_dir"],
            media_types=media_types,
            state_path=path,
        )
        state.subreddits = data.get("subreddits", {})
        state.media = data.get("media", [])
        return state

    @classmethod
    def find_latest(cls) -> str | None:
        """
        Find the most recent state file in the state directory.

        Returns:
            Path to the newest state file, or None if none exist.
        """
        state_dir = Path(STATE_DIR)
        if not state_dir.exists():
            return None
        files = sorted(state_dir.glob("*.json"), reverse=True)
        return str(files[0]) if files else None

    @classmethod
    def list_all(cls) -> list[str]:
        """Return all state-file paths in ``.scraper-state/``, newest first."""
        state_dir = Path(STATE_DIR)
        if not state_dir.exists():
            return []
        return [str(p) for p in sorted(state_dir.glob("*.json"), reverse=True)]

    def mark_subreddit_scraped(self, sub: str) -> None:
        """Mark a subreddit as having been fully scraped."""
        self.subreddits[sub] = "scraped"

    def set_media_manifest(self, media_list: list[dict]) -> None:
        """
        Set the full media manifest (list of files to download).

        Each item should have ``url``, ``filename``, ``subreddit`` keys.
        A ``downloaded`` field is added and defaults to ``False``.
        """
        self.media = [{**m, "downloaded": m.get("downloaded", False)} for m in media_list]
        self.save()

    def mark_downloaded(self, url: str, batch_size: int = 50) -> None:
        """Mark a media URL as successfully downloaded.

        State is flushed to disk every ``batch_size`` completions for performance.

        Args:
            url: The URL that was downloaded.
            batch_size: How often to flush state to disk.
        """
        with self._lock:
            for item in self.media:
                if item["url"] == url:
                    item["downloaded"] = True
                    break
            self._dirty_count += 1
            if self._dirty_count >= batch_size:
                self.save()

    def mark_permanently_failed(self, url: str, reason: str, permanent: bool) -> None:
        """Mark a media URL as permanently failed (e.g. HTTP 403/404).

        These items will be skipped on future resume attempts.
        Only stores permanent failures; transient ones can be retried.
        """
        if not permanent:
            return
        with self._lock:
            for item in self.media:
                if item["url"] == url:
                    item["failed"] = True
                    item["fail_reason"] = reason
                    break
            self._dirty_count += 1
            if self._dirty_count >= 50:
                self.save()

    def get_pending_media(self) -> list[dict]:
        """Get media items that have not yet been downloaded.

        Also checks whether the file already exists on disk (handles
        the case where the file was downloaded but state wasn't saved).
        Permanently failed items (HTTP 403/404) are skipped.

        Returns:
            List of media dicts that still need downloading.
        """
        from python_reddit_scraper.downloader.media import get_media_type

        pending = []
        for item in self.media:
            if item.get("downloaded"):
                continue
            if item.get("failed"):
                continue
            media_type = get_media_type(item["filename"])
            sub = item.get("subreddit")
            if sub:
                filepath = os.path.join(self.output_dir, sub, media_type, item["filename"])
            else:
                filepath = os.path.join(self.output_dir, media_type, item["filename"])
            if os.path.exists(filepath):
                item["downloaded"] = True
                continue
            pending.append(item)
        return pending

    def flush_and_cleanup(self) -> None:
        """Save final state and remove the state file on completion."""
        import contextlib

        self.save()
        with contextlib.suppress(OSError):
            os.remove(self.state_path)
        with contextlib.suppress(OSError):
            os.rmdir(STATE_DIR)

video_only property

Legacy boolean view — true when filter is videos + gifs only.

image_only property

Legacy boolean view — true when filter is images only.

save()

Write current state to disk atomically.

Source code in src/python_reddit_scraper/downloader/state.py
def save(self) -> None:
    """Persist the current state to ``self.state_path``.

    The JSON is first written to a sibling ``.tmp`` file and then moved over
    the real path with ``os.replace``, so the state file is swapped in one
    step. The dirty counter is reset afterwards.
    """
    staging_path = self.state_path + ".tmp"
    with open(staging_path, "w", encoding="utf-8") as handle:
        json.dump(self._to_dict(), handle, ensure_ascii=False)
    # Swap the finished file into place.
    os.replace(staging_path, self.state_path)
    self._dirty_count = 0

load(path) classmethod

Load a session state from a JSON file.

Parameters:

Name Type Description Default
path str

Path to the state JSON file.

required

Returns:

Type Description
SessionState

A populated SessionState instance.

Source code in src/python_reddit_scraper/downloader/state.py
@classmethod
def load(cls, path: str) -> "SessionState":
    """
    Rebuild a session state from a previously saved JSON file.

    The media-type filter is read from ``filters.media_types`` when present
    (unknown names are discarded); otherwise it is derived from the legacy
    ``video_only`` / ``image_only`` booleans.

    Args:
        path: Path to the state JSON file.

    Returns:
        A populated SessionState instance bound to ``path``.
    """
    with open(path, encoding="utf-8") as handle:
        payload = json.load(handle)

    filter_cfg = payload.get("filters", {})
    stored_types = filter_cfg.get("media_types")
    if stored_types is None:
        # Older state files: only the boolean filters are available.
        media_types: frozenset[str] | None = _booleans_to_media_types(
            filter_cfg.get("video_only", False),
            filter_cfg.get("image_only", False),
        )
    else:
        recognised = frozenset(stored_types) & ALL_MEDIA_TYPES
        # Nothing recognised -> None, which the constructor treats as unset.
        media_types = recognised if recognised else None

    restored = cls(
        output_dir=payload["output_dir"],
        media_types=media_types,
        state_path=path,
    )
    restored.subreddits = payload.get("subreddits", {})
    restored.media = payload.get("media", [])
    return restored

find_latest() classmethod

Find the most recent state file in the state directory.

Returns:

Type Description
str | None

Path to the newest state file, or None if none exist.

Source code in src/python_reddit_scraper/downloader/state.py
@classmethod
def find_latest(cls) -> str | None:
    """
    Locate the newest state file in the state directory.

    "Newest" is the greatest path in sort order among ``*.json`` files.

    Returns:
        Path to the newest state file, or None if the directory is missing
        or holds no state files.
    """
    root = Path(STATE_DIR)
    if not root.exists():
        return None
    newest = max(root.glob("*.json"), default=None)
    return None if newest is None else str(newest)

list_all() classmethod

Return all state-file paths in .scraper-state/, newest first.

Source code in src/python_reddit_scraper/downloader/state.py
@classmethod
def list_all(cls) -> list[str]:
    """List every state file in ``.scraper-state/`` as a string path.

    Paths are sorted in descending order; an empty list is returned when
    the state directory does not exist.
    """
    root = Path(STATE_DIR)
    if not root.exists():
        return []
    descending = sorted(root.glob("*.json"), reverse=True)
    return list(map(str, descending))

mark_subreddit_scraped(sub)

Mark a subreddit as having been fully scraped.

Source code in src/python_reddit_scraper/downloader/state.py
def mark_subreddit_scraped(self, sub: str) -> None:
    """Record ``sub`` with the status ``"scraped"`` in ``self.subreddits``."""
    status = "scraped"
    self.subreddits[sub] = status

set_media_manifest(media_list)

Set the full media manifest (list of files to download).

Each item should have url, filename, subreddit keys. A downloaded field is added and defaults to False.

Source code in src/python_reddit_scraper/downloader/state.py
def set_media_manifest(self, media_list: list[dict]) -> None:
    """
    Replace the media manifest and persist it immediately.

    Each item should have ``url``, ``filename``, ``subreddit`` keys.
    Items are shallow-copied; a ``downloaded`` field is added with the
    value ``False`` when the item does not already carry one.
    """
    manifest = []
    for source in media_list:
        entry = dict(source)
        # Keep an existing flag, otherwise start as not downloaded.
        entry.setdefault("downloaded", False)
        manifest.append(entry)
    self.media = manifest
    self.save()

mark_downloaded(url, batch_size=50)

Mark a media URL as successfully downloaded.

State is flushed to disk every batch_size completions for performance.

Parameters:

Name Type Description Default
url str

The URL that was downloaded.

required
batch_size int

How often to flush state to disk.

50
Source code in src/python_reddit_scraper/downloader/state.py
def mark_downloaded(self, url: str, batch_size: int = 50) -> None:
    """Flag the first manifest entry matching ``url`` as downloaded.

    Every call counts towards the dirty counter, whether or not an entry
    matched; once the counter reaches ``batch_size`` the state is saved.

    Args:
        url: The URL that was downloaded.
        batch_size: How often to flush state to disk.
    """
    with self._lock:
        match = next((entry for entry in self.media if entry["url"] == url), None)
        if match is not None:
            match["downloaded"] = True
        self._dirty_count += 1
        if self._dirty_count >= batch_size:
            self.save()

mark_permanently_failed(url, reason, permanent)

Mark a media URL as permanently failed (e.g. HTTP 403/404).

These items will be skipped on future resume attempts. Only stores permanent failures; transient ones can be retried.

Source code in src/python_reddit_scraper/downloader/state.py
def mark_permanently_failed(
    self, url: str, reason: str, permanent: bool, batch_size: int = 50
) -> None:
    """Mark a media URL as permanently failed (e.g. HTTP 403/404).

    These items will be skipped on future resume attempts.
    Only stores permanent failures; transient ones can be retried.

    Args:
        url: The URL whose download failed.
        reason: Failure reason stored as ``fail_reason`` on the manifest entry.
        permanent: When false the call is a no-op.
        batch_size: How often to flush state to disk. Mirrors the
            ``mark_downloaded`` parameter (same default, same dirty counter)
            instead of a hard-coded threshold.
    """
    if not permanent:
        return
    with self._lock:
        for item in self.media:
            if item["url"] == url:
                item["failed"] = True
                item["fail_reason"] = reason
                break
        # Counted even when no entry matched, same as mark_downloaded.
        self._dirty_count += 1
        if self._dirty_count >= batch_size:
            self.save()

get_pending_media()

Get media items that have not yet been downloaded.

Also checks whether the file already exists on disk (handles the case where the file was downloaded but state wasn't saved). Permanently failed items (HTTP 403/404) are skipped.

Returns:

Type Description
list[dict]

List of media dicts that still need downloading.

Source code in src/python_reddit_scraper/downloader/state.py
def get_pending_media(self) -> list[dict]:
    """Collect the manifest entries that still need to be downloaded.

    Entries already flagged ``downloaded`` or ``failed`` are skipped. For
    the rest, the expected on-disk location is checked: a file that already
    exists is flagged ``downloaded`` in memory (covering the case where the
    download finished but the state was never saved) and left out.

    Returns:
        List of media dicts that still need downloading.
    """
    from python_reddit_scraper.downloader.media import get_media_type

    remaining: list[dict] = []
    for entry in self.media:
        if entry.get("downloaded") or entry.get("failed"):
            continue

        kind = get_media_type(entry["filename"])
        subreddit = entry.get("subreddit")
        # Files live under <output_dir>/[<subreddit>/]<media_type>/.
        folder = (
            os.path.join(self.output_dir, subreddit, kind)
            if subreddit
            else os.path.join(self.output_dir, kind)
        )

        if os.path.exists(os.path.join(folder, entry["filename"])):
            entry["downloaded"] = True
        else:
            remaining.append(entry)
    return remaining

flush_and_cleanup()

Save final state and remove the state file on completion.

Source code in src/python_reddit_scraper/downloader/state.py
def flush_and_cleanup(self) -> None:
    """Write the final state, then delete the state file.

    The state directory is removed too when ``os.rmdir`` succeeds. Both
    removals are best-effort: an ``OSError`` from either one is ignored.
    """
    self.save()
    try:
        os.remove(self.state_path)
    except OSError:
        pass  # best-effort: the file may already be gone
    try:
        os.rmdir(STATE_DIR)
    except OSError:
        pass  # best-effort: e.g. other state files still live there

CLI

Commands

The CLI commands module provides the main download command.

python_reddit_scraper.cli.commands

Typer-decorated entry points for the Reddit media downloader.

This module is intentionally thin: each command parses its arguments and delegates straight to a flow in python_reddit_scraper.cli.flows.

download(subreddits=None, output_dir=None, save_json=False, max_pages=None, workers=None, scrape_workers=None, resume=False, dry_run=False, version=False)

Download media from Reddit subreddits.

Source code in src/python_reddit_scraper/cli/commands.py
def download(
    subreddits: Annotated[
        str | None,
        typer.Option(
            "--subreddits",
            "-s",
            help="Comma-separated subreddit names (e.g. 'buildapc,dataengineering').",
        ),
    ] = None,
    output_dir: Annotated[
        str | None,
        typer.Option(
            "--output-dir",
            "-o",
            help="Base directory for downloaded files. "
            "If omitted, uses the value from config.yaml or prompts interactively.",
        ),
    ] = None,
    save_json: Annotated[
        bool,
        typer.Option(
            "--save-json", help="Save scraped JSON to ./input/{subreddit}/ for later reuse."
        ),
    ] = False,
    max_pages: Annotated[
        int | None,
        typer.Option(
            "--max-pages",
            help="Max pages to scrape per subreddit (100 posts/page). "
            "If omitted, uses the value from config.yaml or prompts interactively.",
        ),
    ] = None,
    workers: Annotated[
        int | None,
        typer.Option(
            "--workers",
            "-w",
            help="Number of parallel download threads. "
            "If omitted, uses the value from config.yaml (default: 16).",
        ),
    ] = None,
    scrape_workers: Annotated[
        int | None,
        typer.Option(
            "--scrape-workers",
            "-sw",
            help="Max parallel camoufox scraper processes. "
            "If omitted, uses the value from config.yaml (default: 1).",
        ),
    ] = None,
    resume: Annotated[
        bool,
        typer.Option("--resume", help="Resume the most recent interrupted download session."),
    ] = False,
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run",
            help="Scrape and list what would be downloaded; do not write any files.",
        ),
    ] = False,
    version: Annotated[
        bool,
        typer.Option(
            "--version",
            "-V",
            help="Show version and exit.",
            callback=_version_callback,
            is_eager=True,
        ),
    ] = False,
) -> None:
    """Download media from Reddit subreddits."""
    # Flow modules are imported lazily, only for the branch that runs.
    if not resume:
        from python_reddit_scraper.cli.flows.live import run_live

        run_live(
            subreddits=subreddits,
            output_dir=output_dir,
            save_json=save_json,
            max_pages=max_pages,
            workers=workers,
            scrape_workers=scrape_workers,
            dry_run=dry_run,
        )
        return

    # --resume: only the worker count is forwarded to the resume flow.
    from python_reddit_scraper.cli.flows.resume import run_resume

    run_resume(workers)

Runtime

Shared runtime helpers used across CLI flows (env checks, config resolution).

python_reddit_scraper.cli.runtime

Runtime environment setup: camoufox check, proxy loading, option resolution.

This module is the glue between user-provided CLI arguments / YAML defaults and the concrete values the scraper and downloader need. It contains no UI styling — presentation is the responsibility of python_reddit_scraper.ui.

resolve_options(output_dir, max_pages, workers, scrape_workers)

Apply the CLI → config → prompt-or-default ladder for user-tunable options.

Source code in src/python_reddit_scraper/cli/runtime.py
def resolve_options(
    output_dir: str | None,
    max_pages: int | None,
    workers: int | None,
    scrape_workers: int | None,
) -> ResolvedOptions:
    """Resolve each user-tunable option from CLI value, config, then prompt/default.

    Precedence per option: an explicit (non-``None``) CLI argument wins,
    then the stored default from ``get_defaults()``. Media types, output
    directory and max pages fall back to an interactive prompt; the two
    worker counts fall back to their ``DEFAULT_*`` constants. Media types
    have no CLI argument here and come from config or prompt only.

    Returns:
        A ``ResolvedOptions`` holding the five resolved values.
    """
    defaults = get_defaults()

    # Conditional expressions keep the prompts lazy: each one runs only when
    # neither the CLI nor the config supplied a value, in the order below.
    media_types: frozenset[str] = (
        frozenset(defaults.media_types)
        if defaults.media_types is not None
        else prompt_media_types(default=ALL_MEDIA_TYPES)
    )

    resolved_out = (
        output_dir
        if output_dir is not None
        else defaults.output_dir
        if defaults.output_dir is not None
        else prompt_output_dir(DEFAULT_OUTPUT_DIR)
    )

    resolved_pages = (
        max_pages
        if max_pages is not None
        else defaults.max_pages
        if defaults.max_pages is not None
        else prompt_max_pages(DEFAULT_MAX_PAGES)
    )

    if workers is not None:
        resolved_workers = workers
    else:
        # A falsy configured value (None or 0) falls through to the default.
        resolved_workers = defaults.workers or DEFAULT_WORKERS

    if scrape_workers is not None:
        resolved_scrape_workers = scrape_workers
    else:
        resolved_scrape_workers = defaults.scrape_workers or DEFAULT_SCRAPE_WORKERS

    return ResolvedOptions(
        media_types=media_types,
        output_dir=resolved_out,
        max_pages=resolved_pages,
        workers=resolved_workers,
        scrape_workers=resolved_scrape_workers,
    )

check_camoufox_binary()

Verify the stealth Firefox binary required by camoufox is installed.

Source code in src/python_reddit_scraper/cli/runtime.py
def check_camoufox_binary() -> None:
    """Ensure camoufox reports an installed browser version, or exit.

    Any failure while importing ``camoufox.pkgman`` or querying
    ``installed_verstr()``, as well as an empty version string, is treated
    as "not installed".

    Raises:
        typer.Exit: With code 1 after logging installation instructions.
    """
    with spinner("Verifying Camoufox binary…"):
        try:
            from camoufox.pkgman import installed_verstr

            binary_present = bool(installed_verstr())
        except Exception:
            # Import or lookup failure is handled the same as "missing".
            binary_present = False

        if not binary_present:
            logger.error(
                "Camoufox browser not found. Run this command first:\n\n"
                "    uv run camoufox fetch\n\n"
                "This downloads the stealth Firefox binary (~80 MB, one-time setup)."
            )
            raise typer.Exit(1) from None

load_proxies()

Load proxies for the chosen provider, with per-account fallback.

Returns the working proxy pool or None when no providers are configured. Prompts for a provider when multiple are present in the YAML config. Exits with a clear error when every account for the picked provider has hit its bandwidth limit.

Source code in src/python_reddit_scraper/cli/runtime.py
def load_proxies() -> list[dict] | None:
    """Build the proxy pool for the selected provider.

    Returns ``None`` when ``get_providers()`` yields nothing, or when loading
    fails for any reason other than account exhaustion (a warning is logged
    and scraping continues without a proxy). The provider is picked through
    ``choose_provider``.

    Raises:
        typer.Exit: With code 1 when ``AllAccountsExhaustedError`` is raised
            while loading the provider's proxies.
    """
    from python_reddit_scraper.scraper.proxy_handler import (
        AllAccountsExhaustedError,
        load_proxies_for_provider,
    )

    configured = get_providers()
    if not configured:
        return None

    chosen = choose_provider(configured)

    try:
        # Imported inside the try so a missing module degrades to "no proxy".
        from camoufox.locale import download_mmdb

        with spinner("Downloading GeoIP database…"):
            download_mmdb()
        with spinner(f"Probing {chosen.name} accounts…"):
            pool = load_proxies_for_provider(chosen)
    except AllAccountsExhaustedError as err:
        logger.error("{}", err)
        raise typer.Exit(1) from err
    except Exception as err:
        logger.warning("Could not load {} proxies, scraping without proxy: {}", chosen.name, err)
        return None
    return pool

Configure

Interactive subcommand that writes user defaults to config.yaml.

python_reddit_scraper.cli.configure

Interactive configure subcommand — writes defaults to config.yaml.

configure()

Interactively write download defaults to ~/.config/python_reddit_scraper/config.yaml.

Source code in src/python_reddit_scraper/cli/configure.py
def configure() -> None:
    """Interactively write download defaults to ~/.config/python_reddit_scraper/config.yaml.

    Each prompt is pre-filled with the currently stored value when there is
    one, otherwise with a built-in fallback. The answers are saved through
    ``save_defaults`` and echoed back in a summary panel.
    """
    current = get_defaults()

    # Prompts run in a fixed order: media types, output dir, pages, workers.
    media_types = prompt_media_types(
        default=set(current.media_types if current.media_types is not None else ALL_MEDIA_TYPES)
    )
    output_dir = prompt_output_dir(current.output_dir or "./redditdownloads")
    max_pages = prompt_max_pages(50 if current.max_pages is None else current.max_pages)
    workers = prompt_workers(16 if current.workers is None else current.workers)
    scrape_workers = prompt_scrape_workers(
        1 if current.scrape_workers is None else current.scrape_workers
    )

    ordered_types = sorted(media_types)
    new_defaults = Defaults(
        media_types=tuple(ordered_types),
        output_dir=output_dir,
        max_pages=max_pages,
        workers=workers,
        scrape_workers=scrape_workers,
    )
    path = save_defaults(new_defaults)

    summary = {
        "media_types": ", ".join(ordered_types),
        "output_dir": output_dir,
        "max_pages": max_pages,
        "workers": workers,
        "scrape_workers": scrape_workers,
    }
    print_defaults_panel(path, summary)

History

history subcommand — lists past runs from the run log.

python_reddit_scraper.cli.history_cmd

download-reddit-media history subcommand.

history(limit=20, show_all=False, since=None)

Render past runs as a rich table.

Source code in src/python_reddit_scraper/cli/history_cmd.py
def history(
    limit: Annotated[
        int,
        typer.Option("--limit", "-n", help="Show at most this many runs (newest first)."),
    ] = 20,
    show_all: Annotated[
        bool,
        typer.Option("--all", help="Show every recorded run (overrides --limit)."),
    ] = False,
    since: Annotated[
        str | None,
        typer.Option(
            "--since",
            help="Filter to runs within the last duration, e.g. 30m, 12h, 7d.",
        ),
    ] = None,
) -> None:
    """Load the recorded runs, filter them, and print them as a rich table."""
    # --all disables the row cap by passing limit=None to the filter.
    effective_limit = None if show_all else limit
    recorded = load_runs()
    selected = filter_runs(recorded, limit=effective_limit, since=since)
    print_history(selected, path=history_path())

Flows — Live

End-to-end scrape + download flow with a live dashboard.

python_reddit_scraper.cli.flows.live

Live-scrape flow: scrape subreddits from Reddit, then download media.

run_live(subreddits, output_dir, save_json, max_pages, workers, scrape_workers, dry_run=False)

Scrape one or more subreddits and download all matching media.

With dry_run=True the media URLs are extracted and counted but nothing is written to disk — useful for validating a large run before committing.

Source code in src/python_reddit_scraper/cli/flows/live.py
def run_live(
    subreddits: str | None,
    output_dir: str | None,
    save_json: bool,
    max_pages: int | None,
    workers: int | None,
    scrape_workers: int | None,
    dry_run: bool = False,
) -> None:
    """Scrape one or more subreddits and download all matching media.

    With ``dry_run=True`` the media URLs are extracted and counted but nothing
    is written to disk — useful for validating a large run before committing.

    Args:
        subreddits: Comma-separated subreddit names. A leading ``r/`` or
            ``/r/`` prefix on a name is removed; names that are empty after
            normalisation are dropped. When falsy, the user is prompted.
        output_dir: CLI override for the output directory, or ``None``.
        save_json: When true, scraped posts are also saved via
            ``save_scraped_json``.
        max_pages: CLI override for pages per subreddit, or ``None``.
        workers: CLI override for download threads, or ``None``.
        scrape_workers: CLI override for parallel scrapers, or ``None``.
        dry_run: Count what would be downloaded without downloading.
    """
    if subreddits:
        # removeprefix() strips the literal "r/" once. The previous
        # lstrip("r/") stripped any run of leading "r" and "/" characters,
        # which turned "rust" into "ust" and "r/reddit" into "eddit".
        sub_list = [
            name
            for raw in subreddits.split(",")
            if (name := raw.strip().lstrip("/").removeprefix("r/"))
        ]
    else:
        sub_list = prompt_subreddits()

    check_camoufox_binary()
    proxies = load_proxies()

    sub_list = _run_preflight(sub_list, proxies)

    opts = resolve_options(output_dir, max_pages, workers, scrape_workers)
    session_dir = _ensure_output_dir(opts.output_dir)

    print_banner("dry-run" if dry_run else "live", subreddit_count=len(sub_list))
    logger.info(
        "Scraping {} subreddit(s): {}",
        len(sub_list),
        ", ".join(f"r/{s}" for s in sub_list),
    )
    started_at = time.time()

    if dry_run:
        total = _run_dry(sub_list, opts, proxies, started_at)
        record_run(
            mode="dry-run",
            subreddits=sub_list,
            duration_s=time.time() - started_at,
            successful=total,
            failed=0,
            output_dir=session_dir,
        )
        return

    # Persist every subreddit as "pending" up front so an interrupted run
    # can be resumed from the state file.
    state = SessionState(output_dir=session_dir, media_types=opts.media_types)
    for sub in sub_list:
        state.subreddits[sub] = "pending"
    state.save()

    download_q: queue.Queue[tuple[str, list[dict]] | None] = queue.Queue()
    download_results: list[tuple[int, int]] = []
    progress = ProgressDisplay(total_subs=len(sub_list))

    def download_consumer() -> None:
        """Thread target: drain the queue and record the cumulative counts."""
        ok, fail = run_download_queue(
            download_q,
            session_dir,
            opts.workers,
            opts.media_types,
            state,
            progress=progress,
        )
        download_results.append((ok, fail))

    consumer = threading.Thread(target=download_consumer, daemon=True)
    consumer.start()

    def on_sub_complete(sub: str, posts: list[dict]) -> None:
        """Scrape callback: persist progress, then queue the posts for download."""
        state.mark_subreddit_scraped(sub)
        if save_json and posts:
            path = save_scraped_json(posts, sub)
            logger.info("r/{}: saved JSON to {}", sub, path)
        state.save()
        download_q.put((sub, posts))

    with progress:
        scrape_parallel(
            sub_list,
            max_pages=opts.max_pages,
            max_workers=min(len(sub_list), opts.scrape_workers),
            on_complete=on_sub_complete,
            progress=progress,
            proxies=proxies,
        )
        # Sentinel: tells the consumer that no more subreddits will arrive.
        download_q.put(None)
        consumer.join()

    total_ok = sum(r[0] for r in download_results)
    total_fail = sum(r[1] for r in download_results)

    print_summary(
        session_dir,
        total_ok,
        total_fail,
        list(state.subreddits.keys()),
        started_at=started_at,
    )

    record_run(
        mode="live",
        subreddits=list(state.subreddits.keys()),
        duration_s=time.time() - started_at,
        successful=total_ok,
        failed=total_fail,
        output_dir=session_dir,
    )

    if total_fail == 0:
        # Nothing left to retry: the state file is no longer needed.
        state.flush_and_cleanup()
    else:
        state.save()
        logger.info("Resume with: download-reddit-media --resume")

Flows — Resume

Resume flow for continuing a previously interrupted download session.

python_reddit_scraper.cli.flows.resume

Resume flow: pick up a previously interrupted download session.

run_resume(workers)

Resume a previously interrupted download session.

When a single interrupted session exists it is picked automatically; otherwise the user picks via a radiolist dialog (Enter takes the newest).

Source code in src/python_reddit_scraper/cli/flows/resume.py
def run_resume(workers: int | None) -> None:
    """Continue an interrupted download session from its saved state.

    The session to resume is chosen through ``pick_resume_session``.
    Subreddits still marked ``"pending"`` are re-scraped first; afterwards
    every pending media item is downloaded, a summary is printed and the run
    is recorded.

    Args:
        workers: Number of download workers. ``None`` falls back to the
            configured default, then to ``DEFAULT_WORKERS``.

    Raises:
        typer.Exit: With code 1 when no saved session exists, with code 0
            when the session selection is cancelled.
    """
    defaults = get_defaults()
    if workers is not None:
        worker_count = workers
    else:
        worker_count = defaults.workers or DEFAULT_WORKERS

    candidates = SessionState.list_all()
    if not candidates:
        logger.error("No interrupted session found in .scraper-state/")
        raise typer.Exit(1)

    labelled = [(candidate, _describe_state(candidate)) for candidate in candidates]
    chosen = pick_resume_session(labelled)
    if chosen is None:
        logger.info("Resume cancelled.")
        raise typer.Exit(0)

    logger.info("Resuming session from {}", chosen)
    state = SessionState.load(chosen)
    target_dir = state.output_dir

    print_banner("resume", subreddit_count=len(state.subreddits))

    # Subreddits that never finished scraping must be scraped again before
    # the list of pending media is read.
    unscraped = [name for name, status in state.subreddits.items() if status == "pending"]
    if unscraped:
        _rescrape_pending(state, unscraped, defaults)

    queue = state.get_pending_media()
    total = len(state.media)
    failed_count = len([item for item in state.media if item.get("failed")])
    done_count = len([item for item in state.media if item.get("downloaded")])
    logger.info(
        "{}/{} downloaded, {} remaining, {} permanently failed",
        done_count,
        total,
        len(queue),
        failed_count,
    )

    if not queue:
        logger.success("All downloadable files complete!")
        state.flush_and_cleanup()
        return

    Path(target_dir).mkdir(parents=True, exist_ok=True)
    started_at = time.time()

    ok, fail, _errors = download_all(
        queue,
        target_dir,
        workers=worker_count,
        on_file_done=state.mark_downloaded,
        on_file_failed=state.mark_permanently_failed,
    )

    print_summary(
        target_dir,
        ok,
        fail,
        [*state.subreddits.keys()],
        started_at=started_at,
        title="Resume summary",
    )

    record_run(
        mode="resume",
        subreddits=[*state.subreddits.keys()],
        duration_s=time.time() - started_at,
        successful=ok,
        failed=fail,
        output_dir=target_dir,
    )

    leftover = state.get_pending_media()
    if leftover:
        # Keep the state file so the next --resume can pick up from here.
        state.save()
        logger.info(
            "{} files still pending — resume again with: download-reddit-media --resume",
            len(leftover),
        )
        return

    state.flush_and_cleanup()
    logger.info("State file cleaned up — all done!")

UI

Prompts

Interactive prompts (fuzzy-completed, styled) used by the CLI flows.

python_reddit_scraper.ui.prompts

Interactive prompts built on prompt_toolkit — styled, fuzzy-completed, live-validated.

Every single-line prompt renders through :func:styled_prompt so the visual style ([LABEL:] in bold cyan + dim default hint) stays consistent. Dialogs use the palette from :mod:python_reddit_scraper.ui.theme.

styled_prompt(label, *, default=None, validator=None, completer=None)

Render a prompt as [LABEL:] (default) and return the user's answer.

An empty answer returns default when provided, otherwise returns "".

Source code in src/python_reddit_scraper/ui/prompts.py
def styled_prompt(
    label: str,
    *,
    default: str | None = None,
    validator: Validator | None = None,
    completer: Completer | None = None,
) -> str:
    """Show a ``[LABEL:] (default) `` prompt and return the stripped answer.

    The label is upper-cased; the ``(default)`` hint is shown only for a
    non-empty default. When the user submits nothing, *default* is returned
    (as a string) if one was given, otherwise ``""``.
    """
    key_fragment = ("class:key", f"[{label.upper()}:]")
    hint_fragments = [("class:hint", f" ({default})")] if default not in (None, "") else []
    fragments: list[tuple[str, str]] = [key_fragment, *hint_fragments, ("", " ")]

    answer = prompt(
        FormattedText(fragments),
        style=PROMPT_STYLE,
        validator=validator,
        validate_while_typing=False,
        completer=completer,
        complete_while_typing=bool(completer),
    ).strip()

    if answer or default is None:
        return answer
    return str(default)

choose_provider(providers)

Return the single provider, or let the user pick via a radiolist dialog.

Source code in src/python_reddit_scraper/ui/prompts.py
def choose_provider(providers: list[Provider]) -> Provider:
    """Return the single provider, or let the user pick via a radiolist dialog.

    Args:
        providers: Configured proxy providers to choose from.

    Returns:
        The only provider when exactly one is given, otherwise the one the
        user selected in the dialog.

    Raises:
        typer.Exit: With code 1 when *providers* is empty or the dialog is
            dismissed without a selection.
    """
    if not providers:
        # A radiolist with no entries cannot be rendered; fail with a clear message.
        logger.error("No proxy providers configured. Exiting.")
        raise typer.Exit(1)
    if len(providers) == 1:
        return providers[0]

    _require_tty("proxy provider")
    values = []
    for p in providers:
        count = len(p.accounts)
        plural = "" if count == 1 else "s"
        # Separate the name from the account count so the entry reads
        # "name — 3 accounts" rather than "name3 accounts".
        values.append((p, f"{p.name} — {count} account{plural}"))

    selected = radiolist_dialog(
        title="Proxy provider",
        text="Pick the proxy provider to use for this run.",
        values=values,
        style=PROMPT_STYLE,
    ).run()
    if selected is None:
        logger.error("No provider selected. Exiting.")
        raise typer.Exit(1)
    return selected

prompt_subreddits()

Prompt for comma-separated subreddit names with fuzzy completion.

Source code in src/python_reddit_scraper/ui/prompts.py
def prompt_subreddits() -> list[str]:
    """Prompt for comma-separated subreddit names with fuzzy completion.

    Each entry is trimmed and a leading ``/``, ``r/`` or ``/r/`` is removed,
    so ``pics``, ``r/pics`` and ``/r/pics`` all yield ``pics``. Entries that
    are empty after cleaning are dropped.

    Returns:
        The cleaned subreddit names, in the order they were typed.

    Raises:
        typer.Exit: With code 1 when no usable name was entered.
    """
    _require_tty("subreddits")
    completer = FuzzyWordCompleter(_load_subreddit_vocabulary())
    raw = styled_prompt("SUBREDDITS", default=None, completer=completer)

    subs: list[str] = []
    for chunk in raw.split(","):
        # removeprefix() drops the literal prefix only. str.lstrip("r/") treats
        # its argument as a character set and would also eat the leading "r"
        # of names such as "rust" or "reddit".
        name = chunk.strip().removeprefix("/").removeprefix("r/").strip()
        if name:
            subs.append(name)

    if not subs:
        logger.error("No subreddits provided. Exiting.")
        raise typer.Exit(1)
    _append_to_history(subs)
    return subs

pick_resume_session(sessions)

Prompt the user to pick one of sessions to resume.

sessions is a list of (path, label) pairs in newest-first order. Returns the chosen path, or None if the dialog was cancelled. Auto-returns the only option when len(sessions) == 1.

Source code in src/python_reddit_scraper/ui/prompts.py
def pick_resume_session(sessions: list[tuple[str, str]]) -> str | None:
    """Prompt the user to pick one of *sessions* to resume.

    *sessions* is a list of ``(path, label)`` pairs in newest-first order.
    Returns the chosen path, or ``None`` if the dialog was cancelled.
    Auto-returns the only option when ``len(sessions) == 1``.
    """
    if not sessions:
        return None
    if len(sessions) == 1:
        return sessions[0][0]

    _require_tty("resume session")
    selected = radiolist_dialog(
        title="Resume which session?",
        text="Pick an interrupted session (newest first).",
        values=[(path, label) for path, label in sessions],
        default=sessions[0][0],
        style=PROMPT_STYLE,
    ).run()
    return selected

prompt_media_types(default=None)

Prompt for media types via a checkbox dialog (space toggles, enter confirms).

Source code in src/python_reddit_scraper/ui/prompts.py
def prompt_media_types(default: set[str] | frozenset[str] | None = None) -> frozenset[str]:
    """Ask which media types to download using a checkbox dialog.

    Space toggles an entry and Enter confirms. The entries in *default* are
    pre-ticked; with no (or an empty) default every media type is pre-ticked.
    Exits with code 1 when nothing is selected.
    """
    _require_tty("media types")
    choices = [
        ("images", "Images (.jpg/.jpeg/.png/.webp)"),
        ("videos", "Videos (.mp4/.webm/.mov)"),
        ("gifs", "GIFs / animations (.gif)"),
    ]
    initially_ticked = list(default or ALL_MEDIA_TYPES)

    dialog = checkboxlist_dialog(
        title="Media types",
        text="Space toggles, Enter confirms.",
        values=choices,
        default_values=initially_ticked,
        style=PROMPT_STYLE,
    )
    picked = dialog.run()
    if picked:
        return frozenset(picked)

    logger.error("No media types selected. Exiting.")
    raise typer.Exit(1)

prompt_output_dir(default)

Prompt for an output directory with tab-completion; empty input returns default.

Source code in src/python_reddit_scraper/ui/prompts.py
def prompt_output_dir(default: str) -> str:
    """Ask for an output directory, offering path tab-completion.

    A typed answer that differs from *default* has ``~`` expanded. An empty
    answer, or one equal to *default*, is returned exactly as the prompt
    produced it.
    """
    _require_tty("output directory")
    answer = styled_prompt(
        "OUTPUT DIR",
        default=default,
        completer=PathCompleter(expanduser=True),
    )
    if not answer or answer == default:
        return answer
    return os.path.expanduser(answer)

Theme

Shared color palette and prompt_toolkit styles.

python_reddit_scraper.ui.theme

Central colour palette and formatting helpers.

A single coherent look across the CLI: rich markup for output, prompt_toolkit styles for input. The two styling systems are kept in sync here so a change to a colour in one place flows everywhere.

key_value(label, value)

Render a [LABEL:] value line as rich-markup text.

Source code in src/python_reddit_scraper/ui/theme.py
def key_value(label: str, value: object) -> str:
    """Build rich markup for one ``[LABEL:] value`` line.

    The label is upper-cased and its opening bracket is escaped so rich
    prints it literally instead of parsing it as a tag.
    """
    escaped_label = f"\\[{label.upper()}:]"
    key_part = f"[{KEY}]{escaped_label}[/{KEY}]"
    value_part = f"[{VALUE}]{value}[/{VALUE}]"
    return f"{key_part} {value_part}"

label(text)

Bold-cyan [LABEL:] prefix (no trailing value).

Source code in src/python_reddit_scraper/ui/theme.py
def label(text: str) -> str:
    """Build rich markup for a ``[LABEL:]`` prefix styled with ``KEY``.

    The text is upper-cased and no value follows the prefix.
    """
    escaped_label = f"\\[{text.upper()}:]"
    return f"[{KEY}]{escaped_label}[/{KEY}]"

Rich-based UI helpers — startup banner, run summary, preflight and history tables.

python_reddit_scraper.ui.banner

Startup banner: a single rich Rule with app name, version, mode, and target count.

print_banner(mode, subreddit_count=None)

Print a one-line ━━ app v1.1.1 • mode=live • 3 subs ━━ rule.

Source code in src/python_reddit_scraper/ui/banner.py
def print_banner(mode: str, subreddit_count: int | None = None) -> None:
    """Print the startup rule, e.g. ``━━ app v1.1.1 • mode=live • 3 subs ━━``.

    The subreddit count segment is omitted when *subreddit_count* is ``None``.
    """
    segments = [
        f"{__app_name__} [bold]v{__version__}[/bold]",
        f"mode=[magenta]{mode}[/magenta]",
    ]
    if subreddit_count is not None:
        plural = "" if subreddit_count == 1 else "s"
        segments.append(f"[cyan]{subreddit_count} sub{plural}[/cyan]")

    separator = " [bright_black]•[/bright_black] "
    title = f"[bold cyan] {separator.join(segments)} [/bold cyan]"
    log_console.print(Rule(title=title, style=BORDER, align="center"))

python_reddit_scraper.ui.spinner

Uniform spinner context manager backed by rich.status.Status.

Usage::

with spinner("Probing proxy accounts…"):
    result = slow_work()

spinner(message, *, spinner_style='dots')

Show a spinner with message while the wrapped block runs.

The spinner is automatically hidden when the block exits (success or exception), and on non-interactive stdout it degrades to a single message… line so piped invocations still surface progress.

Source code in src/python_reddit_scraper/ui/spinner.py
@contextmanager
def spinner(message: str, *, spinner_style: str = "dots") -> Iterator[None]:
    """Display *message* with a spinner for the duration of the ``with`` block.

    On a terminal the spinner is shown through ``log_console.status`` and
    disappears when the block exits, whether normally or by exception. When
    the console is not a terminal, the message is printed once instead so
    piped runs still show what is happening.
    """
    text = f"[cyan]{message}"
    if not log_console.is_terminal:
        log_console.print(text)
        yield
        return

    with log_console.status(text, spinner=spinner_style):
        yield

python_reddit_scraper.ui.summary

End-of-run summary rendering.

Entry points:

  • print_summary — after a live scrape or resume run; per-subreddit counts.
  • print_dry_run — after a dry run; per-subreddit media counts, no disk I/O.
  • print_defaults_panel — after configure saves; echoes the persisted values.

print_summary(output_dir, successful, failed, subreddits, *, started_at=None, title='Download summary')

Render a rich Panel+Table summary of what was downloaded.

Source code in src/python_reddit_scraper/ui/summary.py
def print_summary(
    output_dir: str,
    successful: int,
    failed: int,
    subreddits: list[str],
    *,
    started_at: float | None = None,
    title: str = "Download summary",
) -> None:
    """Render a rich Panel+Table summary of what was downloaded.

    One row is printed per subreddit with a file count for each entry of
    ``_MEDIA_DIRS``, the row total and the directory size. Subreddits whose
    directory does not exist under *output_dir* get a placeholder row.

    Args:
        output_dir: Root directory holding one sub-directory per subreddit;
            also shown as the panel subtitle.
        successful: Number of successful downloads, shown in the status line.
        failed: Number of failed downloads, shown in the status line.
        subreddits: Subreddit names to list; ``None`` or empty prints no rows.
        started_at: ``time.time()`` value taken when the run began; when
            given, the elapsed time is included in the status line.
        title: Panel title.
    """
    table = Table(
        box=ROUNDED,
        border_style="bright_black",
        header_style="bold cyan",
        expand=False,
    )
    table.add_column("Subreddit", style="cyan", no_wrap=True)
    for name in _MEDIA_DIRS:
        table.add_column(name.capitalize(), justify="right")
    table.add_column("Total", justify="right", style="bold")
    table.add_column("Size", justify="right", style="green")

    for sub in subreddits or ():
        sub_path = Path(output_dir, sub)
        if not sub_path.exists():
            # One dash per media column, so the row always matches the
            # columns generated from _MEDIA_DIRS above.
            table.add_row(f"r/{sub}", *(["—"] * len(_MEDIA_DIRS)), "0", "—")
            continue
        counts = {name: _count_files(sub_path / name) for name in _MEDIA_DIRS}
        table.add_row(
            f"r/{sub}",
            *[str(counts[n]) if counts[n] else "—" for n in _MEDIA_DIRS],
            str(sum(counts.values())),
            _fmt_bytes(_dir_size(sub_path)),
        )

    elapsed = time.time() - started_at if started_at else None
    summary_line = _status_line(successful, failed, elapsed)

    panel = Panel(
        _stack(table, summary_line),
        title=f"[bold cyan] {title} [/bold cyan]",
        subtitle=f"[dim]{output_dir}[/dim]",
        border_style=BORDER,
        box=ROUNDED,
    )
    log_console.print(panel)

print_dry_run(counts, *, started_at=None)

Render a DRY RUN panel of per-subreddit media counts; no disk I/O.

Source code in src/python_reddit_scraper/ui/summary.py
def print_dry_run(
    counts: dict[str, Counter[str]],
    *,
    started_at: float | None = None,
) -> None:
    """Print the DRY RUN panel: media counts per subreddit, nothing touched on disk.

    Args:
        counts: Maps each subreddit name to a counter keyed by media type.
        started_at: ``time.time()`` value taken when the run began; when
            given, the elapsed time is appended to the footer.
    """
    table = Table(
        box=ROUNDED,
        border_style="bright_black",
        header_style="bold cyan",
        expand=False,
    )
    table.add_column("Subreddit", style="cyan", no_wrap=True)
    for media_dir in _MEDIA_DIRS:
        table.add_column(media_dir.capitalize(), justify="right")
    table.add_column("Total", justify="right", style="bold")

    grand_total = 0
    for sub, per_type in counts.items():
        sub_total = sum(per_type.values())
        grand_total += sub_total
        cells = [str(per_type[kind]) if per_type[kind] else "—" for kind in _MEDIA_DIRS]
        table.add_row(f"r/{sub}", *cells, str(sub_total))

    elapsed = None
    if started_at:
        elapsed = time.time() - started_at

    footer = Text()
    footer.append("  ")
    footer.append(f"{grand_total} files would download", style="bold yellow")
    if elapsed is not None:
        footer.append("  ·  ", style="bright_black")
        footer.append(_fmt_duration(elapsed), style="dim")

    log_console.print(
        Panel(
            _stack(table, footer),
            title="[bold yellow] DRY RUN [/bold yellow]",
            subtitle="[dim]no files were written[/dim]",
            border_style="yellow",
            box=ROUNDED,
        )
    )

print_defaults_panel(path, defaults)

Echo the saved configure defaults as a KEY/value table inside a panel.

Source code in src/python_reddit_scraper/ui/summary.py
def print_defaults_panel(path: Path, defaults: dict) -> None:
    """Show the saved configure defaults as ``[KEY:] value`` rows in a panel.

    Args:
        path: Location of the config file, shown as the panel subtitle.
        defaults: Mapping of setting name to the value that was saved.
    """
    grid = Table.grid(padding=(0, 2))
    grid.add_column(style=KEY, no_wrap=True)
    grid.add_column(style=VALUE)
    for name, setting in defaults.items():
        # The escaped bracket keeps rich from parsing "[NAME:]" as a tag.
        grid.add_row(f"[{KEY}]\\[{name.upper()}:][/{KEY}]", str(setting))

    log_console.print(
        Panel(
            grid,
            title="[bold cyan] Saved defaults [/bold cyan]",
            subtitle=f"[dim]{path}[/dim]",
            border_style=BORDER,
            box=ROUNDED,
        )
    )

python_reddit_scraper.ui.preflight_table

Render the subreddit preflight results as a rich Table.

print_preflight(results)

Render results as a bordered table inside a cyan panel.

Source code in src/python_reddit_scraper/ui/preflight_table.py
def print_preflight(results: list[PreflightResult]) -> None:
    """Print the preflight *results* as a table wrapped in a panel.

    Each row shows the subreddit, a status mark plus status text, the
    subscriber count, the NSFW flag and any note. The mark is a green tick
    for ``"public"``, a red cross when the result is not ok, and a yellow
    question mark otherwise.
    """
    table = Table(
        box=ROUNDED,
        border_style="bright_black",
        header_style="bold cyan",
        expand=False,
    )
    table.add_column("Subreddit", style="cyan", no_wrap=True)
    table.add_column("Status", no_wrap=True)
    table.add_column("Subs", justify="right")
    table.add_column("NSFW", justify="center")
    table.add_column("Notes", style="dim")

    for result in results:
        colour = _STATUS_STYLE.get(result.status, "white")
        if result.status == "public":
            mark = "[green]✓[/green]"
        elif result.ok:
            mark = "[yellow]?[/yellow]"
        else:
            mark = "[red]✗[/red]"

        if result.subscribers is None:
            subscribers_cell = "—"
        else:
            subscribers_cell = _fmt_subscribers(result.subscribers)

        table.add_row(
            f"r/{result.sub}",
            f"{mark} [{colour}]{result.status}[/{colour}]",
            subscribers_cell,
            _fmt_nsfw(result.nsfw),
            result.note or "",
        )

    log_console.print(
        Panel(
            table,
            title="[bold cyan] Preflight [/bold cyan]",
            border_style=BORDER,
            box=ROUNDED,
        )
    )

python_reddit_scraper.ui.history_table

Render download-reddit-media history output as a rich Panel+Table.

print_history(runs, *, path)

Render runs (newest first) with a subtitle pointing to the source file.

Source code in src/python_reddit_scraper/ui/history_table.py
def print_history(runs: list[RunRecord], *, path: Path) -> None:
    """Print the recorded *runs* (newest first) inside a History panel.

    With no runs the panel holds a short placeholder message instead of a
    table. The panel subtitle always shows *path*, the file the history was
    read from.
    """
    if not runs:
        body = "[dim]No runs recorded yet.[/dim]"
        title = "[bold cyan] History [/bold cyan]"
    else:
        body = Table(
            box=ROUNDED,
            border_style="bright_black",
            header_style="bold cyan",
            expand=False,
        )
        column_specs = (
            ("When", {"no_wrap": True, "style": "dim"}),
            ("Mode", {"no_wrap": True}),
            ("Subreddits", {"overflow": "fold"}),
            ("OK", {"justify": "right", "style": "green"}),
            ("Fail", {"justify": "right", "style": "red"}),
            ("Duration", {"justify": "right", "no_wrap": True}),
            ("Output", {"overflow": "fold", "style": "dim"}),
        )
        for header, options in column_specs:
            body.add_column(header, **options)

        for record in runs:
            colour = _MODE_STYLE.get(record.mode, "white")
            body.add_row(
                _fmt_when(record.when),
                f"[{colour}]{record.mode}[/{colour}]",
                _fmt_subs(record.subreddits),
                str(record.successful) if record.successful else "—",
                str(record.failed) if record.failed else "—",
                _fmt_duration(record.duration_s),
                record.output_dir,
            )
        title = f"[bold cyan] History — last {len(runs)} run(s) [/bold cyan]"

    log_console.print(
        Panel(
            body,
            title=title,
            subtitle=f"[dim]{path}[/dim]",
            border_style=BORDER,
            box=ROUNDED,
        )
    )

Config

User configuration loader — defaults, proxy providers, and YAML read/write.

python_reddit_scraper.config

Load user configuration from ~/.config/python_reddit_scraper/config.yaml.

Provider dataclass

One proxy provider block from the config file.

accounts holds the provider-specific raw dicts (shape differs per provider — see scraper.proxy_handler for how each is parsed).

Source code in src/python_reddit_scraper/config.py
@dataclass(frozen=True)
class Provider:
    """One proxy provider block from the config file.

    Instances are immutable (``frozen=True``). ``accounts`` holds the
    provider-specific raw dicts (shape differs per provider — see
    ``scraper.proxy_handler`` for how each is parsed).
    """

    # Provider name as given in the config block.
    name: str
    # Raw account dicts for this provider, not validated here.
    accounts: list[dict]

Defaults dataclass

User-configured defaults for the download command.

Each field is None when not configured, meaning the CLI should fall back to prompting the user (or to its built-in default).

Source code in src/python_reddit_scraper/config.py
@dataclass(frozen=True)
class Defaults:
    """User-configured defaults for the download command.

    Each field is ``None`` when not configured, meaning the CLI should fall
    back to prompting the user (or to its built-in default). Instances are
    immutable (``frozen=True``).
    """

    # Media type names to download.
    media_types: tuple[str, ...] | None = None
    # Directory downloads are written to.
    output_dir: str | None = None
    # Page limit per subreddit scrape.
    max_pages: int | None = None
    # Number of download workers.
    workers: int | None = None
    # Number of scrape workers.
    scrape_workers: int | None = None

get_providers()

Return all configured proxy providers in the order they appear in YAML.

Source code in src/python_reddit_scraper/config.py
def get_providers() -> list[Provider]:
    """Load the configured proxy providers, keeping their YAML order.

    A missing or empty ``providers`` key yields an empty list, and a provider
    without ``accounts`` gets an empty account list.
    """
    cfg = load_config()
    providers: list[Provider] = []
    for entry in cfg.get("providers") or []:
        accounts = list(entry.get("accounts") or [])
        providers.append(Provider(name=entry["name"], accounts=accounts))
    return providers

get_defaults()

Return user-configured defaults from the defaults: block, if any.

Source code in src/python_reddit_scraper/config.py
def get_defaults() -> Defaults:
    """Return user-configured defaults from the ``defaults:`` block, if any.

    Every value is validated independently. An invalid value is logged as a
    warning and treated as not configured (``None``), so a malformed config
    file never aborts the command.

    Returns:
        A ``Defaults`` instance whose fields are ``None`` wherever the config
        has no usable value.
    """
    cfg = load_config()
    raw = cfg.get("defaults") or {}
    if not isinstance(raw, dict):
        # e.g. "defaults: foo" or a YAML list; there is nothing to read from it.
        logger.warning("Ignoring non-mapping defaults block in config.")
        raw = {}

    media_types: tuple[str, ...] | None = None
    raw_mt = raw.get("media_types")
    if raw_mt is not None:
        if isinstance(raw_mt, str) or not isinstance(raw_mt, (list, tuple, set, frozenset)):
            # A bare scalar (string or otherwise) is treated as a one-item list.
            raw_mt = [raw_mt]
        valid: list[str] = []
        invalid: list[object] = []
        for m in raw_mt:
            # The isinstance check comes first so unhashable YAML values
            # (nested lists/mappings) never reach the membership test.
            if isinstance(m, str) and m in ALL_MEDIA_TYPES:
                valid.append(m)
            else:
                invalid.append(m)
        if invalid:
            logger.warning(
                "Ignoring unknown media_types in config: {}. Valid values: {}.",
                # str() each entry: join() raises TypeError on non-string items.
                ", ".join(str(m) for m in invalid),
                ", ".join(sorted(ALL_MEDIA_TYPES)),
            )
        media_types = tuple(valid) if valid else None

    output_dir = raw.get("output_dir")
    if output_dir is not None and not isinstance(output_dir, str):
        logger.warning("Ignoring non-string output_dir in config.")
        output_dir = None

    def _positive_int(key: str) -> int | None:
        """Return ``raw[key]`` as an int greater than zero, or ``None`` if unusable."""
        val = raw.get(key)
        if val is None:
            return None
        try:
            # YAML booleans and fractional floats would otherwise be coerced
            # silently (True -> 1, 2.9 -> 2).
            if isinstance(val, bool):
                raise ValueError
            if isinstance(val, float) and not val.is_integer():
                raise ValueError
            parsed = int(val)
            if parsed <= 0:
                raise ValueError
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers "key: .inf", since int(float("inf")) raises it.
            logger.warning("Ignoring invalid {} in config: {!r}", key, val)
            return None
        return parsed

    max_pages = _positive_int("max_pages")
    workers = _positive_int("workers")
    scrape_workers = _positive_int("scrape_workers")

    return Defaults(
        media_types=media_types,
        output_dir=output_dir,
        max_pages=max_pages,
        workers=workers,
        scrape_workers=scrape_workers,
    )

save_defaults(defaults)

Merge defaults into the config file, preserving other top-level keys.

Only fields that are not None are written, so partial configs stay partial. Returns the path that was written.

Source code in src/python_reddit_scraper/config.py
def save_defaults(defaults: Defaults) -> Path:
    """Write the set fields of *defaults* into the config file's ``defaults`` block.

    Existing top-level keys and existing ``defaults`` entries are kept. Fields
    that are ``None`` are skipped, so a partial config stays partial.

    Returns:
        The path of the config file that was written.
    """
    _CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    cfg = load_config()

    block = dict(cfg.get("defaults") or {})
    field_names = ("media_types", "output_dir", "max_pages", "workers", "scrape_workers")
    for field_name in field_names:
        value = getattr(defaults, field_name)
        if value is None:
            continue
        # media_types is stored as a tuple but written out as a plain list.
        block[field_name] = list(value) if field_name == "media_types" else value
    cfg["defaults"] = block

    with _CONFIG_PATH.open("w") as handle:
        yaml.safe_dump(cfg, handle, sort_keys=False)
    return _CONFIG_PATH