459 lines
16 KiB
Python
Executable File
459 lines
16 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import argparse
|
|
import csv
|
|
import os
|
|
import sys
|
|
from typing import Optional, Iterable
|
|
|
|
from google.auth.transport.requests import Request
|
|
from google.auth.exceptions import RefreshError
|
|
from google.oauth2.credentials import Credentials
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.errors import HttpError
|
|
from tinydb import TinyDB, Query
|
|
from yt_dlp import YoutubeDL
|
|
|
|
_playlists = {}
|
|
# output template: CHANNEL/DATE_TITLE_[ID].ext
|
|
DEFAULT_OUTPUT_TMPL = "%(channel)s/%(upload_date)s_%(title)s_[%(id)s].%(ext)s"
|
|
# download video 1080p or lower with audio
|
|
DEFAULT_FORMAT = "bestvideo[height<=720][ext=mp4]+bestaudio/best[height<=720]"
|
|
program_dir = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
|
|
class ErrorReasons:
|
|
QUOTA_EXCEEDED = 'quotaExceeded'
|
|
PLAYLIST_ITEMS_NOT_ACCESSIBLE = 'playlistItemsNotAccessible'
|
|
|
|
|
|
def _truncate_title(title: str, length: int = 30) -> str:
|
|
if length <= 0:
|
|
return ""
|
|
if length <= 3:
|
|
return title[:length]
|
|
if len(title) <= length:
|
|
return title
|
|
return title[:length-3].strip() + (title[length:] and '...')
|
|
|
|
|
|
def get_yt_creds():
|
|
""" Get YouTube API credentials """
|
|
creds = None
|
|
client_secrets_file = os.path.join(program_dir, "secrets_python.json")
|
|
scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]
|
|
|
|
# The file token.json stores the user's access and refresh tokens, and is
|
|
# created automatically when the authorization flow completes for the first
|
|
# time.
|
|
token_file = os.path.join(program_dir, "token.json")
|
|
if os.path.exists(token_file):
|
|
creds = Credentials.from_authorized_user_file(token_file, scopes)
|
|
|
|
# Valid credentials
|
|
if creds and creds.valid:
|
|
return creds
|
|
|
|
# Expired credentials
|
|
if creds and creds.expired and creds.refresh_token:
|
|
try:
|
|
creds.refresh(Request())
|
|
with open(token_file, "w") as token:
|
|
token.write(creds.to_json())
|
|
return creds
|
|
except RefreshError as err:
|
|
print(f'Error refreshing token: {err}')
|
|
|
|
# No credentials or cannot refresh
|
|
if not os.path.exists(client_secrets_file):
|
|
print(f'Client secrets file {client_secrets_file} not found')
|
|
sys.exit(1)
|
|
flow = InstalledAppFlow.from_client_secrets_file(
|
|
client_secrets_file, scopes
|
|
)
|
|
creds = flow.run_local_server(port=0)
|
|
with open(token_file, "w") as token:
|
|
token.write(creds.to_json())
|
|
return creds
|
|
|
|
|
|
def handle_http_error(
|
|
e: HttpError,
|
|
msg: Optional[str] = None,
|
|
reasons_to_abort: Optional[Iterable[str]] = (ErrorReasons.QUOTA_EXCEEDED,)
|
|
):
|
|
for error in e.error_details:
|
|
print(f"{msg}: {error.get('message')}")
|
|
if error.get('reason') in reasons_to_abort:
|
|
sys.exit(1)
|
|
|
|
|
|
def read_playlists_file():
|
|
"""
|
|
Read playlists.csv and return a dictionary of playlist names to playlist IDs
|
|
"""
|
|
global _playlists
|
|
playlists_file = os.path.join(program_dir, 'playlists.csv')
|
|
if not os.path.exists(playlists_file):
|
|
print(f'{playlists_file} not found')
|
|
return {}
|
|
with open(playlists_file, newline='') as csvfile:
|
|
reader = csv.DictReader(csvfile)
|
|
_playlists = {row['name']: row['playlist_id'] for row in reader}
|
|
|
|
|
|
def get_playlist_id(playlist_name: str) -> str:
|
|
if not _playlists:
|
|
read_playlists_file()
|
|
return _playlists.get(playlist_name, playlist_name)
|
|
|
|
|
|
def get_playlist_name(playlist_id: str) -> str:
|
|
if not _playlists:
|
|
read_playlists_file()
|
|
return next((name for name, plid in _playlists.items()
|
|
if plid == playlist_id), playlist_id)
|
|
|
|
|
|
def list_playlist(yt_api, playlist_id: str):
|
|
playlist_name = get_playlist_name(playlist_id)
|
|
videos = []
|
|
fetched = 0
|
|
overall = -1
|
|
page_token = ""
|
|
try:
|
|
while fetched < overall or overall == -1:
|
|
response = yt_api.playlistItems().list(
|
|
part="snippet,contentDetails",
|
|
maxResults=50,
|
|
playlistId=playlist_id,
|
|
pageToken=page_token
|
|
).execute()
|
|
if overall == -1:
|
|
overall = response['pageInfo']['totalResults']
|
|
fetched += len(response['items'])
|
|
page_token = response.get('nextPageToken', "")
|
|
for item in response['items']:
|
|
videos.append(item)
|
|
except HttpError as e:
|
|
handle_http_error(e, f'Error getting video IDs from playlist {playlist_name}')
|
|
|
|
print(f'Fetched {fetched} videos from playlist {playlist_name}')
|
|
return videos
|
|
|
|
|
|
def add_video_to_playlist(yt_api, video_id: str, playlist_id: str,
|
|
dry_run: bool = False) -> bool:
|
|
playlist_name = get_playlist_name(playlist_id)
|
|
video_info = get_video_info(yt_api, video_id)
|
|
if not video_info:
|
|
return False
|
|
video_title = _truncate_title(video_info['snippet']['title'])
|
|
try:
|
|
if dry_run:
|
|
print(f"Would add video '{video_title}' [{video_id}]"
|
|
f" to playlist {playlist_name}")
|
|
return True
|
|
yt_api.playlistItems().insert(
|
|
part='snippet',
|
|
body={
|
|
'snippet': {
|
|
'playlistId': playlist_id,
|
|
'resourceId': {
|
|
'kind': 'youtube#video',
|
|
'videoId': video_id
|
|
}
|
|
}
|
|
}
|
|
).execute()
|
|
print(f"Added video '{video_title}' [{video_id}]"
|
|
f" to playlist {playlist_name}")
|
|
return True
|
|
except HttpError as e:
|
|
msg = (f"Error adding video '{video_title}' [{video_id}]"
|
|
f" to playlist {playlist_name}")
|
|
reasons_to_abort = (ErrorReasons.QUOTA_EXCEEDED,
|
|
ErrorReasons.PLAYLIST_ITEMS_NOT_ACCESSIBLE)
|
|
handle_http_error(e, msg, reasons_to_abort)
|
|
return False
|
|
|
|
|
|
def remove_video_from_playlist(yt_api, plitem_id: str, playlist_id: str,
|
|
dry_run: bool = False) -> bool:
|
|
playlist_name = get_playlist_name(playlist_id)
|
|
plitem_info = get_playlistitem_info(yt_api, plitem_id)
|
|
if not plitem_info:
|
|
return False
|
|
video_title = _truncate_title(plitem_info['snippet']['title'])
|
|
video_id = plitem_info['snippet']['resourceId']['videoId']
|
|
try:
|
|
if dry_run:
|
|
print(f"Would remove video '{video_title}' [{video_id}]"
|
|
f" from playlist {playlist_name}")
|
|
return True
|
|
yt_api.playlistItems().delete(id=plitem_id).execute()
|
|
print(f"Removed video '{video_title}' [{video_id}]"
|
|
f" from playlist {playlist_name}")
|
|
return True
|
|
except HttpError as e:
|
|
msg = (f"Error removing video '{video_title}' [{video_id}]"
|
|
f" from playlist {playlist_name}")
|
|
handle_http_error(e, msg)
|
|
return False
|
|
|
|
|
|
def copy_playlist_items(yt_api,
|
|
src_playlist_id: str,
|
|
dst_playlist_id: str,
|
|
delete_from_src: bool = False,
|
|
limit: int = -1,
|
|
dry_run: bool = False):
|
|
src_playlist_items = list_playlist(yt_api, src_playlist_id)
|
|
if not src_playlist_items:
|
|
print(f"No items found in source playlist {src_playlist_id}")
|
|
return
|
|
dst_playlist_items = list_playlist(yt_api, dst_playlist_id)
|
|
|
|
dst_videos = {pl_item['snippet']['resourceId']['videoId']
|
|
for pl_item in dst_playlist_items}
|
|
processed_amt = 0
|
|
|
|
for src_pl_item in src_playlist_items:
|
|
if 0 <= limit <= processed_amt:
|
|
break
|
|
was_processed = False
|
|
video_id = src_pl_item['snippet']['resourceId']['videoId']
|
|
if video_id not in dst_videos:
|
|
add_video_to_playlist(yt_api, video_id, dst_playlist_id, dry_run)
|
|
was_processed = True
|
|
if delete_from_src:
|
|
remove_video_from_playlist(yt_api, src_pl_item["id"],
|
|
src_playlist_id, dry_run)
|
|
was_processed = True
|
|
if was_processed:
|
|
processed_amt += 1
|
|
|
|
|
|
def get_video_info(youtube, video_id: str):
|
|
try:
|
|
# TODO maybe remove 'status'
|
|
response = youtube.videos().list(
|
|
part="localizations,snippet,contentDetails,"
|
|
"statistics,status,topicDetails",
|
|
id=video_id
|
|
).execute()
|
|
except HttpError as e:
|
|
msg = f'Error getting video info for {video_id}'
|
|
handle_http_error(e, msg)
|
|
return None
|
|
if not response['items']:
|
|
print(f'Video {video_id} not found')
|
|
return None
|
|
return response['items'][0]
|
|
|
|
|
|
def get_playlistitem_info(youtube, playlistitem_id: str):
|
|
try:
|
|
response = youtube.playlistItems().list(
|
|
part="snippet,contentDetails",
|
|
id=playlistitem_id
|
|
).execute()
|
|
except HttpError as e:
|
|
msg = f'Error getting playlist item {playlistitem_id}'
|
|
handle_http_error(e, msg)
|
|
return None
|
|
if not response['items']:
|
|
print(f'Playlist item {playlistitem_id} not found')
|
|
return None
|
|
return response['items'][0]
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('-n', '--dry-run', action='store_true',
|
|
help='Dry run, do not send changes to YoutubeAPI')
|
|
subparsers = parser.add_subparsers(title='commands', dest='command')
|
|
|
|
parser_add = subparsers.add_parser('add', help='Add videos to a playlist')
|
|
parser_add.add_argument('playlist', help='Playlist name/ID')
|
|
parser_add.add_argument('video_ids', nargs='*', help='Video IDs to add')
|
|
|
|
parser_add_csv = subparsers.add_parser(
|
|
'add-csv', help='Add videos to a playlist from a CSV file'
|
|
)
|
|
parser_add_csv.add_argument('playlist', help='Playlist name/ID')
|
|
parser_add_csv.add_argument('csv', help='CSV file with video IDs')
|
|
parser_add_csv.add_argument('-l', '--limit', type=int, default=-1,
|
|
help='Limit number of videos to process')
|
|
|
|
parser_copy = subparsers.add_parser(
|
|
'copy', help='Copy videos from one playlist to another'
|
|
)
|
|
parser_copy.add_argument('src_playlist', help='Source playlist name/ID')
|
|
parser_copy.add_argument('dst_playlist',
|
|
help='Destination playlist name/ID')
|
|
parser_copy.add_argument('-l', '--limit', type=int, default=-1,
|
|
help='Limit number of videos to process')
|
|
|
|
parser_move = subparsers.add_parser(
|
|
'move',
|
|
help='Move videos from one playlist to another'
|
|
)
|
|
parser_move.add_argument('src_playlist', help='Source playlist name/ID')
|
|
parser_move.add_argument('dst_playlist',
|
|
help='Destination playlist name/ID')
|
|
parser_move.add_argument('-l', '--limit', type=int, default=-1,
|
|
help='Limit number of videos to process')
|
|
|
|
parser_dups = subparsers.add_parser(
|
|
'dups', help='Remove duplicate videos in a playlist'
|
|
)
|
|
parser_dups.add_argument('playlist', help='Playlist name/ID')
|
|
parser_dups.add_argument('-l', '--limit', type=int, default=-1,
|
|
help='Limit number of videos to process')
|
|
|
|
parser_download = subparsers.add_parser(
|
|
'download', help='Download videos from a playlist'
|
|
)
|
|
parser_download.add_argument('playlist', help='Playlist name/ID')
|
|
parser_download.add_argument('dst_folder', help='Destination folder')
|
|
parser_download.add_argument('-l', '--limit', type=int, default=-1,
|
|
help='Limit number of videos to process')
|
|
parser_download.add_argument(
|
|
'-r', '--remove-from-playlist',
|
|
action='store_true', help='Remove downloaded videos from the playlist'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
if args.command is None:
|
|
parser.print_help()
|
|
return 1
|
|
|
|
# Disable OAuthlib's HTTPS verification when running locally.
|
|
# *DO NOT* leave this option enabled in production.
|
|
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
|
|
api_service_name = "youtube"
|
|
api_version = "v3"
|
|
creds = get_yt_creds()
|
|
youtube = build(api_service_name, api_version, credentials=creds)
|
|
|
|
if args.command in ('copy', 'move'):
|
|
delete_from_src = args.command == 'move'
|
|
copy_playlist_items(
|
|
youtube,
|
|
get_playlist_id(args.src_playlist),
|
|
get_playlist_id(args.dst_playlist),
|
|
delete_from_src=delete_from_src,
|
|
limit=args.limit,
|
|
dry_run=args.dry_run
|
|
)
|
|
|
|
elif args.command == 'add':
|
|
playlist_id = get_playlist_id(args.playlist)
|
|
# {video_id: video_title} for videos already in the playlist
|
|
pl_videos = {
|
|
pl_item['snippet']['resourceId']['videoId']:
|
|
pl_item['snippet']['title']
|
|
for pl_item in list_playlist(youtube, playlist_id)
|
|
}
|
|
|
|
for video_id in args.video_ids:
|
|
if video_id in pl_videos:
|
|
short_title = _truncate_title(pl_videos[video_id])
|
|
print(f"Video '{short_title}' [{video_id}]"
|
|
f" is already in playlist {args.playlist}")
|
|
continue
|
|
add_video_to_playlist(youtube, video_id, playlist_id, args.dry_run)
|
|
|
|
elif args.command == 'add-csv':
|
|
video_ids = []
|
|
with open(args.csv, newline='') as csvfile:
|
|
reader = csv.reader(csvfile)
|
|
next(reader, None) # skip the headers
|
|
video_ids.extend(row[0] for row in reader if row)
|
|
playlist_id = get_playlist_id(args.playlist)
|
|
# {video_id: video_title} for videos already in the playlist
|
|
pl_videos = {
|
|
pl_item['snippet']['resourceId']['videoId']:
|
|
pl_item['snippet']['title']
|
|
for pl_item in list_playlist(youtube, playlist_id)
|
|
}
|
|
processed = 0
|
|
for video_id in video_ids:
|
|
if 0 <= args.limit <= processed:
|
|
break
|
|
if video_id in pl_videos:
|
|
continue
|
|
processed += int(add_video_to_playlist(
|
|
youtube, video_id, playlist_id, args.dry_run
|
|
))
|
|
|
|
elif args.command == "dups":
|
|
processed = 0
|
|
playlist_id = get_playlist_id(args.playlist)
|
|
plitems = list_playlist(youtube, playlist_id)
|
|
plitems_processed = set()
|
|
for plitem in plitems:
|
|
if 0 <= args.limit <= processed:
|
|
break
|
|
video_id = plitem["snippet"]["resourceId"]["videoId"]
|
|
if video_id in plitems_processed:
|
|
remove_video_from_playlist(youtube, plitem["id"], playlist_id,
|
|
args.dry_run)
|
|
processed += 1
|
|
else:
|
|
plitems_processed.add(video_id)
|
|
|
|
elif args.command == "download":
|
|
db = TinyDB(os.path.join(program_dir, 'db.json'))
|
|
query = Query()
|
|
ydl_opts = {
|
|
'outtmpl': os.path.join(args.dst_folder, DEFAULT_OUTPUT_TMPL),
|
|
'format': DEFAULT_FORMAT,
|
|
}
|
|
|
|
# load playlist items
|
|
playlist_id = get_playlist_id(args.playlist)
|
|
plitems = list_playlist(youtube, playlist_id)
|
|
|
|
# limit number of videos to process
|
|
if args.limit > 0:
|
|
plitems = plitems[:args.limit]
|
|
|
|
for plitem in plitems:
|
|
video_id = plitem["snippet"]["resourceId"]["videoId"]
|
|
# skip if video is already in the database
|
|
if db.search(query.id == video_id):
|
|
continue
|
|
|
|
video_info = get_video_info(youtube, video_id)
|
|
# skip if video is not found on YouTube
|
|
if not video_info:
|
|
continue
|
|
|
|
video_title = _truncate_title(video_info['snippet']['title'])
|
|
if args.dry_run:
|
|
print(f"Would download video '{video_title}' [{video_id}]"
|
|
f" from playlist {args.playlist}"
|
|
f" to folder {args.dst_folder}")
|
|
|
|
else:
|
|
# download video
|
|
with YoutubeDL(ydl_opts) as ydl:
|
|
ydl.download(
|
|
['https://www.youtube.com/watch?v=' + video_id]
|
|
)
|
|
db.insert(video_info)
|
|
|
|
# remove video from playlist
|
|
if args.remove_from_playlist:
|
|
remove_video_from_playlist(
|
|
youtube, plitem["id"], playlist_id, args.dry_run
|
|
)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|