#!/usr/bin/env python3
"""
Download NPR Car Talk podcast episodes by crawling NPR's listing endpoint.
NPR's HTML currently embeds the audio URL in a JSON blob inside a `data-audio`
attribute. This script extracts that JSON and downloads each episode into
`{date}_cartalk_{title}.{ext}` files, skipping files that already exist.
"""
from __future__ import annotations
import argparse
import concurrent.futures as futures
import dataclasses
import html
import json
import logging
import os
import random
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Iterable, Iterator
@dataclasses.dataclass(frozen=True)
class Episode:
date: str
audio_url: str
title: str | None = None
def filename(self) -> str:
ext = guess_extension(self.audio_url)
        # keep letters, digits and whitespace; whitespace is collapsed to "_" below
clean_title = re.sub(r"[^a-zA-Z0-9\s]", "", self.title or "")
clean_title = clean_title.strip()
clean_title = re.sub(r"\s+", "_", clean_title)
if clean_title:
clean_title = "_" + clean_title
return f"{self.date}_cartalk{clean_title}{ext}"
ARTICLE_SPLIT_RE = re.compile(r'<article class="item podcast-episode"')
DATE_RE = re.compile(r'<time datetime="([0-9]{4}-[0-9]{2}-[0-9]{2})"')
DATA_AUDIO_RE = re.compile(r"data-audio='([^']+)'")
AUDIO_URL_RE = re.compile(r'"audioUrl":"([^"]+)"')
TITLE_RE = re.compile(r'<h2 class="title"><a [^>]*>([^<]+)</a></h2>')
# fallback: occasionally pages still contain a direct href to the audio file
HREF_AUDIO_RE = re.compile(r'href="(https?://[^"]+\.(?:mp3|m4a)[^"]*)"')
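# DATA_AUDIO_RE / AUDIO_URL_RE above expect a JSON blob shaped roughly like the
# hypothetical value below; only its "audioUrl" key is consumed, and the escaped
# slashes are undone later by unescape_audio_url():
#   data-audio='{"audioUrl":"https:\/\/host\/path\/episode.mp3", ...}'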
def guess_extension(url: str) -> str:
parsed = urllib.parse.urlparse(url)
path = parsed.path.lower()
if path.endswith(".m4a"):
return ".m4a"
return ".mp3"
def build_request(url: str, user_agent: str) -> urllib.request.Request:
request = urllib.request.Request(url)
request.add_header("User-Agent", user_agent)
request.add_header("Accept", "text/html,*/*;q=0.9")
return request
def fetch_text(url: str, user_agent: str, timeout_s: float, retries: int) -> str:
"""
Fetch a URL and return decoded text.
Uses a simple retry loop with jittered backoff for transient errors.
"""
last_error: Exception | None = None
for attempt in range(retries + 1):
if attempt:
delay = min(10.0, (2**attempt)) + random.random()
logging.getLogger("cartalk").warning(
"Retrying page fetch (%s/%s) in %.1fs: %s",
attempt,
retries,
delay,
url,
)
# small jitter keeps multiple workers from syncing retries
time.sleep(delay)
try:
started = time.time()
            request = build_request(url, user_agent)
with urllib.request.urlopen(request, timeout=timeout_s) as response:
raw = response.read()
charset = response.headers.get_content_charset() or "utf-8"
elapsed = time.time() - started
logging.getLogger("cartalk").info(
"Fetched %s (%.1f KB) in %.2fs",
url,
len(raw) / 1024.0,
elapsed,
)
return raw.decode(charset, errors="replace")
except (urllib.error.URLError, TimeoutError) as exc:
last_error = exc
assert last_error is not None
raise last_error
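# The retry backoff above (mirrored in download_episode() below) is
# min(10, 2**attempt) seconds plus up to 1s of random jitter: roughly 2s, 4s
# and 8s for attempts 1-3, then capped at about 10-11s.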
def iter_episodes(html_text: str) -> Iterator[Episode]:
"""
Yield episodes found in NPR's "partial/next" HTML.
"""
# Split the page into per-episode chunks. The delimiter itself is discarded,
# so the first element is whatever came before the first <article ...>.
parts = ARTICLE_SPLIT_RE.split(html_text)
for chunk in parts[1:]:
date_match = DATE_RE.search(chunk)
if not date_match:
continue
date = date_match.group(1)
title = None
title_match = TITLE_RE.search(chunk)
if title_match:
title = html.unescape(title_match.group(1)).strip()
audio_url = extract_audio_url(chunk)
if not audio_url:
continue
yield Episode(date=date, audio_url=audio_url, title=title)
def extract_audio_url(article_html: str) -> str | None:
data_audio_match = DATA_AUDIO_RE.search(article_html)
if data_audio_match:
# data-audio is a JSON string, wrapped in single quotes, with backslash-
# escaped slashes and occasionally unicode escapes for &.
raw = html.unescape(data_audio_match.group(1))
try:
payload = json.loads(raw)
except json.JSONDecodeError:
payload = None
if isinstance(payload, dict):
url = payload.get("audioUrl")
if isinstance(url, str) and url.startswith(("http://", "https://")):
return url
# Some variants include the audioUrl key, but JSON is malformed due to
# unexpected escaping. Fall back to a direct search.
audio_url_match = AUDIO_URL_RE.search(raw)
if audio_url_match:
return unescape_audio_url(audio_url_match.group(1))
href_match = HREF_AUDIO_RE.search(article_html)
if href_match:
return html.unescape(href_match.group(1)).replace("&amp;", "&")
return None
def unescape_audio_url(url: str) -> str:
# JSON leaves us with literal \/ and possibly \u0026; html.unescape handles
# any &amp; style entities that slipped in.
url = html.unescape(url)
url = url.replace("\\/", "/").replace("\\u0026", "&")
return url
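# Hypothetical round trip through unescape_audio_url():
#   raw JSON fragment:  https:\/\/host\/ep.mp3?a=1\u0026b=2
#   returned URL:       https://host/ep.mp3?a=1&b=2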
def cleanup_partial_downloads(output_dir: Path) -> None:
"""Remove any leftover .part files from previous incomplete runs."""
logger = logging.getLogger("cartalk")
part_files = list(output_dir.glob("*.part"))
for part_file in part_files:
logger.info("Cleaning up partial download: %s", part_file)
part_file.unlink()
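# Design note: download_episode() below streams into a hidden ".<name>.part"
# temp file and only os.replace()s it onto the final filename after the body
# has been read in full, so an interrupted run never leaves a truncated file
# under the final name; stale .part files are removed by the cleanup above.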
def download_episode(
episode: Episode,
output_dir: Path,
user_agent: str,
timeout_s: float,
retries: int,
dry_run: bool,
) -> tuple[Episode, str]:
logger = logging.getLogger("cartalk")
filename = episode.filename()
final_path = output_dir / filename
temp_path = output_dir / f".{filename}.part"
# Clean up any existing partial download for this episode
if temp_path.exists():
logger.info("Removing partial download: %s", temp_path)
temp_path.unlink()
if final_path.exists():
logger.debug("Skip existing: %s", final_path)
return episode, "skip"
if dry_run:
return episode, "dry-run"
url = unescape_audio_url(episode.audio_url)
err_msg = ""
for attempt in range(retries + 1):
if attempt:
delay = min(10.0, (2**attempt)) + random.random()
logger.warning(
"Retrying download (%s/%s) in %.1fs: %s",
attempt,
retries,
delay,
filename,
)
time.sleep(delay)
try:
started = time.time()
            request = build_request(url, user_agent)
with urllib.request.urlopen(request, timeout=timeout_s) as response:
status = getattr(response, "status", None)
if status is not None:
logger.info("Downloading %s (HTTP %s)", filename, status)
else:
logger.info("Downloading %s", filename)
output_dir.mkdir(parents=True, exist_ok=True)
with open(temp_path, "wb") as file:
while True:
chunk = response.read(1024 * 256)
if not chunk:
break
file.write(chunk)
if temp_path.stat().st_size < 1024:
temp_path.unlink(missing_ok=True)
continue
os.replace(temp_path, final_path)
elapsed = time.time() - started
size_kb = final_path.stat().st_size / 1024.0
logger.info("Saved %s (%.1f KB) in %.2fs", final_path, size_kb, elapsed)
return episode, "ok"
except (urllib.error.URLError, TimeoutError, OSError) as exc:
temp_path.unlink(missing_ok=True)
err_msg = str(exc)
continue
logger.error("Failed after retries: %s: %s", filename, err_msg)
return episode, "fail"
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Download NPR Car Talk episodes into a local folder.",
)
parser.add_argument(
"--base-url",
default="https://www.npr.org/get/510208/render/partial/next?start=",
help="NPR listing endpoint base URL.",
)
parser.add_argument("--start", type=int, default=1, help="Start offset.")
parser.add_argument("--end", type=int, default=1300, help="End offset.")
parser.add_argument(
"--batch-size",
type=int,
default=24,
help="Offset step size (NPR uses 24 items per page).",
)
parser.add_argument(
"--parallel",
type=int,
default=5,
help="Number of parallel downloads.",
)
parser.add_argument(
"--output-dir",
default="car_talk_episodes",
help="Where audio files are saved.",
)
parser.add_argument(
"--timeout",
type=float,
default=30.0,
help="Per-request timeout in seconds.",
)
parser.add_argument(
"--retries",
type=int,
default=3,
help="Retries for page fetch and downloads.",
)
parser.add_argument(
"--user-agent",
default="Mozilla/5.0 (compatible; CarTalk-Downloader/1.0)",
help="User-Agent header for NPR requests.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Parse and print what would be downloaded without writing files.",
)
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="Increase verbosity (repeat for more detail).",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Only print errors and final summary.",
)
return parser
def iter_page_offsets(start: int, end: int, batch_size: int) -> Iterable[int]:
if batch_size <= 0:
raise ValueError("batch_size must be > 0")
if end < start:
return range(0)
return range(start, end + 1, batch_size)
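# e.g. iter_page_offsets(1, 100, 24) yields 1, 25, 49, 73, 97, matching how
# the default --start and --batch-size step through NPR's paginated listing.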
def configure_logging(verbose: int, quiet: bool) -> None:
logger = logging.getLogger("cartalk")
logger.propagate = False
if quiet:
level = logging.ERROR
elif verbose >= 2:
level = logging.DEBUG
elif verbose >= 1:
level = logging.INFO
else:
level = logging.WARNING
logger.setLevel(level)
handler = logging.StreamHandler()
handler.setLevel(level)
handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.handlers[:] = [handler]
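# main() flow: walk the listing offsets page by page, parse each page with
# iter_episodes(), drop episodes already queued in this run, then download the
# remainder with a ThreadPoolExecutor and print a one-line summary.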
def main() -> int:
parser = build_parser()
args = parser.parse_args()
configure_logging(args.verbose, args.quiet)
logger = logging.getLogger("cartalk")
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Clean up any partial downloads from previous runs
cleanup_partial_downloads(output_dir)
logger.info(
"Config: start=%s end=%s batch=%s parallel=%s output_dir=%s",
args.start,
args.end,
args.batch_size,
args.parallel,
output_dir,
)
started = time.time()
ok = 0
skipped = 0
failed = 0
    seen_audio_urls: set[str] = set()
for offset in iter_page_offsets(args.start, args.end, args.batch_size):
url = f"{args.base_url}{offset}"
logger.info("Fetching listing page (start=%s): %s", offset, url)
        try:
            page = fetch_text(url, args.user_agent, args.timeout, args.retries)
        except (urllib.error.URLError, TimeoutError) as exc:
            logger.error("Giving up on listing page %s: %s", url, exc)
            continue
page_episodes = list(iter_episodes(page))
logger.info("Found %s episodes in page (start=%s)", len(page_episodes), offset)
        # Skip episodes already queued in this run; listing pages can overlap,
        # so dedupe on the audio URL rather than the air date.
        new_episodes = []
        for episode in page_episodes:
            if episode.audio_url not in seen_audio_urls:
                seen_audio_urls.add(episode.audio_url)
                new_episodes.append(episode)
            else:
                logger.debug("Skipping duplicate episode: %s", episode.date)
logger.info("New episodes to process: %s", len(new_episodes))
if args.dry_run:
for episode in new_episodes:
title = f" - {episode.title}" if episode.title else ""
print(f"{episode.date} {episode.audio_url}{title}")
continue
# Download episodes from this page immediately
if new_episodes:
with futures.ThreadPoolExecutor(
max_workers=max(1, int(args.parallel))
) as pool:
jobs = [
pool.submit(
download_episode,
episode,
output_dir,
args.user_agent,
args.timeout,
args.retries,
False,
)
for episode in new_episodes
]
for job in futures.as_completed(jobs):
episode, status = job.result()
if status == "ok":
ok += 1
elif status == "skip":
skipped += 1
else:
failed += 1
logger.debug(
"Result: %s date=%s url=%s",
status,
episode.date,
episode.audio_url,
)
if status != "skip" and not args.quiet:
print(f"[{status.upper()}] {episode.date} {episode.audio_url}")
elapsed = time.time() - started
print(
f"Done. ok={ok} skipped={skipped} failed={failed} "
f"output_dir={output_dir} elapsed={elapsed:.1f}s"
)
return 1 if failed else 0
if __name__ == "__main__":
raise SystemExit(main())