robocyp/main.py

340 lines
13 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import csv
import os
import sys
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
_playlists = {}
def _truncate_title(title: str, length: int = 30) -> str:
if length <= 0:
return ""
if length <= 3:
return title[:length]
if len(title) <= length:
return title
return title[:length-3].strip() + (title[length:] and '...')
def get_yt_creds():
""" Get YouTube API credentials """
creds = None
client_secrets_file = "client_secrets_file.json"
scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists("token.json"):
creds = Credentials.from_authorized_user_file("token.json", scopes)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
if not os.path.exists(client_secrets_file):
print(f'Client secrets file {client_secrets_file} not found')
sys.exit(1)
flow = InstalledAppFlow.from_client_secrets_file(
client_secrets_file, scopes
)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open("token.json", "w") as token:
token.write(creds.to_json())
return creds
def exit_on_exceeded_quota(e: HttpError):
if e.resp.status == 403:
for error in e.error_details:
if error.get('reason') == 'quotaExceeded':
print('Youtube quota exceeded, exiting')
sys.exit(1)
def read_playlists_file():
""" Read playlists.csv and return a dictionary of playlist names to playlist IDs """
global _playlists
if not os.path.exists('playlists.csv'):
print('playlists.csv not found')
return {}
with open('playlists.csv', newline='') as csvfile:
reader = csv.DictReader(csvfile)
_playlists = {row['name']: row['playlist_id'] for row in reader}
def get_playlist_id(playlist_name: str) -> str:
if not _playlists:
read_playlists_file()
return _playlists.get(playlist_name, playlist_name)
def get_playlist_name(playlist_id: str) -> str:
if not _playlists:
read_playlists_file()
return next((name for name, plid in _playlists.items()
if plid == playlist_id), playlist_id)
def list_playlist(yt_api, playlist_id: str):
playlist_name = get_playlist_name(playlist_id)
videos = []
fetched = 0
overall = -1
page_token = ""
try:
while fetched < overall or overall == -1:
response = yt_api.playlistItems().list(
part="snippet,contentDetails",
maxResults=50,
playlistId=playlist_id,
pageToken=page_token
).execute()
if overall == -1:
overall = response['pageInfo']['totalResults']
fetched += len(response['items'])
page_token = response.get('nextPageToken', "")
for item in response['items']:
videos.append(item)
except HttpError as e:
exit_on_exceeded_quota(e)
print(f'Error getting video IDs from playlist {playlist_name}: {e}')
print(f'Fetched {fetched} videos from playlist {playlist_name}')
return videos
def add_video_to_playlist(yt_api, video_id: str, playlist_id: str,
dry_run: bool = False) -> bool:
playlist_name = get_playlist_name(playlist_id)
video_info = get_video_info(yt_api, video_id)
if not video_info:
return False
video_title = _truncate_title(video_info['snippet']['title'])
try:
if dry_run:
print(f"Would add video '{video_title}' [{video_id}] to playlist {playlist_name}")
return True
yt_api.playlistItems().insert(
part='snippet',
body={
'snippet': {
'playlistId': playlist_id,
'resourceId': {
'kind': 'youtube#video',
'videoId': video_id
}
}
}
).execute()
print(f"Added video '{video_title}' [{video_id}] to playlist {playlist_name}")
return True
except HttpError as e:
exit_on_exceeded_quota(e)
print(f"Error adding video '{video_title}' [{video_id}] to playlist {playlist_name}: {e}")
return False
def remove_video_from_playlist(yt_api, plitem_id: str, playlist_id: str,
dry_run: bool = False) -> bool:
playlist_name = get_playlist_name(playlist_id)
plitem_info = get_playlistitem_info(yt_api, plitem_id)
if not plitem_info:
return False
video_title = _truncate_title(plitem_info['snippet']['title'])
video_id = plitem_info['snippet']['resourceId']['videoId']
try:
if dry_run:
print(f"Would remove video '{video_title}' [{video_id}]"
f" from playlist {playlist_name}")
return True
yt_api.playlistItems().delete(id=plitem_id).execute()
print(f"Removed video '{video_title}' [{video_id}]"
f" from playlist {playlist_name}")
return True
except HttpError as e:
exit_on_exceeded_quota(e)
print(f"Error removing video '{video_title}' [{video_id}]"
f" from playlist {playlist_name}: {e}")
return False
def copy_playlist_items(yt_api,
src_playlist: str,
dst_playlist: str,
delete_from_src: bool = False,
limit: int = -1,
dry_run: bool = False):
src_playlist_items = list_playlist(yt_api, src_playlist)
dst_playlist_items = list_playlist(yt_api, dst_playlist)
dst_videos = {pl_item['snippet']['resourceId']['videoId']
for pl_item in dst_playlist_items}
processed_amt = 0
for src_pl_item in src_playlist_items:
if 0 <= limit <= processed_amt:
break
was_processed = False
video_id = src_pl_item['snippet']['resourceId']['videoId']
if video_id not in dst_videos:
add_video_to_playlist(yt_api, video_id, dst_playlist, dry_run)
was_processed = True
if delete_from_src:
remove_video_from_playlist(yt_api, src_pl_item["id"], src_playlist, dry_run)
was_processed = True
if was_processed:
processed_amt += 1
def get_video_info(youtube, video_id: str):
try:
response = youtube.videos().list(
part="snippet",
id=video_id
).execute()
except HttpError as e:
exit_on_exceeded_quota(e)
print(f'Error getting video {video_id}: {e}')
return None
if not response['items']:
print(f'Video {video_id} not found')
return None
return response['items'][0]
def get_playlistitem_info(youtube, playlistitem_id: str):
try:
response = youtube.playlistItems().list(
part="snippet",
id=playlistitem_id
).execute()
except HttpError as e:
exit_on_exceeded_quota(e)
print(f'Error getting playlist item {playlistitem_id}: {e}')
return None
if not response['items']:
print(f'Playlist item {playlistitem_id} not found')
return None
return response['items'][0]
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--dry-run', action='store_true',
help='Dry run, do not send changes to YoutubeAPI')
subparsers = parser.add_subparsers(title='commands', dest='command')
parser_add = subparsers.add_parser('add', help='Add videos to a playlist')
parser_add.add_argument('playlist', help='Playlist name/ID')
parser_add.add_argument('video_ids', nargs='*', help='Video IDs to add')
parser_add_csv = subparsers.add_parser('add-csv', help='Add videos to a playlist from a CSV file')
parser_add_csv.add_argument('playlist', help='Playlist name/ID')
parser_add_csv.add_argument('csv', help='CSV file with video IDs')
parser_add_csv.add_argument('-l', '--limit', type=int, default=-1,
help='Limit number of videos to process')
parser_copy = subparsers.add_parser('copy', help='Copy videos from one playlist to another')
parser_copy.add_argument('src_playlist', help='Source playlist name/ID')
parser_copy.add_argument('dst_playlist', help='Destination playlist name/ID')
parser_copy.add_argument('-l', '--limit', type=int, default=-1,
help='Limit number of videos to process')
parser_move = subparsers.add_parser('move', help='Move videos from one playlist to another')
parser_move.add_argument('src_playlist', help='Source playlist name/ID')
parser_move.add_argument('dst_playlist', help='Destination playlist name/ID')
parser_move.add_argument('-l', '--limit', type=int, default=-1,
help='Limit number of videos to process')
parser_dups = subparsers.add_parser('dups', help='Remove duplicate videos in a playlist')
parser_dups.add_argument('playlist', help='Playlist name/ID')
parser_dups.add_argument('-l', '--limit', type=int, default=-1,
help='Limit number of videos to process')
args = parser.parse_args()
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
api_service_name = "youtube"
api_version = "v3"
creds = get_yt_creds()
youtube = build(api_service_name, api_version, credentials=creds)
if args.command is None:
parser.print_help()
return 1
elif args.command == 'copy':
src_playlist = get_playlist_id(args.src_playlist)
dst_playlist = get_playlist_id(args.dst_playlist)
copy_playlist_items(youtube, src_playlist, dst_playlist,
delete_from_src=False,
limit=args.limit, dry_run=args.dry_run)
elif args.command == 'move':
src_playlist = get_playlist_id(args.src_playlist)
dst_playlist = get_playlist_id(args.dst_playlist)
copy_playlist_items(youtube, src_playlist, dst_playlist,
delete_from_src=True,
limit=args.limit, dry_run=args.dry_run)
elif args.command == 'add':
playlist = get_playlist_id(args.playlist)
pl_videos = {video['snippet']['resourceId']['videoId']
for video in list_playlist(youtube, playlist)}
for video_id in args.video_ids:
if video_id in pl_videos:
print(f'Video {video_id} already in playlist {args.playlist}')
continue
add_video_to_playlist(youtube, video_id, playlist, args.dry_run)
elif args.command == 'add-csv':
video_ids = []
with open(args.csv, newline='') as csvfile:
reader = csv.reader(csvfile)
next(reader, None) # skip the headers
video_ids.extend(row[0] for row in reader if row)
playlist = get_playlist_id(args.playlist)
pl_videos = {video['snippet']['resourceId']['videoId']
for video in list_playlist(youtube, playlist)}
if args.limit > 0:
video_ids = video_ids[:args.limit]
for video_id in video_ids:
if video_id in pl_videos:
print(f'Video {video_id} already in playlist {args.playlist}')
continue
add_video_to_playlist(youtube, video_id, playlist, args.dry_run)
elif args.command == "dups":
processed = 0
playlist_id = get_playlist_id(args.playlist)
plitems = list_playlist(youtube, playlist_id)
plitems_processed = set()
for plitem in plitems:
if 0 <= args.limit <= processed:
break
video_id = plitem["snippet"]["resourceId"]["videoId"]
if video_id in plitems_processed:
remove_video_from_playlist(youtube, plitem["id"], playlist_id, args.dry_run)
processed += 1
else:
plitems_processed.add(video_id)
return 0
if __name__ == '__main__':
sys.exit(main())