Skip to content

API Reference

Auto-generated documentation from source code docstrings.

Scraper

Core

The core scraper handles fetching Reddit JSON data via a stealth browser.

python_reddit_scraper.scraper.core

Camoufox-based Reddit JSON scraper with pagination.

Navigates old.reddit.com JSON API using a stealth Firefox browser, follows pagination tokens, and returns raw post data.

scrape_subreddit(browser, subreddit, max_pages=50, delay=1.5, quiet=False)

Scrape a subreddit's posts via Reddit's old JSON API.

Parameters:

Name Type Description Default
browser

A camoufox Browser instance (from sync context manager).

required
subreddit str

Subreddit name (without r/ prefix).

required
max_pages int

Maximum number of pages to fetch (100 posts per page).

50
delay float

Seconds to wait between page requests.

1.5
quiet bool

If True, suppress all progress output.

False

Returns:

Type Description
list[dict]

List of post data dicts (the 'data' field of each child).

Source code in src/python_reddit_scraper/scraper/core.py
def scrape_subreddit(
    browser,
    subreddit: str,
    max_pages: int = 50,
    delay: float = 1.5,
    quiet: bool = False,
) -> list[dict]:
    """
    Scrape a subreddit's posts via Reddit's old JSON API.

    Walks old.reddit.com's listing endpoint page by page, following the
    ``after`` pagination token until it runs out or *max_pages* is reached.

    Args:
        browser: A camoufox Browser instance (from sync context manager).
        subreddit: Subreddit name (without r/ prefix).
        max_pages: Maximum number of pages to fetch (100 posts per page).
        delay: Seconds to wait between page requests.
        quiet: If True, suppress all progress output.

    Returns:
        List of post data dicts (the 'data' field of each child).
    """
    base_url = f"https://old.reddit.com/r/{subreddit}.json?limit=100&raw_json=1"
    collected: list[dict] = []
    after: str | None = None

    page = browser.new_page()

    bar = None
    task_id = None
    if not quiet:
        bar = Progress(
            SpinnerColumn(),
            TextColumn(f"r/{subreddit}"),
            BarColumn(),
            MofNCompleteColumn(),
            TextColumn("pages"),
            TimeElapsedColumn(),
            TextColumn("{task.fields[postfix]}"),
        )
        bar.start()
        task_id = bar.add_task(f"r/{subreddit}", total=max_pages, postfix="")

    try:
        for page_num in range(max_pages):
            url = base_url if not after else f"{base_url}&after={after}"

            data = _fetch_json_page(page, url, subreddit)
            if data is None:
                break

            children = data.get("data", {}).get("children", [])
            if not children:
                break

            collected.extend(child["data"] for child in children if "data" in child)

            if bar is not None and task_id is not None:
                bar.update(task_id, advance=1, postfix=f"{len(collected)} posts")

            after = data.get("data", {}).get("after")
            if after is None:
                break

            # Be polite between requests, but skip the wait after the last page.
            if page_num < max_pages - 1:
                time.sleep(delay)
    finally:
        if bar is not None:
            bar.stop()
        page.close()

    return collected

Parallel

Multi-process parallel scraping of multiple subreddits.

python_reddit_scraper.scraper.parallel

Parallel multi-process scraping of multiple subreddits.

scrape_worker(subreddit, max_pages=50, delay=1.5, quiet=False)

Standalone scrape function for use with ProcessPoolExecutor.

Each call creates its own Camoufox browser instance (Playwright sync API is not thread-safe, so each process must have its own browser).

Parameters:

Name Type Description Default
subreddit str

Subreddit name (without r/ prefix).

required
max_pages int

Maximum pages to fetch.

50
delay float

Seconds between page requests.

1.5
quiet bool

Suppress per-subreddit progress output.

False

Returns:

Type Description
tuple[str, list[dict]]

Tuple of (subreddit_name, list_of_post_dicts).

Source code in src/python_reddit_scraper/scraper/parallel.py
def scrape_worker(
    subreddit: str, max_pages: int = 50, delay: float = 1.5, quiet: bool = False
) -> tuple[str, list[dict]]:
    """
    Standalone scrape function for use with ProcessPoolExecutor.

    Each call creates its own Camoufox browser instance (Playwright sync API
    is not thread-safe, so each process must have its own browser).

    Args:
        subreddit: Subreddit name (without r/ prefix).
        max_pages: Maximum pages to fetch.
        delay: Seconds between page requests.
        quiet: Suppress per-subreddit progress output.

    Returns:
        Tuple of (subreddit_name, list_of_post_dicts).
    """
    # Imported inside the function so each worker process resolves its own
    # browser machinery rather than inheriting state from the parent.
    from camoufox.sync_api import Camoufox

    from python_reddit_scraper.scraper.core import scrape_subreddit

    with Camoufox(headless=True) as browser:
        scraped = scrape_subreddit(
            browser, subreddit, max_pages=max_pages, delay=delay, quiet=quiet
        )
    return subreddit, scraped

scrape_parallel(subreddits, max_pages=50, delay=1.5, max_workers=4, on_complete=None, progress=None)

Scrape multiple subreddits in parallel using separate processes.

Each process gets its own Camoufox browser instance. Results are returned as a dict keyed by subreddit name. An optional callback is invoked as each subreddit finishes (useful for queueing downloads).

Parameters:

Name Type Description Default
subreddits list[str]

List of subreddit names.

required
max_pages int

Max pages per subreddit.

50
delay float

Seconds between page requests per scraper.

1.5
max_workers int

Maximum concurrent scraper processes.

4
on_complete

Optional callback (sub: str, posts: list[dict]) -> None invoked as each subreddit finishes scraping.

None
progress ProgressDisplay | None

Optional shared :class:ProgressDisplay for the scraping bar.

None

Returns:

Type Description
dict[str, list[dict]]

Dict mapping subreddit name to its list of post dicts.

Source code in src/python_reddit_scraper/scraper/parallel.py
def scrape_parallel(
    subreddits: list[str],
    max_pages: int = 50,
    delay: float = 1.5,
    max_workers: int = 4,
    on_complete=None,
    progress: ProgressDisplay | None = None,
) -> dict[str, list[dict]]:
    """
    Scrape multiple subreddits in parallel using separate processes.

    Each process gets its own Camoufox browser instance. Results are returned
    as a dict keyed by subreddit name. An optional callback is invoked as each
    subreddit finishes (useful for queueing downloads).

    Args:
        subreddits: List of subreddit names.
        max_pages: Max pages per subreddit.
        delay: Seconds between page requests per scraper.
        max_workers: Maximum concurrent scraper processes.
        on_complete: Optional callback ``(sub: str, posts: list[dict]) -> None``
            invoked as each subreddit finishes scraping.
        progress: Optional shared :class:`ProgressDisplay` for the scraping bar.

    Returns:
        Dict mapping subreddit name to its list of post dicts.
    """
    results: dict[str, list[dict]] = {}
    worker_count = min(len(subreddits), max_workers)
    # Per-process progress bars would fight with the shared display,
    # so workers run quiet whenever one is supplied.
    quiet = progress is not None

    with ProcessPoolExecutor(max_workers=worker_count) as pool:
        pending: dict = {}
        for name in subreddits:
            fut = pool.submit(scrape_worker, name, max_pages, delay, quiet=quiet)
            pending[fut] = name
            if progress:
                progress.mark_scrape_started(name)

        for fut in as_completed(pending):
            name = pending[fut]
            try:
                _, posts = fut.result()
                results[name] = posts
                logger.info("r/{}: {} posts collected", name, len(posts))
                if progress:
                    progress.mark_scrape_done(name)
                if on_complete:
                    on_complete(name, posts)
            except Exception as exc:
                # A failed subreddit still gets an (empty) entry so callers
                # can tell it was attempted.
                logger.error("r/{}: scraping failed -- {}", name, exc)
                results[name] = []
                if progress:
                    progress.mark_scrape_done(name)

    return results

JSON I/O

JSON file reading and writing for scraped data.

python_reddit_scraper.scraper.json_io

JSON I/O for scraped Reddit data.

save_scraped_json(posts, subreddit, output_dir='./input')

Save scraped posts to a JSON file compatible with the existing parser.

Wraps posts in Reddit's listing format so parse_json_files() can read them. Returns the path to the saved file.

Source code in src/python_reddit_scraper/scraper/json_io.py
def save_scraped_json(
    posts: list[dict],
    subreddit: str,
    output_dir: str = "./input",
) -> str:
    """
    Save scraped posts to a JSON file compatible with the existing parser.

    Wraps posts in Reddit's listing format so parse_json_files() can read them.
    Returns the path to the saved file.
    """
    target_dir = Path(output_dir) / subreddit
    target_dir.mkdir(parents=True, exist_ok=True)

    # Re-wrap the raw post dicts in the same envelope Reddit's API emits.
    children = [{"kind": "t3", "data": post} for post in posts]
    listing = {
        "kind": "Listing",
        "data": {"children": children, "after": None},
    }

    destination = target_dir / "scraped.json"
    with open(destination, "w", encoding="utf-8") as fh:
        json.dump(listing, fh, ensure_ascii=False, indent=2)

    return str(destination)

parse_json_files(input_dir)

Parse all JSON files in input directory and extract posts.

Source code in src/python_reddit_scraper/scraper/json_io.py
def parse_json_files(input_dir: str) -> list[dict]:
    """Parse all JSON files in input directory and extract posts."""
    collected: list[dict] = []
    root = Path(input_dir)

    if not root.exists():
        logger.error("Input directory {} does not exist!", input_dir)
        return collected

    # Glob both patterns so top-level and nested files are all covered;
    # the set() collapses any path matched by both.
    candidates = sorted(set(list(root.glob("*.json")) + list(root.glob("**/*.json"))))
    logger.info("Found {} JSON files", len(candidates))

    for path in candidates:
        try:
            with open(path, encoding="utf-8") as fh:
                payload = json.load(fh)

            if isinstance(payload, dict) and "data" in payload and "children" in payload["data"]:
                # A single Reddit listing: {"data": {"children": [...]}}.
                collected.extend(
                    child["data"] for child in payload["data"]["children"] if "data" in child
                )
            elif isinstance(payload, dict) and "data" in payload:
                # A single wrapped post.
                collected.append(payload["data"])
            elif isinstance(payload, list):
                # A list mixing listings and/or wrapped posts.
                for entry in payload:
                    if not (isinstance(entry, dict) and "data" in entry):
                        continue
                    if "children" in entry["data"]:
                        collected.extend(
                            child["data"]
                            for child in entry["data"]["children"]
                            if "data" in child
                        )
                    else:
                        collected.append(entry["data"])

        except Exception as e:
            # Bad files are skipped; everything parseable is still returned.
            logger.error("Error parsing {}: {}", path, e)

    return collected

Downloader

Media

Media URL extraction, type detection, and filtering.

python_reddit_scraper.downloader.media

Media URL extraction, type detection, and filtering.

sanitize_filename(text, max_length=100)

Convert text to a safe filename.

Source code in src/python_reddit_scraper/downloader/media.py
def sanitize_filename(text: str, max_length: int = 100) -> str:
    """Convert text to a safe filename."""
    # Collapse whitespace runs to a single space, then drop everything that
    # is not a word character or filename-friendly punctuation.
    collapsed = re.sub(r"[\s\n\r\t]+", " ", text).strip()
    cleaned = re.sub(r"[^\w\-_.()\[\]{} ]", "", collapsed)
    # Truncate over-long names, marking the cut with an ellipsis.
    if len(cleaned) > max_length:
        cleaned = cleaned[: max_length - 3] + "..."
    return cleaned if cleaned else "untitled"

get_file_extension(url)

Extract file extension from URL.

Source code in src/python_reddit_scraper/downloader/media.py
def get_file_extension(url: str) -> str:
    """Extract file extension from URL."""
    recognized = ("jpg", "jpeg", "png", "gif", "webp", "mp4", "webm", "mov")
    path = urlparse(url).path.lower()
    if "." in path:
        candidate = path.rsplit(".", 1)[-1]
        if candidate in recognized:
            return f".{candidate}"
    # Unknown or missing extension: fall back to a generic binary suffix.
    return ".bin"

get_media_type(filename)

Determine media type from filename for directory sorting.

Source code in src/python_reddit_scraper/downloader/media.py
def get_media_type(filename: str) -> str:
    """Determine media type from filename for directory sorting."""
    # Map the suffix to a download bucket; anything unrecognized -> "other".
    suffix = Path(filename).suffix.lower()
    if suffix in (".jpg", ".jpeg", ".png", ".webp"):
        return "images"
    if suffix == ".gif":
        return "gifs"
    if suffix in (".mp4", ".webm", ".mov"):
        return "videos"
    return "other"

is_media_url(url)

Check if URL points to a media file.

Source code in src/python_reddit_scraper/downloader/media.py
def is_media_url(url: str) -> bool:
    """Check if URL points to a media file."""
    # NOTE: this is a substring check (not a suffix check), so an extension
    # appearing anywhere in the URL counts -- e.g. ".gif" also matches ".gifv".
    lowered = url.lower()
    markers = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".mp4", ".webm", ".mov")
    return any(marker in lowered for marker in markers)

extract_media_urls(post_data)

Extract all media URLs from a Reddit post at highest resolution.

Source code in src/python_reddit_scraper/downloader/media.py
def extract_media_urls(post_data: dict) -> list[dict[str, str]]:
    """Extract all media URLs from a Reddit post at highest resolution.

    Sources are checked in priority order: direct link, gallery items,
    Reddit-hosted video, video preview, crosspost parents, preview
    images/GIFs, gifv links, and finally an oembed thumbnail as last resort.

    Args:
        post_data: A single post's 'data' dict from Reddit's listing JSON.

    Returns:
        List of dicts with 'url' and 'filename' keys (possibly empty).
    """
    if post_data is None or not isinstance(post_data, dict):
        return []

    media_urls: list[dict[str, str]] = []
    post_id = post_data.get("id", "unknown")
    title = post_data.get("title", "")
    safe_title = sanitize_filename(title)

    # 1. Direct media URL (highest priority).
    #    BUG FIX: ".gifv" links also satisfy is_media_url (".gif" substring
    #    match), but a gifv URL serves an HTML page, not media -- taking the
    #    early return here saved it with a ".bin" extension and made the
    #    gifv->mp4 conversion in step 7 unreachable. Exclude .gifv so those
    #    links fall through to step 7.
    direct_url = post_data.get("url_overridden_by_dest")
    if direct_url and is_media_url(direct_url) and not direct_url.endswith(".gifv"):
        media_urls.append(
            {
                "url": direct_url.replace("&amp;", "&"),
                "filename": f"{post_id}_{safe_title}{get_file_extension(direct_url)}",
            }
        )
        return media_urls

    # 2. Gallery posts: resolve each gallery item to its full-size source
    #    ("s"/"u" entry in media_metadata), numbered in gallery order.
    if post_data.get("is_gallery") and post_data.get("media_metadata"):
        gallery_data = post_data.get("gallery_data") or {}
        gallery_items = gallery_data.get("items", [])
        media_metadata = post_data["media_metadata"]

        for i, item in enumerate(gallery_items):
            media_id = item.get("media_id")
            if media_id and media_id in media_metadata:
                meta = media_metadata[media_id]
                if "s" in meta and "u" in meta["s"]:
                    url = meta["s"]["u"].replace("&amp;", "&")
                    media_urls.append(
                        {
                            "url": url,
                            "filename": f"{post_id}_{safe_title}_{i + 1}{get_file_extension(url)}",
                        }
                    )

    # 3. Reddit-hosted videos (media.reddit_video). Audio is served
    #    separately, so companion audio entries are queued alongside.
    if post_data.get("is_video") or post_data.get("media"):
        media = post_data.get("media") or post_data.get("secure_media")
        if media and isinstance(media, dict) and "reddit_video" in media:
            video = media["reddit_video"]
            if "fallback_url" in video:
                video_url = video["fallback_url"]
                media_urls.append(
                    {"url": video_url, "filename": f"{post_id}_{safe_title}_video.mp4"}
                )
                media_urls.extend(_build_audio_entries(video_url, post_id, safe_title))

    # 4. Reddit video preview (embedded videos from redgifs, external hosts, etc.)
    preview = post_data.get("preview", {})
    if isinstance(preview, dict):
        rvp = preview.get("reddit_video_preview")
        if rvp and isinstance(rvp, dict) and "fallback_url" in rvp:
            video_url = rvp["fallback_url"]
            # Skip if step 3 already queued this exact URL.
            existing_video_urls = {m["url"] for m in media_urls}
            if video_url not in existing_video_urls:
                media_urls.append(
                    {"url": video_url, "filename": f"{post_id}_{safe_title}_video.mp4"}
                )

    # 5. Crossposted videos -- check parent post for video data
    crosspost_list = post_data.get("crosspost_parent_list")
    if crosspost_list and isinstance(crosspost_list, list):
        for cp in crosspost_list:
            if not isinstance(cp, dict):
                continue
            cp_media = cp.get("media") or cp.get("secure_media")
            if cp_media and isinstance(cp_media, dict) and "reddit_video" in cp_media:
                video = cp_media["reddit_video"]
                if "fallback_url" in video:
                    video_url = video["fallback_url"]
                    existing_video_urls = {m["url"] for m in media_urls}
                    if video_url not in existing_video_urls:
                        media_urls.append(
                            {
                                "url": video_url,
                                "filename": f"{post_id}_{safe_title}_video.mp4",
                            }
                        )
                        media_urls.extend(_build_audio_entries(video_url, post_id, safe_title))

    # 6. Preview images/GIFs: prefer animated variants; a static preview is
    #    only added when no video was found above (it would be redundant).
    if isinstance(preview, dict) and "images" in preview and preview["images"]:
        image_data = preview["images"][0]

        variants = image_data.get("variants", {})
        if "gif" in variants and "source" in variants["gif"]:
            gif_url = variants["gif"]["source"]["url"].replace("&amp;", "&")
            media_urls.append({"url": gif_url, "filename": f"{post_id}_{safe_title}_preview.gif"})
        elif "mp4" in variants and "source" in variants["mp4"]:
            mp4_url = variants["mp4"]["source"]["url"].replace("&amp;", "&")
            media_urls.append({"url": mp4_url, "filename": f"{post_id}_{safe_title}_preview.mp4"})
        elif "source" in image_data:
            has_video = any(get_media_type(m["filename"]) == "videos" for m in media_urls)
            if not has_video:
                img_url = image_data["source"]["url"].replace("&amp;", "&")
                media_urls.append(
                    {
                        "url": img_url,
                        "filename": f"{post_id}_{safe_title}_preview{get_file_extension(img_url)}",
                    }
                )

    # 7. Handle gifv links (convert to mp4): imgur serves an actual video at
    #    the same path with the ".mp4" extension.
    if direct_url and direct_url.endswith(".gifv"):
        mp4_url = direct_url[:-5] + ".mp4"
        media_urls.append({"url": mp4_url, "filename": f"{post_id}_{safe_title}.mp4"})

    # 8. Redgifs/external oembed thumbnail as last resort
    if not media_urls:
        media = post_data.get("media") or post_data.get("secure_media")
        if media and isinstance(media, dict):
            oembed = media.get("oembed")
            if oembed and isinstance(oembed, dict):
                thumb = oembed.get("thumbnail_url")
                if thumb and is_media_url(thumb):
                    media_urls.append(
                        {
                            "url": thumb.replace("&amp;", "&"),
                            "filename": f"{post_id}_{safe_title}_thumb{get_file_extension(thumb)}",
                        }
                    )

    return media_urls

extract_all_media(posts)

Extract all media URLs from a list of posts, deduplicating by URL.

Returns list of dicts with 'url', 'filename', and 'subreddit' keys.

Source code in src/python_reddit_scraper/downloader/media.py
def extract_all_media(posts: list[dict]) -> list[dict[str, str]]:
    """
    Extract all media URLs from a list of posts, deduplicating by URL.

    Returns list of dicts with 'url', 'filename', and 'subreddit' keys.
    """
    deduped: list[dict[str, str]] = []
    seen: set[str] = set()

    for post in posts:
        if not isinstance(post, dict):
            continue
        sub_name = post.get("subreddit", "unknown")
        for entry in extract_media_urls(post):
            if entry["url"] in seen:
                continue
            seen.add(entry["url"])
            # Tag each entry with its subreddit so the downloader can sort
            # files into per-subreddit directories.
            entry["subreddit"] = sub_name
            deduped.append(entry)

    return deduped

filter_by_media_type(downloads, video_only=False, image_only=False)

Filter media list by type.

Parameters:

Name Type Description Default
downloads list[dict[str, str]]

List of dicts with 'url' and 'filename' keys.

required
video_only bool

Keep only videos + gifs (animations).

False
image_only bool

Keep only images.

False

Returns:

Type Description
list[dict[str, str]]

Filtered list.

Source code in src/python_reddit_scraper/downloader/media.py
def filter_by_media_type(
    downloads: list[dict[str, str]],
    video_only: bool = False,
    image_only: bool = False,
) -> list[dict[str, str]]:
    """
    Filter media list by type.

    Args:
        downloads: List of dicts with 'url' and 'filename' keys.
        video_only: Keep only videos + gifs (animations).
        image_only: Keep only images.

    Returns:
        Filtered list.
    """
    # No filter requested: hand back the list untouched.
    if not (video_only or image_only):
        return downloads

    # Both flags may be set; the kept categories are the union of the two.
    wanted: set[str] = set()
    if video_only:
        wanted |= {"videos", "gifs"}
    if image_only:
        wanted.add("images")

    return [item for item in downloads if get_media_type(item["filename"]) in wanted]

Engine

Concurrent file downloading with progress tracking.

python_reddit_scraper.downloader.engine

Download engine: concurrent file downloading with progress tracking.

download_file(url, filepath, *, fallback_urls=None)

Download a file from URL to filepath with retries.

Returns:

Type Description
tuple[bool, str]

(True, "") on success, or (False, reason) on failure, where reason is a short label like "http_403" or "timeout".

Source code in src/python_reddit_scraper/downloader/engine.py
def download_file(
    url: str,
    filepath: str,
    *,
    fallback_urls: list[str] | None = None,
) -> tuple[bool, str]:
    """Download a file from URL to filepath with retries.

    Tries *url* first, then each of *fallback_urls* in order. Transient
    failures on a given URL are retried up to ``_MAX_RETRIES`` times with
    exponential backoff; permanent HTTP errors move straight to the next URL.

    Args:
        url: Primary download URL.
        filepath: Destination path on disk.
        fallback_urls: Alternate URLs tried (in order) if *url* fails.

    Returns:
        ``(True, "")`` on success, or ``(False, reason)`` on failure.
        *reason* is a short label like ``"http_403"`` or ``"timeout"``.
    """
    all_urls = [url] + (fallback_urls or [])

    for candidate_url in all_urls:
        for attempt in range(_MAX_RETRIES):
            try:
                _fetch_url(candidate_url, filepath)
                return True, ""
            except HTTPError as exc:
                code = exc.code
                reason = f"http_{code}"
                if code in _PERMANENT_CODES:
                    break  # try next fallback URL, don't retry this one
                # Transient HTTP error: exponential backoff, then retry.
                if attempt < _MAX_RETRIES - 1:
                    time.sleep(_BACKOFF_BASE**attempt)
                    continue
            except (URLError, TimeoutError, OSError) as exc:
                # Network-level failure; distinguish timeouts for reporting.
                reason = "timeout" if "timed out" in str(exc) else "connection_error"
                if attempt < _MAX_RETRIES - 1:
                    time.sleep(_BACKOFF_BASE**attempt)
                    continue
            except Exception as exc:
                # Unexpected error: give up on this URL, try the next one.
                reason = f"error_{type(exc).__name__}"
                break

    # NOTE(review): assumes _MAX_RETRIES >= 1; every except path assigns
    # `reason` before breaking/falling through, so it is bound here.
    return False, reason

download_all(downloads, output_dir, workers=16, on_file_done=None, on_file_failed=None, progress=None)

Download all media files concurrently.

Parameters:

Name Type Description Default
downloads list[dict[str, str]]

List of dicts with 'url', 'filename', optionally 'subreddit', 'optional', and 'audio_fallbacks' keys.

required
output_dir str

Base output directory (files sorted into subdirectories).

required
workers int

Number of parallel download threads.

16
on_file_done

Optional callback (url: str) -> None on success.

None
on_file_failed

Optional callback (url: str, reason: str, permanent: bool) -> None.

None
progress ProgressDisplay | None

Optional shared :class:ProgressDisplay instance. When None (standalone / resume mode), a local rich.progress.Progress bar is used as a fallback.

None

Returns:

Type Description
tuple[int, int, Counter]

Tuple of (successful, failed, error_counts) where error_counts is a collections.Counter mapping reason labels to counts.

Source code in src/python_reddit_scraper/downloader/engine.py
def download_all(
    downloads: list[dict[str, str]],
    output_dir: str,
    workers: int = 16,
    on_file_done=None,
    on_file_failed=None,
    progress: ProgressDisplay | None = None,
) -> tuple[int, int, Counter]:
    """Download all media files concurrently.

    Args:
        downloads: List of dicts with 'url', 'filename', optionally 'subreddit',
            'optional', and 'audio_fallbacks' keys.
        output_dir: Base output directory (files sorted into subdirectories).
        workers: Number of parallel download threads.
        on_file_done: Optional callback ``(url: str) -> None`` on success.
        on_file_failed: Optional callback ``(url: str, reason: str, permanent: bool) -> None``.
        progress: Optional shared :class:`ProgressDisplay` instance. When *None*
            (standalone / resume mode), a local ``rich.progress.Progress`` bar is
            used as a fallback.

    Returns:
        Tuple of ``(successful, failed, error_counts)`` where *error_counts*
        is a :class:`~collections.Counter` mapping reason labels to counts.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Build (url, filepath, fallback_urls, optional) tuples
    download_items: list[tuple[str, str, list[str], bool]] = []
    for media in downloads:
        media_type = get_media_type(media["filename"])
        subreddit = media.get("subreddit")
        # Files land in output_dir[/subreddit]/{images,gifs,videos,other}/name.
        if subreddit:
            filepath = os.path.join(output_dir, subreddit, media_type, media["filename"])
        else:
            filepath = os.path.join(output_dir, media_type, media["filename"])

        # 'audio_fallbacks' is a "|"-joined string of alternate URLs.
        fallbacks = (
            media.get("audio_fallbacks", "").split("|") if media.get("audio_fallbacks") else []
        )
        # 'optional' is stored as the string "true", not a bool.
        optional = media.get("optional") == "true"
        download_items.append((media["url"], filepath, fallbacks, optional))

    if not download_items:
        return 0, 0, Counter()

    # Create every target directory once, up front, so worker threads
    # don't race on mkdir.
    seen_dirs: set[str] = set()
    for _, filepath, _, _ in download_items:
        d = os.path.dirname(filepath)
        if d not in seen_dirs:
            seen_dirs.add(d)
            Path(d).mkdir(parents=True, exist_ok=True)

    successful = 0
    failed = 0
    skipped_optional = 0
    error_counts: Counter = Counter()

    # Fallback: local rich progress bar when no shared ProgressDisplay is provided
    local_progress = None
    local_task_id = None
    if progress is None:
        import rich.progress

        local_progress = rich.progress.Progress(
            rich.progress.TextColumn("[bold blue]{task.description}"),
            rich.progress.BarColumn(),
            rich.progress.MofNCompleteColumn(),
            rich.progress.TimeElapsedColumn(),
            rich.progress.TransferSpeedColumn(),
        )
        local_progress.start()
        local_task_id = local_progress.add_task("Downloading", total=len(download_items))

    try:
        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_map = {
                executor.submit(
                    download_file,
                    url,
                    filepath,
                    fallback_urls=fallbacks,
                ): (url, filepath, optional)
                for url, filepath, fallbacks, optional in download_items
            }
            for future in as_completed(future_map):
                url, filepath, is_optional = future_map[future]
                success, reason = future.result()
                if success:
                    successful += 1
                    if on_file_done:
                        on_file_done(url)
                else:
                    # download_file encodes HTTP failures as "http_<code>";
                    # codes in _PERMANENT_CODES will never succeed on retry.
                    permanent = (
                        reason.startswith("http_") and int(reason.split("_")[1]) in _PERMANENT_CODES
                    )
                    # Optional files (e.g. audio tracks) that fail permanently
                    # are counted as skipped rather than failed.
                    if is_optional and permanent:
                        skipped_optional += 1
                    else:
                        failed += 1
                    error_counts[reason] += 1
                    if on_file_failed:
                        on_file_failed(url, reason, permanent)

                # Advance whichever progress display is active.
                if progress is not None:
                    progress.advance_download()
                elif local_progress is not None and local_task_id is not None:
                    local_progress.advance(local_task_id)
    finally:
        if local_progress is not None:
            local_progress.stop()

    if skipped_optional:
        logger.info(
            "Skipped {} optional files (audio tracks blocked by Reddit CDN)",
            skipped_optional,
        )
    if error_counts:
        summary = ", ".join(f"{c}x {r}" for r, c in error_counts.most_common())
        logger.warning("Download errors: {}", summary)

    return successful, failed, error_counts

run_download_queue(download_q, output_dir, workers, video_only, image_only, state=None, progress=None)

Consumer thread: pulls (subreddit, posts) from queue, downloads one sub at a time.

Returns cumulative (successful, failed) counts.

Source code in src/python_reddit_scraper/downloader/engine.py
def run_download_queue(
    download_q: queue.Queue[tuple[str, list[dict]] | None],
    output_dir: str,
    workers: int,
    video_only: bool,
    image_only: bool,
    state=None,
    progress: ProgressDisplay | None = None,
) -> tuple[int, int]:
    """Consumer thread: pulls (subreddit, posts) from queue, downloads one sub at a time.

    A ``None`` item is the shutdown sentinel and stops the loop.
    Returns cumulative (successful, failed) counts.
    """
    ok_total = 0
    fail_total = 0

    while (item := download_q.get()) is not None:
        sub, posts = item

        media = filter_by_media_type(
            extract_all_media(posts), video_only=video_only, image_only=image_only
        )
        if not media:
            logger.info("r/{}: no media after filtering", sub)
            download_q.task_done()
            continue

        if state:
            state.set_media_manifest(state.media + media)

        if progress is not None:
            progress.init_download(total_files=len(media), sub=sub, queued=download_q.qsize())

        logger.info("r/{}: downloading {} files...", sub, len(media))
        ok, fail, _errors = download_all(
            media,
            output_dir,
            workers=workers,
            on_file_done=state.mark_downloaded if state else None,
            on_file_failed=state.mark_permanently_failed if state else None,
            progress=progress,
        )
        ok_total += ok
        fail_total += fail
        logger.info("r/{}: {} downloaded, {} failed", sub, ok, fail)
        download_q.task_done()

    return ok_total, fail_total

Session State

The state module manages resume/session persistence.

python_reddit_scraper.downloader.state

Session state management for resume support.

Persists scraping progress and download manifests to .scraper-state/ so interrupted runs can be resumed with --resume.

SessionState

Manages persistent state for a single scrape+download session.

State is saved to a JSON file in .scraper-state/{timestamp}.json. The file tracks which subreddits have been scraped, the full media manifest, and which files have been successfully downloaded.

Parameters:

Name Type Description Default
output_dir str

The download output directory for this session.

required
video_only bool

Whether --video-only filter is active.

False
image_only bool

Whether --image-only filter is active.

False
state_path str | None

Explicit path to a state file (used when resuming).

None
Source code in src/python_reddit_scraper/downloader/state.py
class SessionState:
    """
    Manages persistent state for a single scrape+download session.

    State is saved to a JSON file in ``.scraper-state/{timestamp}.json``.
    The file tracks which subreddits have been scraped, the full media
    manifest, and which files have been successfully downloaded.

    Args:
        output_dir: The download output directory for this session.
        video_only: Whether ``--video-only`` filter is active.
        image_only: Whether ``--image-only`` filter is active.
        state_path: Explicit path to a state file (used when resuming).
    """

    def __init__(
        self,
        output_dir: str,
        video_only: bool = False,
        image_only: bool = False,
        state_path: str | None = None,
    ):
        self.output_dir = output_dir
        self.video_only = video_only
        self.image_only = image_only
        self.subreddits: dict[str, str] = {}
        self.media: list[dict] = []
        self._lock = threading.Lock()
        self._dirty_count = 0

        if state_path:
            self.state_path = state_path
        else:
            Path(STATE_DIR).mkdir(parents=True, exist_ok=True)
            ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            self.state_path = os.path.join(STATE_DIR, f"{ts}.json")

    def _to_dict(self) -> dict:
        return {
            "output_dir": self.output_dir,
            "filters": {
                "video_only": self.video_only,
                "image_only": self.image_only,
            },
            "subreddits": self.subreddits,
            "media": self.media,
        }

    def save(self) -> None:
        """Write current state to disk atomically."""
        tmp = self.state_path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self._to_dict(), f, ensure_ascii=False)
        os.replace(tmp, self.state_path)
        self._dirty_count = 0

    @classmethod
    def load(cls, path: str) -> "SessionState":
        """
        Load a session state from a JSON file.

        Args:
            path: Path to the state JSON file.

        Returns:
            A populated SessionState instance.
        """
        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        filters = data.get("filters", {})
        state = cls(
            output_dir=data["output_dir"],
            video_only=filters.get("video_only", False),
            image_only=filters.get("image_only", False),
            state_path=path,
        )
        state.subreddits = data.get("subreddits", {})
        state.media = data.get("media", [])
        return state

    @classmethod
    def find_latest(cls) -> str | None:
        """
        Find the most recent state file in the state directory.

        Returns:
            Path to the newest state file, or None if none exist.
        """
        state_dir = Path(STATE_DIR)
        if not state_dir.exists():
            return None
        files = sorted(state_dir.glob("*.json"), reverse=True)
        return str(files[0]) if files else None

    def mark_subreddit_scraped(self, sub: str) -> None:
        """Mark a subreddit as having been fully scraped."""
        self.subreddits[sub] = "scraped"

    def set_media_manifest(self, media_list: list[dict]) -> None:
        """
        Set the full media manifest (list of files to download).

        Each item should have ``url``, ``filename``, ``subreddit`` keys.
        A ``downloaded`` field is added and defaults to ``False``.
        """
        self.media = [{**m, "downloaded": m.get("downloaded", False)} for m in media_list]
        self.save()

    def mark_downloaded(self, url: str, batch_size: int = 50) -> None:
        """Mark a media URL as successfully downloaded.

        State is flushed to disk every ``batch_size`` completions for performance.

        Args:
            url: The URL that was downloaded.
            batch_size: How often to flush state to disk.
        """
        with self._lock:
            for item in self.media:
                if item["url"] == url:
                    item["downloaded"] = True
                    break
            self._dirty_count += 1
            if self._dirty_count >= batch_size:
                self.save()

    def mark_permanently_failed(self, url: str, reason: str, permanent: bool) -> None:
        """Mark a media URL as permanently failed (e.g. HTTP 403/404).

        These items will be skipped on future resume attempts.
        Only stores permanent failures; transient ones can be retried.
        """
        if not permanent:
            return
        with self._lock:
            for item in self.media:
                if item["url"] == url:
                    item["failed"] = True
                    item["fail_reason"] = reason
                    break
            self._dirty_count += 1
            if self._dirty_count >= 50:
                self.save()

    def get_pending_media(self) -> list[dict]:
        """Get media items that have not yet been downloaded.

        Also checks whether the file already exists on disk (handles
        the case where the file was downloaded but state wasn't saved).
        Permanently failed items (HTTP 403/404) are skipped.

        Returns:
            List of media dicts that still need downloading.
        """
        from python_reddit_scraper.downloader.media import get_media_type

        pending = []
        for item in self.media:
            if item.get("downloaded"):
                continue
            if item.get("failed"):
                continue
            media_type = get_media_type(item["filename"])
            sub = item.get("subreddit")
            if sub:
                filepath = os.path.join(self.output_dir, sub, media_type, item["filename"])
            else:
                filepath = os.path.join(self.output_dir, media_type, item["filename"])
            if os.path.exists(filepath):
                item["downloaded"] = True
                continue
            pending.append(item)
        return pending

    def flush_and_cleanup(self) -> None:
        """Save final state and remove the state file on completion."""
        import contextlib

        self.save()
        with contextlib.suppress(OSError):
            os.remove(self.state_path)
        with contextlib.suppress(OSError):
            os.rmdir(STATE_DIR)

save()

Write current state to disk atomically.

Source code in src/python_reddit_scraper/downloader/state.py
def save(self) -> None:
    """Persist the current state to disk via an atomic temp-file swap."""
    tmp_path = self.state_path + ".tmp"
    payload = json.dumps(self._to_dict(), ensure_ascii=False)
    with open(tmp_path, "w", encoding="utf-8") as fh:
        fh.write(payload)
    os.replace(tmp_path, self.state_path)
    self._dirty_count = 0

load(path) classmethod

Load a session state from a JSON file.

Parameters:

Name Type Description Default
path str

Path to the state JSON file.

required

Returns:

Type Description
SessionState

A populated SessionState instance.

Source code in src/python_reddit_scraper/downloader/state.py
@classmethod
def load(cls, path: str) -> "SessionState":
    """
    Load a session state from a JSON file.

    Args:
        path: Path to the state JSON file.

    Returns:
        A populated SessionState instance.
    """
    with open(path, encoding="utf-8") as f:
        raw = json.load(f)

    flt = raw.get("filters", {})
    restored = cls(
        output_dir=raw["output_dir"],
        video_only=flt.get("video_only", False),
        image_only=flt.get("image_only", False),
        state_path=path,
    )
    restored.subreddits = raw.get("subreddits", {})
    restored.media = raw.get("media", [])
    return restored

find_latest() classmethod

Find the most recent state file in the state directory.

Returns:

Type Description
str | None

Path to the newest state file, or None if none exist.

Source code in src/python_reddit_scraper/downloader/state.py
@classmethod
def find_latest(cls) -> str | None:
    """
    Find the most recent state file in the state directory.

    Returns:
        Path to the newest state file, or None if none exist.
    """
    root = Path(STATE_DIR)
    if not root.exists():
        return None
    candidates = list(root.glob("*.json"))
    if not candidates:
        return None
    # Timestamped filenames sort lexicographically in chronological
    # order, so the maximum is the newest session file.
    return str(max(candidates))

mark_subreddit_scraped(sub)

Mark a subreddit as having been fully scraped.

Source code in src/python_reddit_scraper/downloader/state.py
def mark_subreddit_scraped(self, sub: str) -> None:
    """Record that *sub* has been scraped in full."""
    self.subreddits.update({sub: "scraped"})

set_media_manifest(media_list)

Set the full media manifest (list of files to download).

Each item should have url, filename, subreddit keys. A downloaded field is added and defaults to False.

Source code in src/python_reddit_scraper/downloader/state.py
def set_media_manifest(self, media_list: list[dict]) -> None:
    """
    Set the full media manifest (list of files to download).

    Each item should have ``url``, ``filename``, ``subreddit`` keys.
    A ``downloaded`` field is added and defaults to ``False``.
    """
    manifest = []
    for entry in media_list:
        record = dict(entry)
        record.setdefault("downloaded", False)
        manifest.append(record)
    self.media = manifest
    self.save()

mark_downloaded(url, batch_size=50)

Mark a media URL as successfully downloaded.

State is flushed to disk every batch_size completions for performance.

Parameters:

Name Type Description Default
url str

The URL that was downloaded.

required
batch_size int

How often to flush state to disk.

50
Source code in src/python_reddit_scraper/downloader/state.py
def mark_downloaded(self, url: str, batch_size: int = 50) -> None:
    """Mark a media URL as successfully downloaded.

    State is flushed to disk every ``batch_size`` completions for performance.

    Args:
        url: The URL that was downloaded.
        batch_size: How often to flush state to disk.
    """
    with self._lock:
        # Locate the first manifest entry for this URL, if any.
        entry = next((m for m in self.media if m["url"] == url), None)
        if entry is not None:
            entry["downloaded"] = True
        self._dirty_count += 1
        if self._dirty_count >= batch_size:
            self.save()

mark_permanently_failed(url, reason, permanent)

Mark a media URL as permanently failed (e.g. HTTP 403/404).

These items will be skipped on future resume attempts. Only stores permanent failures; transient ones can be retried.

Source code in src/python_reddit_scraper/downloader/state.py
def mark_permanently_failed(
    self, url: str, reason: str, permanent: bool, batch_size: int = 50
) -> None:
    """Mark a media URL as permanently failed (e.g. HTTP 403/404).

    These items will be skipped on future resume attempts.
    Only stores permanent failures; transient ones can be retried.

    Args:
        url: The URL that failed to download.
        reason: Human-readable failure reason, stored in the manifest.
        permanent: If False, the failure is transient and nothing is recorded.
        batch_size: How often to flush state to disk (was a hard-coded 50,
            now parameterized to match ``mark_downloaded``).
    """
    if not permanent:
        return
    with self._lock:
        for item in self.media:
            if item["url"] == url:
                item["failed"] = True
                item["fail_reason"] = reason
                break
        self._dirty_count += 1
        if self._dirty_count >= batch_size:
            self.save()

get_pending_media()

Get media items that have not yet been downloaded.

Also checks whether the file already exists on disk (handles the case where the file was downloaded but state wasn't saved). Permanently failed items (HTTP 403/404) are skipped.

Returns:

Type Description
list[dict]

List of media dicts that still need downloading.

Source code in src/python_reddit_scraper/downloader/state.py
def get_pending_media(self) -> list[dict]:
    """Get media items that have not yet been downloaded.

    Also checks whether the file already exists on disk (handles
    the case where the file was downloaded but state wasn't saved).
    Permanently failed items (HTTP 403/404) are skipped.

    Returns:
        List of media dicts that still need downloading.
    """
    from python_reddit_scraper.downloader.media import get_media_type

    remaining = []
    for entry in self.media:
        # Skip anything already finished or permanently failed.
        if entry.get("downloaded") or entry.get("failed"):
            continue
        kind = get_media_type(entry["filename"])
        sub = entry.get("subreddit")
        path_parts = [self.output_dir]
        if sub:
            path_parts.append(sub)
        path_parts += [kind, entry["filename"]]
        target = os.path.join(*path_parts)
        if os.path.exists(target):
            # File already on disk from an interrupted run -- record and skip.
            entry["downloaded"] = True
            continue
        remaining.append(entry)
    return remaining

flush_and_cleanup()

Save final state and remove the state file on completion.

Source code in src/python_reddit_scraper/downloader/state.py
def flush_and_cleanup(self) -> None:
    """Save final state and remove the state file on completion."""
    from contextlib import suppress

    self.save()
    with suppress(OSError):
        os.remove(self.state_path)
    # rmdir only succeeds when no other session files remain, which is
    # the intent: never disturb other sessions' state files.
    with suppress(OSError):
        os.rmdir(STATE_DIR)

CLI

Commands

The CLI commands module provides the main download command.

python_reddit_scraper.cli.commands

CLI commands for the Reddit media downloader.

Handles the main download command and its sub-modes (live scrape, resume, from-json).

download(subreddits=None, output_dir='./redditdownloads', video_only=False, image_only=False, from_json=False, save_json=False, max_pages=50, workers=16, scrape_workers=max(1, (os.cpu_count() or 2) // 2), resume=False, version=False)

Download media from Reddit subreddits.

Source code in src/python_reddit_scraper/cli/commands.py
def download(
    subreddits: Annotated[
        str | None,
        typer.Option(
            "--subreddits",
            "-s",
            help="Comma-separated subreddit names (e.g. 'buildapc,dataengineering').",
        ),
    ] = None,
    output_dir: Annotated[
        str,
        typer.Option(
            "--output-dir",
            "-o",
            help="Base directory for downloaded files. A timestamped subdirectory is created inside.",
        ),
    ] = "./redditdownloads",
    video_only: Annotated[
        bool,
        typer.Option("--video-only", help="Download only videos and GIFs/animations."),
    ] = False,
    image_only: Annotated[
        bool,
        typer.Option("--image-only", help="Download only images."),
    ] = False,
    from_json: Annotated[
        bool,
        typer.Option(
            "--from-json", help="Use existing JSON files in ./input/ instead of scraping."
        ),
    ] = False,
    save_json: Annotated[
        bool,
        typer.Option(
            "--save-json", help="Save scraped JSON to ./input/{subreddit}/ for later reuse."
        ),
    ] = False,
    max_pages: Annotated[
        int,
        typer.Option("--max-pages", help="Max pages to scrape per subreddit (100 posts/page)."),
    ] = 50,
    workers: Annotated[
        int,
        typer.Option("--workers", "-w", help="Number of parallel download threads."),
    ] = 16,
    scrape_workers: Annotated[
        int,
        typer.Option(
            "--scrape-workers",
            "-sw",
            help="Max parallel camoufox scraper processes (default: cpu_count // 2).",
        ),
    ] = max(1, (os.cpu_count() or 2) // 2),
    resume: Annotated[
        bool,
        typer.Option("--resume", help="Resume the most recent interrupted download session."),
    ] = False,
    version: Annotated[
        bool,
        typer.Option(
            "--version",
            "-V",
            help="Show version and exit.",
            callback=_version_callback,
            is_eager=True,
        ),
    ] = False,
) -> None:
    """Download media from Reddit subreddits.

    Runs one of three modes: resume a prior session (``--resume``), download
    from previously-saved JSON (``--from-json``), or a live scrape+download
    pipeline where scraping and downloading overlap via a producer/consumer
    queue.

    Raises:
        typer.Exit: On conflicting filters or missing prerequisites.
    """
    if video_only and image_only:
        logger.error("Cannot use --video-only and --image-only together.")
        raise typer.Exit(1)

    if resume:
        _handle_resume(workers)
        return

    if from_json:
        _handle_from_json(video_only, image_only, workers, output_dir)
        return

    check_camoufox_binary()

    if subreddits:
        # BUG FIX: str.lstrip("r/") strips a *character set*, mangling names
        # that start with "r" (e.g. "rust" -> "ust"). removeprefix drops only
        # a literal leading "r/".
        sub_list = [s.strip().removeprefix("r/") for s in subreddits.split(",") if s.strip()]
    else:
        sub_list = prompt_subreddits()

    session_dir = _build_output_dir(output_dir)

    logger.info(
        "Scraping {} subreddit(s): {}",
        len(sub_list),
        ", ".join(f"r/{s}" for s in sub_list),
    )

    # Local imports keep CLI startup fast when these heavy modules are unused.
    from python_reddit_scraper.downloader.state import SessionState
    from python_reddit_scraper.progress import ProgressDisplay
    from python_reddit_scraper.scraper.json_io import save_scraped_json
    from python_reddit_scraper.scraper.parallel import scrape_parallel

    state = SessionState(output_dir=session_dir, video_only=video_only, image_only=image_only)
    for sub in sub_list:
        state.subreddits[sub] = "pending"
    state.save()

    # Producer/consumer queue: scrapers push (subreddit, posts) tuples,
    # the consumer thread downloads them; None is the shutdown sentinel.
    download_q: queue.Queue[tuple[str, list[dict]] | None] = queue.Queue()
    download_results: list[tuple[int, int]] = []

    progress = ProgressDisplay(total_subs=len(sub_list))

    def download_consumer():
        """Drain the queue, downloading media; records (ok, fail) totals."""
        ok, fail = run_download_queue(
            download_q, session_dir, workers, video_only, image_only, state, progress=progress
        )
        download_results.append((ok, fail))

    consumer = threading.Thread(target=download_consumer, daemon=True)
    consumer.start()

    def on_sub_complete(sub: str, posts: list[dict]):
        """Called when a subreddit finishes scraping -- queues its downloads."""
        state.mark_subreddit_scraped(sub)
        if save_json and posts:
            path = save_scraped_json(posts, sub)
            logger.info("r/{}: saved JSON to {}", sub, path)
        state.save()
        download_q.put((sub, posts))

    with progress:
        scrape_parallel(
            sub_list,
            max_pages=max_pages,
            max_workers=min(len(sub_list), scrape_workers),
            on_complete=on_sub_complete,
            progress=progress,
        )

        # All scrapers done: send the sentinel and wait for downloads to drain.
        download_q.put(None)
        consumer.join()

    total_ok = sum(r[0] for r in download_results)
    total_fail = sum(r[1] for r in download_results)

    _print_summary(session_dir, total_ok, total_fail, list(state.subreddits.keys()))

    if total_fail == 0:
        # Clean finish: remove the state file so --resume won't pick it up.
        state.flush_and_cleanup()
    else:
        state.save()
        logger.info("Resume with: rye run download-reddit-media --resume")

Prompt

Interactive prompts and environment checks.

python_reddit_scraper.cli.prompt

Interactive prompts and environment checks for the CLI.

prompt_subreddits()

Interactively prompt for subreddit names using prompt-toolkit.

Source code in src/python_reddit_scraper/cli/prompt.py
def prompt_subreddits() -> list[str]:
    """Interactively prompt for subreddit names using prompt-toolkit.

    Returns:
        Cleaned subreddit names with any literal ``r/`` prefix removed.

    Raises:
        typer.Exit: If the user entered no subreddit names.
    """
    from prompt_toolkit import prompt

    raw = prompt("Enter subreddits (comma-separated): ")
    # BUG FIX: str.lstrip("r/") strips a *character set*, so names merely
    # starting with "r" get mangled ("rust" -> "ust"). removeprefix drops
    # only a literal leading "r/".
    subs = [s.strip().removeprefix("r/") for s in raw.split(",") if s.strip()]
    if not subs:
        logger.error("No subreddits provided. Exiting.")
        raise typer.Exit(1)
    return subs

check_camoufox_binary()

Check if the camoufox Firefox binary is installed.

Source code in src/python_reddit_scraper/cli/prompt.py
def check_camoufox_binary() -> None:
    """Check if the camoufox Firefox binary is installed."""
    try:
        from camoufox.pkgman import installed_verstr

        # An empty/falsy version string means the binary was never fetched.
        if not installed_verstr():
            raise FileNotFoundError
    except Exception:
        logger.error(
            "Camoufox browser not found. Run this command first:\n\n"
            "    rye run camoufox fetch\n\n"
            "This downloads the stealth Firefox binary (~80 MB, one-time setup)."
        )
        raise typer.Exit(1) from None