#!/usr/bin/env python
"""
Copy or move videos between YouTube playlists using the YouTube Data API v3.

Playlists may be referred to either by their ID or by a friendly name that is
looked up in an optional ``playlists.csv`` file (columns: ``name``,
``playlist_id``) in the current working directory.
"""
import argparse
import csv
import os
import sys

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Cache of playlist name -> playlist ID, filled from playlists.csv.
_playlists = {}
# Set once a load has been attempted, so a missing or empty playlists.csv is
# not re-probed (and its warning not re-printed) on every lookup.
_playlists_loaded = False


def _truncate_title(title: str, length: int = 30) -> str:
    """
    Shorten `title` to at most `length` characters.

    Titles longer than `length` are cut and end in '...'. When `length` is 3
    or less there is no room for the ellipsis, so the title is simply cut.
    A non-positive `length` yields an empty string.
    """
    if length <= 0:
        return ""
    if length <= 3:
        return title[:length]
    if len(title) <= length:
        return title
    return title[:length - 3] + '...'


def get_yt_creds():
    """
    Get YouTube API credentials

    Credentials are loaded from token.json when it exists. If they are missing
    or invalid they are refreshed (when expired and a refresh token is
    available) or obtained through the local-server OAuth flow configured by
    client_secrets_file.json, and then written back to token.json.
    """
    creds = None
    client_secrets_file = "client_secrets_file.json"
    scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", scopes)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                client_secrets_file, scopes
            )
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open("token.json", "w") as token:
            token.write(creds.to_json())
    return creds


def read_playlists_file():
    """
    Read playlists.csv and return a dictionary of playlist names to playlist IDs

    The module-level cache is replaced with the file's contents. If the file
    does not exist a message is printed, the cache is left untouched and an
    empty dictionary is returned.
    """
    global _playlists, _playlists_loaded
    _playlists_loaded = True
    if not os.path.exists('playlists.csv'):
        print('playlists.csv not found')
        return {}
    with open('playlists.csv', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        _playlists = {row['name']: row['playlist_id'] for row in reader}
    return _playlists


def get_playlist_id(playlist_name: str) -> str:
    """
    Translate a playlist name into its ID.

    Names that are not listed in playlists.csv are returned unchanged, so a
    raw playlist ID can be passed straight through.
    """
    if not _playlists_loaded:
        read_playlists_file()
    return _playlists.get(playlist_name, playlist_name)


def get_playlist_name(playlist_id: str) -> str:
    """
    Translate a playlist ID into its name from playlists.csv.

    IDs without a known name are returned unchanged.
    """
    if not _playlists_loaded:
        read_playlists_file()
    return next(
        (name for name, plid in _playlists.items() if plid == playlist_id),
        playlist_id,
    )


def get_videos(yt_api, playlist_id):
    """
    Fetch all items of a playlist.

    Returns the list of playlistItem resources as returned by the API. On an
    HttpError the error is printed and the items fetched so far are returned.
    """
    playlist_name = get_playlist_name(playlist_id)
    videos = []
    page_token = ""
    try:
        # Follow nextPageToken until the API stops returning one rather than
        # counting against pageInfo.totalResults: if that total were ever
        # larger than the number of items actually delivered, the empty token
        # would restart from the first page and the loop would never end.
        while True:
            response = yt_api.playlistItems().list(
                part="snippet,contentDetails",
                maxResults=50,
                playlistId=playlist_id,
                pageToken=page_token
            ).execute()
            videos.extend(response.get('items', []))
            page_token = response.get('nextPageToken', "")
            if not page_token:
                break
    except HttpError as e:
        print(f'Error getting video IDs from playlist {playlist_name}: {e}')
    print(f'Fetched {len(videos)} videos from playlist {playlist_name}')
    return videos


def add_video_to_playlist(yt_api, video, playlist_id,
                          dry_run: bool = False) -> bool:
    """
    Add `video` (a playlistItem resource) to the playlist `playlist_id`.

    With `dry_run` nothing is sent to the API; the intended action is only
    printed. Returns True on success (or dry run), False on an HttpError.
    """
    playlist_name = get_playlist_name(playlist_id)
    video_id = video['snippet']['resourceId']['videoId']
    video_title = _truncate_title(video['snippet']['title'])
    if dry_run:
        print(f"Would add video '{video_title}' [{video_id}]"
              f" to playlist {playlist_name}")
        return True
    try:
        yt_api.playlistItems().insert(
            part='snippet',
            body={
                'snippet': {
                    'playlistId': playlist_id,
                    'resourceId': {
                        'kind': 'youtube#video',
                        'videoId': video_id
                    }
                }
            }
        ).execute()
    except HttpError as e:
        print(f"Error adding video '{video_title}' [{video_id}]"
              f" to playlist {playlist_name}: {e}")
        return False
    print(f"Added video '{video_title}' [{video_id}]"
          f" to playlist {playlist_name}")
    return True


def remove_video_from_playlist(yt_api, video, playlist_id,
                               dry_run: bool = False) -> bool:
    """
    Remove `video` (a playlistItem resource) from the playlist `playlist_id`.

    The item is deleted by its playlistItem ID (`video['id']`). With `dry_run`
    nothing is sent to the API; the intended action is only printed. Returns
    True on success (or dry run), False on an HttpError.
    """
    playlist_name = get_playlist_name(playlist_id)
    video_id = video['snippet']['resourceId']['videoId']
    video_title = _truncate_title(video['snippet']['title'])
    if dry_run:
        print(f"Would remove video '{video_title}' [{video_id}]"
              f" from playlist {playlist_name}")
        return True
    try:
        yt_api.playlistItems().delete(
            id=video['id']
        ).execute()
    except HttpError as e:
        print(f"Error removing video '{video_title}' [{video_id}]"
              f" from playlist {playlist_name}: {e}")
        return False
    print(f"Removed video '{video_title}' [{video_id}]"
          f" from playlist {playlist_name}")
    return True


def copy_playlist_items(yt_api, src_playlist: str, dst_playlist: str,
                        delete_from_src: bool = False, limit: int = -1,
                        dry_run: bool = False):
    """
    Copy videos from `src_playlist` to `dst_playlist` (both playlist IDs).

    Videos already present in the destination are not added again. Only the
    first `limit` source items are processed; a negative `limit` means all of
    them. With `delete_from_src` each processed item is also removed from the
    source, turning the copy into a move. `dry_run` prints the actions without
    sending any change to the API.
    """
    src_videos = get_videos(yt_api, src_playlist)
    dst_videos = get_videos(yt_api, dst_playlist)
    # A negative limit means "everything": resolve it against the number of
    # source videos (not the length of the playlist ID string).
    if limit < 0:
        limit = len(src_videos)
    dst_video_ids = {video['snippet']['resourceId']['videoId']
                     for video in dst_videos}
    for src_video in src_videos[:limit]:
        video_id = src_video['snippet']['resourceId']['videoId']
        in_destination = video_id in dst_video_ids
        if not in_destination:
            in_destination = add_video_to_playlist(
                yt_api, src_video, dst_playlist, dry_run)
            if in_destination:
                # Remember it so a video listed twice in the source is not
                # inserted into the destination twice.
                dst_video_ids.add(video_id)
        # Never drop the source entry unless the video is known to be in the
        # destination; otherwise a failed insert would lose the video.
        if delete_from_src and in_destination:
            remove_video_from_playlist(yt_api, src_video, src_playlist,
                                       dry_run)


def main():
    """
    Command-line entry point.

    Returns the process exit status: 1 when no command was given (the help
    text is printed), 0 otherwise.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--dry-run', action='store_true',
                        help='Dry run, do not send changes to YoutubeAPI')
    subparsers = parser.add_subparsers(title='commands', dest='command')

    parser_move = subparsers.add_parser(
        'move', help='Move videos from one playlist to another')
    parser_move.add_argument('src_playlist', help='Source playlist ID')
    parser_move.add_argument('dst_playlist', help='Destination playlist ID')
    parser_move.add_argument('-l', '--limit', type=int, default=-1,
                             help='Limit number of videos to process')

    parser_copy = subparsers.add_parser(
        'copy', help='Copy videos from one playlist to another')
    parser_copy.add_argument('src_playlist', help='Source playlist ID')
    parser_copy.add_argument('dst_playlist', help='Destination playlist ID')
    parser_copy.add_argument('-l', '--limit', type=int, default=-1,
                             help='Limit number of videos to process')

    args = parser.parse_args()

    # Validate the command before authenticating so that a bare invocation
    # does not start the OAuth flow just to print the help text.
    if args.command is None:
        parser.print_help()
        return 1

    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    api_service_name = "youtube"
    api_version = "v3"
    creds = get_yt_creds()
    youtube = build(api_service_name, api_version, credentials=creds)

    # 'copy' and 'move' differ only in whether the source entry is removed.
    src_playlist = get_playlist_id(args.src_playlist)
    dst_playlist = get_playlist_id(args.dst_playlist)
    copy_playlist_items(youtube, src_playlist, dst_playlist,
                        delete_from_src=(args.command == 'move'),
                        limit=args.limit, dry_run=args.dry_run)
    return 0


if __name__ == '__main__':
    sys.exit(main())