#!/usr/bin/env python
"""Command-line helper for managing YouTube playlists.

Supports adding videos to a playlist (from the command line or a CSV file),
copying/moving videos between playlists, removing duplicate entries and
downloading the videos of a playlist with yt-dlp.
"""
import argparse
import csv
import os
import sys

from google.auth.transport.requests import Request
from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from tinydb import TinyDB, Query
from yt_dlp import YoutubeDL

# Cache of {playlist name: playlist ID} loaded from playlists.csv.
_playlists = {}
# True once playlists.csv has been looked up, so that a missing or empty file
# is not re-read (and re-reported) on every name/ID lookup.
_playlists_loaded = False

# output template: CHANNEL/DATE_TITLE_[ID].ext
DEFAULT_OUTPUT_TMPL = "%(channel)s/%(upload_date)s_%(title)s_[%(id)s].%(ext)s"
# download video 720p or lower with audio
DEFAULT_FORMAT = "bestvideo[height<=720][ext=mp4]+bestaudio/best[height<=720]"

program_dir = os.path.dirname(os.path.realpath(__file__))


def _truncate_title(title: str, length: int = 30) -> str:
    """Return *title* shortened to at most *length* characters.

    Titles longer than *length* are cut and suffixed with '...'. For
    ``length <= 3`` there is no room for the ellipsis, so the title is simply
    cut; a non-positive *length* yields an empty string.
    """
    if length <= 0:
        return ""
    if length <= 3:
        return title[:length]
    if len(title) <= length:
        return title
    # Reserve three characters for the ellipsis and drop whitespace that the
    # cut may have left dangling at either end.
    return title[:length - 3].strip() + '...'


def _save_token(token_file: str, creds) -> None:
    """Write the credentials' JSON representation to *token_file*."""
    with open(token_file, "w") as token:
        token.write(creds.to_json())


def get_yt_creds():
    """Get YouTube API credentials.

    Tries, in order: the stored token, refreshing an expired stored token,
    and finally the interactive OAuth flow based on the client secrets file.
    The resulting token is stored in ``token.json`` next to this script.
    Exits the process with status 1 if the client secrets file is needed but
    missing.
    """
    creds = None
    client_secrets_file = os.path.join(program_dir, "secrets_python.json")
    scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the
    # first time.
    token_file = os.path.join(program_dir, "token.json")
    if os.path.exists(token_file):
        try:
            creds = Credentials.from_authorized_user_file(token_file, scopes)
        except ValueError as err:
            # Corrupt or incomplete token file: fall back to a fresh
            # authorization instead of crashing.
            print(f'Ignoring unreadable token file {token_file}: {err}')

    # Valid credentials
    if creds and creds.valid:
        return creds

    # Expired credentials
    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except RefreshError as err:
            print(f'Error refreshing token: {err}')
        else:
            _save_token(token_file, creds)
            return creds

    # No credentials or cannot refresh
    if not os.path.exists(client_secrets_file):
        print(f'Client secrets file {client_secrets_file} not found')
        sys.exit(1)
    flow = InstalledAppFlow.from_client_secrets_file(
        client_secrets_file, scopes
    )
    creds = flow.run_local_server(port=0)
    _save_token(token_file, creds)
    return creds


def exit_on_exceeded_quota(e: HttpError):
    """Exit with status 1 if *e* reports an exceeded YouTube API quota.

    Returns normally for every other error so the caller can handle it.
    """
    if e.resp.status != 403:
        return
    details = e.error_details
    # error_details is not guaranteed to be a list of dicts (it may be a
    # plain message string), so only inspect entries that look like errors.
    if not isinstance(details, (list, tuple)):
        return
    for error in details:
        if isinstance(error, dict) and error.get('reason') == 'quotaExceeded':
            print('Youtube quota exceeded, exiting')
            sys.exit(1)


def read_playlists_file():
    """Read playlists.csv and return a dict of playlist names to playlist IDs.

    The file is expected next to this script, with ``name`` and
    ``playlist_id`` columns. The result is also stored in the module-level
    cache. A missing file is reported and yields an empty dict.
    """
    global _playlists, _playlists_loaded
    _playlists_loaded = True
    playlists_file = os.path.join(program_dir, 'playlists.csv')
    if not os.path.exists(playlists_file):
        print(f'{playlists_file} not found')
        return {}
    with open(playlists_file, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        _playlists = {row['name']: row['playlist_id'] for row in reader}
    return _playlists


def get_playlist_id(playlist_name: str) -> str:
    """Return the playlist ID for *playlist_name*.

    Names not present in playlists.csv are assumed to already be IDs and are
    returned unchanged.
    """
    if not _playlists_loaded:
        read_playlists_file()
    return _playlists.get(playlist_name, playlist_name)


def get_playlist_name(playlist_id: str) -> str:
    """Return the name mapped to *playlist_id*, or the ID itself if unknown."""
    if not _playlists_loaded:
        read_playlists_file()
    return next(
        (name for name, plid in _playlists.items() if plid == playlist_id),
        playlist_id
    )


def list_playlist(yt_api, playlist_id: str):
    """Return all playlist items of *playlist_id* as a list of API resources.

    On an API error the items fetched so far are returned (after exiting if
    the error is an exceeded quota).
    """
    playlist_name = get_playlist_name(playlist_id)
    videos = []
    page_token = ""
    try:
        while True:
            response = yt_api.playlistItems().list(
                part="snippet,contentDetails",
                maxResults=50,
                playlistId=playlist_id,
                pageToken=page_token
            ).execute()
            videos.extend(response.get('items', []))
            # Follow nextPageToken rather than trusting
            # pageInfo.totalResults: if fewer items than totalResults are
            # returned, counting would restart from the first page and
            # collect duplicates.
            page_token = response.get('nextPageToken', "")
            if not page_token:
                break
    except HttpError as e:
        exit_on_exceeded_quota(e)
        print(f'Error getting video IDs from playlist {playlist_name}: {e}')
    print(f'Fetched {len(videos)} videos from playlist {playlist_name}')
    return videos


def add_video_to_playlist(yt_api, video_id: str, playlist_id: str,
                          dry_run: bool = False) -> bool:
    """Add *video_id* to *playlist_id*.

    With *dry_run* the change is only reported. Returns True on success (or
    simulated success), False if the video cannot be looked up or the API
    call fails.
    """
    playlist_name = get_playlist_name(playlist_id)
    video_info = get_video_info(yt_api, video_id)
    if not video_info:
        return False
    video_title = _truncate_title(video_info['snippet']['title'])
    try:
        if dry_run:
            print(f"Would add video '{video_title}' [{video_id}]"
                  f" to playlist {playlist_name}")
            return True
        yt_api.playlistItems().insert(
            part='snippet',
            body={
                'snippet': {
                    'playlistId': playlist_id,
                    'resourceId': {
                        'kind': 'youtube#video',
                        'videoId': video_id
                    }
                }
            }
        ).execute()
        print(f"Added video '{video_title}' [{video_id}]"
              f" to playlist {playlist_name}")
        return True
    except HttpError as e:
        exit_on_exceeded_quota(e)
        print(f"Error adding video '{video_title}' [{video_id}]"
              f" to playlist {playlist_name}: {e}")
        return False


def remove_video_from_playlist(yt_api, plitem_id: str, playlist_id: str,
                               dry_run: bool = False) -> bool:
    """Remove the playlist item *plitem_id* from *playlist_id*.

    Note that *plitem_id* is the ID of the playlist item, not of the video.
    With *dry_run* the change is only reported. Returns True on success (or
    simulated success), False if the item cannot be looked up or the API call
    fails.
    """
    playlist_name = get_playlist_name(playlist_id)
    plitem_info = get_playlistitem_info(yt_api, plitem_id)
    if not plitem_info:
        return False
    video_title = _truncate_title(plitem_info['snippet']['title'])
    video_id = plitem_info['snippet']['resourceId']['videoId']
    try:
        if dry_run:
            print(f"Would remove video '{video_title}' [{video_id}]"
                  f" from playlist {playlist_name}")
            return True
        yt_api.playlistItems().delete(id=plitem_id).execute()
        print(f"Removed video '{video_title}' [{video_id}]"
              f" from playlist {playlist_name}")
        return True
    except HttpError as e:
        exit_on_exceeded_quota(e)
        print(f"Error removing video '{video_title}' [{video_id}]"
              f" from playlist {playlist_name}: {e}")
        return False


def copy_playlist_items(yt_api, src_playlist_id: str, dst_playlist_id: str,
                        delete_from_src: bool = False, limit: int = -1,
                        dry_run: bool = False):
    """Copy (or move) the videos of one playlist into another.

    Videos already present in the destination are not added again. With
    *delete_from_src* each source item is removed once its video is known to
    be in the destination. *limit* caps the number of source items acted
    upon; a negative value means no limit.
    """
    src_playlist_items = list_playlist(yt_api, src_playlist_id)
    dst_playlist_items = list_playlist(yt_api, dst_playlist_id)
    dst_videos = {pl_item['snippet']['resourceId']['videoId']
                  for pl_item in dst_playlist_items}
    processed_amt = 0
    for src_pl_item in src_playlist_items:
        if 0 <= limit <= processed_amt:
            break
        was_processed = False
        video_id = src_pl_item['snippet']['resourceId']['videoId']
        in_dst = video_id in dst_videos
        if not in_dst:
            in_dst = add_video_to_playlist(yt_api, video_id,
                                           dst_playlist_id, dry_run)
            was_processed = True
            if in_dst:
                # Remember the addition so a duplicate entry in the source
                # is not added to the destination a second time.
                dst_videos.add(video_id)
        # Never delete from the source unless the video made it into the
        # destination; otherwise a failed add would lose the video.
        if delete_from_src and in_dst:
            remove_video_from_playlist(yt_api, src_pl_item["id"],
                                       src_playlist_id, dry_run)
            was_processed = True
        if was_processed:
            processed_amt += 1


def get_video_info(youtube, video_id: str):
    """Return the API resource for *video_id*, or None if unavailable.

    None is returned both when the API call fails and when no such video
    exists; either case is reported on stdout.
    """
    try:
        # TODO maybe remove 'status'
        response = youtube.videos().list(
            part="localizations,snippet,contentDetails,"
                 "statistics,status,topicDetails",
            id=video_id
        ).execute()
    except HttpError as e:
        exit_on_exceeded_quota(e)
        print(f'Error getting video {video_id}: {e}')
        return None
    if not response['items']:
        print(f'Video {video_id} not found')
        return None
    return response['items'][0]


def get_playlistitem_info(youtube, playlistitem_id: str):
    """Return the API resource for *playlistitem_id*, or None if unavailable.

    None is returned both when the API call fails and when no such playlist
    item exists; either case is reported on stdout.
    """
    try:
        response = youtube.playlistItems().list(
            part="snippet,contentDetails",
            id=playlistitem_id
        ).execute()
    except HttpError as e:
        exit_on_exceeded_quota(e)
        print(f'Error getting playlist item {playlistitem_id}: {e}')
        return None
    if not response['items']:
        print(f'Playlist item {playlistitem_id} not found')
        return None
    return response['items'][0]


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per operation."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--dry-run', action='store_true',
                        help='Dry run, do not send changes to YoutubeAPI')
    subparsers = parser.add_subparsers(title='commands', dest='command')

    parser_add = subparsers.add_parser('add', help='Add videos to a playlist')
    parser_add.add_argument('playlist', help='Playlist name/ID')
    parser_add.add_argument('video_ids', nargs='*', help='Video IDs to add')

    parser_add_csv = subparsers.add_parser(
        'add-csv', help='Add videos to a playlist from a CSV file'
    )
    parser_add_csv.add_argument('playlist', help='Playlist name/ID')
    parser_add_csv.add_argument('csv', help='CSV file with video IDs')
    parser_add_csv.add_argument('-l', '--limit', type=int, default=-1,
                                help='Limit number of videos to process')

    parser_copy = subparsers.add_parser(
        'copy', help='Copy videos from one playlist to another'
    )
    parser_copy.add_argument('src_playlist', help='Source playlist name/ID')
    parser_copy.add_argument('dst_playlist',
                             help='Destination playlist name/ID')
    parser_copy.add_argument('-l', '--limit', type=int, default=-1,
                             help='Limit number of videos to process')

    parser_move = subparsers.add_parser(
        'move', help='Move videos from one playlist to another'
    )
    parser_move.add_argument('src_playlist', help='Source playlist name/ID')
    parser_move.add_argument('dst_playlist',
                             help='Destination playlist name/ID')
    parser_move.add_argument('-l', '--limit', type=int, default=-1,
                             help='Limit number of videos to process')

    parser_dups = subparsers.add_parser(
        'dups', help='Remove duplicate videos in a playlist'
    )
    parser_dups.add_argument('playlist', help='Playlist name/ID')
    parser_dups.add_argument('-l', '--limit', type=int, default=-1,
                             help='Limit number of videos to process')

    parser_download = subparsers.add_parser(
        'download', help='Download videos from a playlist'
    )
    parser_download.add_argument('playlist', help='Playlist name/ID')
    parser_download.add_argument('dst_folder', help='Destination folder')
    parser_download.add_argument('-l', '--limit', type=int, default=-1,
                                 help='Limit number of videos to process')
    parser_download.add_argument(
        '-r', '--remove-from-playlist', action='store_true',
        help='Remove downloaded videos from the playlist'
    )
    return parser


def _playlist_titles(youtube, playlist_id: str) -> dict:
    """Return {video_id: video_title} for the videos already in a playlist."""
    return {
        pl_item['snippet']['resourceId']['videoId']:
            pl_item['snippet']['title']
        for pl_item in list_playlist(youtube, playlist_id)
    }


def _cmd_add(youtube, args) -> None:
    """Add the video IDs given on the command line, skipping present ones."""
    playlist_id = get_playlist_id(args.playlist)
    pl_videos = _playlist_titles(youtube, playlist_id)
    for video_id in args.video_ids:
        if video_id in pl_videos:
            short_title = _truncate_title(pl_videos[video_id])
            print(f"Video '{short_title}' [{video_id}]"
                  f" is already in playlist {args.playlist}")
            continue
        add_video_to_playlist(youtube, video_id, playlist_id, args.dry_run)


def _cmd_add_csv(youtube, args) -> None:
    """Add the video IDs from the first column of a CSV file (header row
    skipped), stopping after ``args.limit`` successful additions."""
    video_ids = []
    with open(args.csv, newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip the headers
        video_ids.extend(row[0] for row in reader if row)

    playlist_id = get_playlist_id(args.playlist)
    pl_videos = _playlist_titles(youtube, playlist_id)
    processed = 0
    for video_id in video_ids:
        if 0 <= args.limit <= processed:
            break
        if video_id in pl_videos:
            continue
        # Only successful additions count towards the limit.
        processed += int(add_video_to_playlist(
            youtube, video_id, playlist_id, args.dry_run
        ))


def _cmd_dups(youtube, args) -> None:
    """Remove every playlist item whose video already appeared earlier in the
    playlist, up to ``args.limit`` removals."""
    processed = 0
    playlist_id = get_playlist_id(args.playlist)
    seen_video_ids = set()
    for plitem in list_playlist(youtube, playlist_id):
        if 0 <= args.limit <= processed:
            break
        video_id = plitem["snippet"]["resourceId"]["videoId"]
        if video_id in seen_video_ids:
            remove_video_from_playlist(youtube, plitem["id"], playlist_id,
                                       args.dry_run)
            processed += 1
        else:
            seen_video_ids.add(video_id)


def _cmd_download(youtube, args) -> None:
    """Download the videos of a playlist that are not yet recorded in the
    local database, optionally removing them from the playlist afterwards."""
    query = Query()
    ydl_opts = {
        'outtmpl': os.path.join(args.dst_folder, DEFAULT_OUTPUT_TMPL),
        'format': DEFAULT_FORMAT,
    }
    # The context manager closes the database file even if a download fails.
    with TinyDB(os.path.join(program_dir, 'db.json')) as db:
        # load playlist items
        playlist_id = get_playlist_id(args.playlist)
        plitems = list_playlist(youtube, playlist_id)

        # limit number of videos to process
        # NOTE: unlike the other commands, a limit of 0 means "no limit"
        # here, and the limit is applied before already-downloaded videos
        # are skipped.
        if args.limit > 0:
            plitems = plitems[:args.limit]

        for plitem in plitems:
            video_id = plitem["snippet"]["resourceId"]["videoId"]

            # skip if video is already in the database
            if db.search(query.id == video_id):
                continue

            video_info = get_video_info(youtube, video_id)
            # skip if video is not found on YouTube
            if not video_info:
                continue

            video_title = _truncate_title(video_info['snippet']['title'])
            if args.dry_run:
                print(f"Would download video '{video_title}' [{video_id}]"
                      f" from playlist {args.playlist}"
                      f" to folder {args.dst_folder}")
            else:
                # download video
                with YoutubeDL(ydl_opts) as ydl:
                    ydl.download(
                        ['https://www.youtube.com/watch?v=' + video_id]
                    )
                # Record the video so it is skipped on the next run.
                db.insert(video_info)

            # remove video from playlist
            if args.remove_from_playlist:
                remove_video_from_playlist(
                    youtube, plitem["id"], playlist_id, args.dry_run
                )


def main():
    """Parse the command line and run the requested command.

    Returns the process exit status: 1 if no command was given, 0 otherwise.
    """
    parser = _build_parser()
    args = parser.parse_args()
    if args.command is None:
        parser.print_help()
        return 1

    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    api_service_name = "youtube"
    api_version = "v3"
    creds = get_yt_creds()
    youtube = build(api_service_name, api_version, credentials=creds)

    if args.command in ('copy', 'move'):
        copy_playlist_items(
            youtube,
            get_playlist_id(args.src_playlist),
            get_playlist_id(args.dst_playlist),
            delete_from_src=args.command == 'move',
            limit=args.limit,
            dry_run=args.dry_run
        )
    elif args.command == 'add':
        _cmd_add(youtube, args)
    elif args.command == 'add-csv':
        _cmd_add_csv(youtube, args)
    elif args.command == "dups":
        _cmd_dups(youtube, args)
    elif args.command == "download":
        _cmd_download(youtube, args)

    return 0


if __name__ == '__main__':
    sys.exit(main())