#!/usr/bin/env python3
"""
Download NPR Car Talk podcast episodes by crawling NPR's listing endpoint.

NPR's HTML currently embeds each episode's audio URL in a JSON blob inside a
`data-audio` attribute. This script extracts that JSON and downloads every
episode into a `{date}_cartalk_{title}.mp3` (or `.m4a`) file, skipping files
that already exist.
"""

from __future__ import annotations

import argparse
import concurrent.futures as futures
import dataclasses
import html
import json
import logging
import os
import random
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Iterable, Iterator


@dataclasses.dataclass(frozen=True)
class Episode:
    date: str
    audio_url: str
    title: str | None = None

    def filename(self) -> str:
        ext = guess_extension(self.audio_url)
        # Keep only letters, digits, and whitespace, then collapse whitespace
        # runs into underscores.
        clean_title = re.sub(r"[^a-zA-Z0-9\s]", "", self.title or "")
        clean_title = clean_title.strip()
        clean_title = re.sub(r"\s+", "_", clean_title)
        if clean_title:
            clean_title = "_" + clean_title
        return f"{self.date}_cartalk{clean_title}{ext}"


# Patterns for NPR's current listing markup. Each <article> block carries the
# episode date, title, and a data-audio attribute holding a JSON payload.
ARTICLE_SPLIT_RE = re.compile(r"<article[^>]*>")
DATE_RE = re.compile(r'datetime="(\d{4}-\d{2}-\d{2})')
TITLE_RE = re.compile(r'<h2 class="title"><a[^>]*>([^<]+)')
DATA_AUDIO_RE = re.compile(r"data-audio='([^']+)'")
# Last-resort search for an audioUrl value inside the raw data-audio text.
AUDIO_URL_RE = re.compile(r'"audioUrl"\s*:\s*"([^"]+)"')
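# For illustration only: the exact markup varies, but the data-audio payload is
# roughly shaped like the made-up snippet below. DATA_AUDIO_RE captures the
# single-quoted JSON blob; extract_audio_url() then reads its "audioUrl" key,
# falling back to AUDIO_URL_RE when the blob does not parse as JSON.
#
#   <b data-audio='{"title": "Example Episode",
#                   "audioUrl": "https:\/\/ondemand.npr.org\/...\/510208\/example.mp3"}'>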
# Fallback: occasionally pages still contain a direct href to the audio file.
HREF_AUDIO_RE = re.compile(r'href="(https?://[^"]+\.(?:mp3|m4a)[^"]*)"')


def guess_extension(url: str) -> str:
    parsed = urllib.parse.urlparse(url)
    path = parsed.path.lower()
    if path.endswith(".m4a"):
        return ".m4a"
    return ".mp3"


def build_request(
    url: str, user_agent: str, timeout_s: float
) -> urllib.request.Request:
    request = urllib.request.Request(url)
    request.add_header("User-Agent", user_agent)
    request.add_header("Accept", "text/html,*/*;q=0.9")
    return request


def fetch_text(url: str, user_agent: str, timeout_s: float, retries: int) -> str:
    """
    Fetch a URL and return decoded text.

    Uses a simple retry loop with jittered backoff for transient errors.
    """
    last_error: Exception | None = None
    for attempt in range(retries + 1):
        if attempt:
            delay = min(10.0, (2**attempt)) + random.random()
            logging.getLogger("cartalk").warning(
                "Retrying page fetch (%s/%s) in %.1fs: %s",
                attempt,
                retries,
                delay,
                url,
            )
            # Small jitter keeps multiple workers from syncing their retries.
            time.sleep(delay)
        try:
            started = time.time()
            request = build_request(url, user_agent, timeout_s)
            with urllib.request.urlopen(request, timeout=timeout_s) as response:
                raw = response.read()
                charset = response.headers.get_content_charset() or "utf-8"
            elapsed = time.time() - started
            logging.getLogger("cartalk").info(
                "Fetched %s (%.1f KB) in %.2fs",
                url,
                len(raw) / 1024.0,
                elapsed,
            )
            return raw.decode(charset, errors="replace")
        except (urllib.error.URLError, TimeoutError) as exc:
            last_error = exc
    assert last_error is not None
    raise last_error


def iter_episodes(html_text: str) -> Iterator[Episode]:
    """
    Yield episodes found in NPR's "partial/next" listing HTML.
    """
    # Split the page into per-episode chunks. The delimiter itself is discarded,
    # so the first element is whatever came before the first <article> tag.
    parts = ARTICLE_SPLIT_RE.split(html_text)
    for chunk in parts[1:]:
        date_match = DATE_RE.search(chunk)
        if not date_match:
            continue
        date = date_match.group(1)

        title = None
        title_match = TITLE_RE.search(chunk)
        if title_match:
            title = html.unescape(title_match.group(1)).strip()

        audio_url = extract_audio_url(chunk)
        if not audio_url:
            continue
        yield Episode(date=date, audio_url=audio_url, title=title)
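# Minimal illustration with synthetic markup (not a captured NPR page): given a
# chunk such as
#
#   <article class="item">
#     <time datetime="2012-10-27">October 27, 2012</time>
#     <h2 class="title"><a href="#">A Muffler Mystery</a></h2>
#     <b data-audio='{"audioUrl": "https://example.org/ct.mp3"}'>
#   </article>
#
# iter_episodes() yields Episode(date="2012-10-27",
# audio_url="https://example.org/ct.mp3", title="A Muffler Mystery").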
def extract_audio_url(article_html: str) -> str | None:
    data_audio_match = DATA_AUDIO_RE.search(article_html)
    if data_audio_match:
        # data-audio is a JSON string, wrapped in single quotes, with backslash-
        # escaped slashes and occasionally \u0026 escapes for ampersands.
        raw = html.unescape(data_audio_match.group(1))
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            url = payload.get("audioUrl")
            if isinstance(url, str) and url.startswith(("http://", "https://")):
                return url
        # Some variants include the audioUrl key, but the JSON is malformed due
        # to unexpected escaping. Fall back to a direct search.
        audio_url_match = AUDIO_URL_RE.search(raw)
        if audio_url_match:
            return unescape_audio_url(audio_url_match.group(1))

    href_match = HREF_AUDIO_RE.search(article_html)
    if href_match:
        return html.unescape(href_match.group(1))
    return None


def unescape_audio_url(url: str) -> str:
    # JSON leaves us with literal \/ and possibly \u0026 sequences; html.unescape
    # handles any &amp;-style entities that slipped in.
    url = html.unescape(url)
    url = url.replace("\\/", "/").replace("\\u0026", "&")
    return url
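# What unescape_audio_url() undoes, using an invented URL for illustration:
#
#   https:\/\/ondemand.npr.org\/npr\/ct\/example.mp3?d=3180\u0026e=1
#
# comes back as
#
#   https://ondemand.npr.org/npr/ct/example.mp3?d=3180&e=1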
def cleanup_partial_downloads(output_dir: Path) -> None:
    """Remove any leftover .part files from previous incomplete runs."""
    logger = logging.getLogger("cartalk")
    part_files = list(output_dir.glob("*.part"))
    for part_file in part_files:
        logger.info("Cleaning up partial download: %s", part_file)
        part_file.unlink()


def download_episode(
    episode: Episode,
    output_dir: Path,
    user_agent: str,
    timeout_s: float,
    retries: int,
    dry_run: bool,
) -> tuple[Episode, str]:
    logger = logging.getLogger("cartalk")
    filename = episode.filename()
    final_path = output_dir / filename
    temp_path = output_dir / f".{filename}.part"

    # Clean up any existing partial download for this episode.
    if temp_path.exists():
        logger.info("Removing partial download: %s", temp_path)
        temp_path.unlink()
    if final_path.exists():
        logger.debug("Skip existing: %s", final_path)
        return episode, "skip"
    if dry_run:
        return episode, "dry-run"

    url = unescape_audio_url(episode.audio_url)
    err_msg = ""
    for attempt in range(retries + 1):
        if attempt:
            delay = min(10.0, (2**attempt)) + random.random()
            logger.warning(
                "Retrying download (%s/%s) in %.1fs: %s",
                attempt,
                retries,
                delay,
                filename,
            )
            time.sleep(delay)
        try:
            started = time.time()
            request = build_request(url, user_agent, timeout_s)
            with urllib.request.urlopen(request, timeout=timeout_s) as response:
                status = getattr(response, "status", None)
                if status is not None:
                    logger.info("Downloading %s (HTTP %s)", filename, status)
                else:
                    logger.info("Downloading %s", filename)
                output_dir.mkdir(parents=True, exist_ok=True)
                # Stream into a hidden .part file, then rename atomically below.
                with open(temp_path, "wb") as file:
                    while True:
                        chunk = response.read(1024 * 256)
                        if not chunk:
                            break
                        file.write(chunk)
            # Treat suspiciously small responses (error pages, empty bodies)
            # as failed attempts.
            if temp_path.stat().st_size < 1024:
                temp_path.unlink(missing_ok=True)
                err_msg = "downloaded file smaller than 1 KB"
                continue
            os.replace(temp_path, final_path)
            elapsed = time.time() - started
            size_kb = final_path.stat().st_size / 1024.0
            logger.info("Saved %s (%.1f KB) in %.2fs", final_path, size_kb, elapsed)
            return episode, "ok"
        except (urllib.error.URLError, TimeoutError, OSError) as exc:
            temp_path.unlink(missing_ok=True)
            err_msg = str(exc)
            continue

    logger.error("Failed after retries: %s: %s", filename, err_msg)
    return episode, "fail"


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Download NPR Car Talk episodes into a local folder.",
    )
    parser.add_argument(
        "--base-url",
        default="https://www.npr.org/get/510208/render/partial/next?start=",
        help="NPR listing endpoint base URL.",
    )
    parser.add_argument("--start", type=int, default=1, help="Start offset.")
    parser.add_argument("--end", type=int, default=1300, help="End offset.")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=24,
        help="Offset step size (NPR uses 24 items per page).",
    )
    parser.add_argument(
        "--parallel",
        type=int,
        default=5,
        help="Number of parallel downloads.",
    )
    parser.add_argument(
        "--output-dir",
        default="car_talk_episodes",
        help="Where audio files are saved.",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=30.0,
        help="Per-request timeout in seconds.",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=3,
        help="Retries for page fetch and downloads.",
    )
    parser.add_argument(
        "--user-agent",
        default="Mozilla/5.0 (compatible; CarTalk-Downloader/1.0)",
        help="User-Agent header for NPR requests.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and print what would be downloaded without writing files.",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Increase verbosity (repeat for more detail).",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Only print errors and final summary.",
    )
    return parser


def iter_page_offsets(start: int, end: int, batch_size: int) -> Iterable[int]:
    if batch_size <= 0:
        raise ValueError("batch_size must be > 0")
    if end < start:
        return range(0)
    return range(start, end + 1, batch_size)
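# With the parser defaults (start=1, end=1300, batch-size=24), iter_page_offsets()
# yields offsets 1, 25, 49, ..., 1297, i.e. one request per 24-item listing page.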
def configure_logging(verbose: int, quiet: bool) -> None:
    logger = logging.getLogger("cartalk")
    logger.propagate = False
    if quiet:
        level = logging.ERROR
    elif verbose >= 2:
        level = logging.DEBUG
    elif verbose >= 1:
        level = logging.INFO
    else:
        level = logging.WARNING
    logger.setLevel(level)
    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    logger.handlers[:] = [handler]


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    configure_logging(args.verbose, args.quiet)
    logger = logging.getLogger("cartalk")

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Clean up any partial downloads from previous runs.
    cleanup_partial_downloads(output_dir)

    logger.info(
        "Config: start=%s end=%s batch=%s parallel=%s output_dir=%s",
        args.start,
        args.end,
        args.batch_size,
        args.parallel,
        output_dir,
    )

    started = time.time()
    ok = 0
    skipped = 0
    failed = 0
    all_episodes_seen: set[str] = set()

    for offset in iter_page_offsets(args.start, args.end, args.batch_size):
        url = f"{args.base_url}{offset}"
        logger.info("Fetching listing page (start=%s): %s", offset, url)
        page = fetch_text(url, args.user_agent, args.timeout, args.retries)
        page_episodes = list(iter_episodes(page))
        logger.info("Found %s episodes in page (start=%s)", len(page_episodes), offset)

        # Filter out episodes we've already seen (deduplication).
        new_episodes = []
        for episode in page_episodes:
            if episode.date not in all_episodes_seen:
                all_episodes_seen.add(episode.date)
                new_episodes.append(episode)
            else:
                logger.debug("Skipping duplicate episode: %s", episode.date)
        logger.info("New episodes to process: %s", len(new_episodes))

        if args.dry_run:
            for episode in new_episodes:
                title = f" - {episode.title}" if episode.title else ""
                print(f"{episode.date} {episode.audio_url}{title}")
            continue

        # Download episodes from this page immediately.
        if new_episodes:
            with futures.ThreadPoolExecutor(
                max_workers=max(1, int(args.parallel))
            ) as pool:
                jobs = [
                    pool.submit(
                        download_episode,
                        episode,
                        output_dir,
                        args.user_agent,
                        args.timeout,
                        args.retries,
                        False,
                    )
                    for episode in new_episodes
                ]
                for job in futures.as_completed(jobs):
                    episode, status = job.result()
                    if status == "ok":
                        ok += 1
                    elif status == "skip":
                        skipped += 1
                    else:
                        failed += 1
                    logger.debug(
                        "Result: %s date=%s url=%s",
                        status,
                        episode.date,
                        episode.audio_url,
                    )
                    if status != "skip" and not args.quiet:
                        print(f"[{status.upper()}] {episode.date} {episode.audio_url}")

    elapsed = time.time() - started
    print(
        f"Done. ok={ok} skipped={skipped} failed={failed} "
        f"output_dir={output_dir} elapsed={elapsed:.1f}s"
    )
    return 1 if failed else 0


if __name__ == "__main__":
    raise SystemExit(main())
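# Example invocations ("cartalk.py" below is just a placeholder for whatever
# name this script is saved under):
#
#   python cartalk.py --dry-run -v                  # list what would be downloaded
#   python cartalk.py --start 1 --end 240 -v        # first ten listing pages
#   python cartalk.py --parallel 8 --output-dir ./car_talk_episodes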