197 lines
6.7 KiB
Python
Executable File
197 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import argparse
|
|
import csv
|
|
import os
|
|
import sys
|
|
|
|
from google.auth.transport.requests import Request
|
|
from google.oauth2.credentials import Credentials
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.errors import HttpError
|
|
|
|
# Module-level cache mapping playlist name -> playlist ID.
# It starts empty and is replaced by read_playlists_file() once playlists.csv
# has been parsed.
_playlists = {}
|
|
|
|
|
|
def _truncate_title(title: str, length: int = 30) -> str:
|
|
if length <= 0:
|
|
return ""
|
|
if length <= 3:
|
|
return title[:length]
|
|
if len(title) <= length:
|
|
return title
|
|
return title[:length-3] + (title[length:] and '...')
|
|
|
|
def get_yt_creds():
    """ Get YouTube API credentials

    Cached credentials are loaded from ``token.json`` when that file exists.
    If they are missing or not valid they are either refreshed (expired, with
    a refresh token) or obtained from scratch through the local-server OAuth
    flow configured by ``client_secrets_file.json``.  Whenever credentials
    had to be refreshed or newly obtained they are written back to
    ``token.json`` for the next run.

    Returns:
        The credentials object to pass to the API client.
    """
    token_path = "token.json"
    client_secrets_file = "client_secrets_file.json"
    scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]

    # token.json stores the user's access and refresh tokens; it is created
    # automatically the first time the authorization flow completes.
    creds = (
        Credentials.from_authorized_user_file(token_path, scopes)
        if os.path.exists(token_path)
        else None
    )

    # Cached credentials are still good: nothing to refresh or save.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        # No usable credentials available, let the user log in.
        flow = InstalledAppFlow.from_client_secrets_file(
            client_secrets_file, scopes
        )
        creds = flow.run_local_server(port=0)

    # Save the credentials for the next run
    with open(token_path, "w") as token:
        token.write(creds.to_json())

    return creds
|
|
|
|
|
|
def read_playlists_file():
    """Load ``playlists.csv`` into the module-level ``_playlists`` cache.

    The file is looked up in the current working directory and is parsed
    with ``csv.DictReader``, so its header row must contain the columns
    ``name`` and ``playlist_id``.

    Returns:
        The mapping of playlist name to playlist ID that was loaded.  When
        the file does not exist a message is printed, the cache is left
        untouched and an empty dict is returned.

    Raises:
        KeyError: if a row has no ``name`` or ``playlist_id`` column.
    """
    global _playlists
    if not os.path.exists('playlists.csv'):
        print('playlists.csv not found')
        return {}

    with open('playlists.csv', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        loaded = {row['name']: row['playlist_id'] for row in reader}

    # Publish the mapping only after the whole file parsed successfully, and
    # return it so both code paths honour the documented return value.
    _playlists = loaded
    return loaded
|
|
|
|
|
|
def get_playlist_id(playlist_name: str) -> str:
    """Translate a playlist name into its playlist ID.

    The name is looked up in the ``_playlists`` cache, which is loaded via
    ``read_playlists_file()`` first if it is still empty.  A name that is not
    in the cache is returned unchanged, so a raw playlist ID passes straight
    through.
    """
    if not _playlists:
        read_playlists_file()

    try:
        return _playlists[playlist_name]
    except KeyError:
        # Unknown name: hand the argument back as-is.
        return playlist_name
|
|
|
|
|
|
def get_playlist_name(playlist_id: str) -> str:
    """Translate a playlist ID into its name from the ``_playlists`` cache.

    The cache is loaded via ``read_playlists_file()`` first if it is still
    empty.  The first name whose ID matches is returned; when no entry
    matches, the ID itself is returned.
    """
    if not _playlists:
        read_playlists_file()

    for name, known_id in _playlists.items():
        if known_id == playlist_id:
            return name
    return playlist_id
|
|
|
|
|
|
def get_videos(yt_api, playlist_id):
    """Fetch every item of a playlist, following pagination.

    Args:
        yt_api: API client exposing ``playlistItems().list(...).execute()``.
        playlist_id: ID of the playlist to read.

    Returns:
        A list of the playlist item resources returned by the API, requested
        with the ``snippet`` and ``contentDetails`` parts.  If the API raises
        ``HttpError`` the error is printed and the items collected so far
        are returned, so the list may be incomplete.
    """
    playlist_name = get_playlist_name(playlist_id)
    videos = []
    page_token = ""
    try:
        while True:
            response = yt_api.playlistItems().list(
                part="snippet,contentDetails",
                maxResults=50,
                playlistId=playlist_id,
                pageToken=page_token
            ).execute()
            videos.extend(response['items'])

            # Stop on the absence of a next-page token rather than on
            # pageInfo.totalResults: if the reported total is larger than the
            # number of items actually returned, counting towards it would
            # restart from the first page and never terminate.
            page_token = response.get('nextPageToken', "")
            if not page_token:
                break
    except HttpError as e:
        print(f'Error getting video IDs from playlist {playlist_name}: {e}')

    print(f'Fetched {len(videos)} videos from playlist {playlist_name}')
    return videos
|
|
|
|
|
|
def add_video_to_playlist(yt_api, video, playlist_id, dry_run: bool = False) -> bool:
    """Insert ``video`` into the playlist identified by ``playlist_id``.

    Args:
        yt_api: API client exposing ``playlistItems().insert(...).execute()``.
        video: Playlist item dict; ``snippet.resourceId.videoId`` and
            ``snippet.title`` are read from it.
        playlist_id: ID of the playlist that receives the video.
        dry_run: When true, only print what would be done.

    Returns:
        ``True`` when the video was added (or would be, in a dry run),
        ``False`` when the API call raised ``HttpError``.
    """
    playlist_name = get_playlist_name(playlist_id)
    video_id = video['snippet']['resourceId']['videoId']
    video_title = _truncate_title(video['snippet']['title'])

    if dry_run:
        print(f"Would add video '{video_title}' [{video_id}] to playlist {playlist_name}")
        return True

    request_body = {
        'snippet': {
            'playlistId': playlist_id,
            'resourceId': {
                'kind': 'youtube#video',
                'videoId': video_id
            }
        }
    }
    try:
        yt_api.playlistItems().insert(part='snippet', body=request_body).execute()
    except HttpError as e:
        print(f"Error adding video '{video_title}' [{video_id}] to playlist {playlist_name}: {e}")
        return False

    print(f"Added video '{video_title}' [{video_id}] to playlist {playlist_name}")
    return True
|
|
|
|
|
|
def remove_video_from_playlist(yt_api, video, playlist_id, dry_run: bool = False) -> bool:
    """Delete the playlist item ``video`` from its playlist.

    Args:
        yt_api: API client exposing ``playlistItems().delete(...).execute()``.
        video: Playlist item dict; its ``id`` selects the item to delete,
            while ``snippet.resourceId.videoId`` and ``snippet.title`` are
            used for the printed messages.
        playlist_id: ID of the playlist the item belongs to; used only to
            look up the playlist name for the messages.
        dry_run: When true, only print what would be done.

    Returns:
        ``True`` when the item was removed (or would be, in a dry run),
        ``False`` when the API call raised ``HttpError``.
    """
    playlist_name = get_playlist_name(playlist_id)
    video_id = video['snippet']['resourceId']['videoId']
    video_title = _truncate_title(video['snippet']['title'])

    if dry_run:
        print(f"Would remove video '{video_title}' [{video_id}] from playlist {playlist_name}")
        return True

    try:
        yt_api.playlistItems().delete(id=video['id']).execute()
    except HttpError as e:
        print(f"Error removing video '{video_title}' [{video_id}] from playlist {playlist_name}: {e}")
        return False

    print(f"Removed video '{video_title}' [{video_id}] from playlist {playlist_name}")
    return True
|
|
|
|
|
|
def move_all_videos(yt_api, src_playlist: str, dst_playlist: str, limit: int = -1, dry_run: bool = False):
    """Move videos from ``src_playlist`` to ``dst_playlist``.

    Each source item is added to the destination unless a video with the
    same ID is already there, and is then removed from the source.

    Args:
        yt_api: API client passed through to the fetch/add/remove helpers.
        src_playlist: ID of the playlist to move videos out of.
        dst_playlist: ID of the playlist to move videos into.
        limit: Maximum number of source items to process; a negative value
            means all of them.
        dry_run: When true, the helpers only print what would be done.
    """
    src_videos = get_videos(yt_api, src_playlist)
    dst_videos = get_videos(yt_api, dst_playlist)

    # A negative limit means "everything".  The cap has to come from the
    # fetched list; the length of the playlist ID string is unrelated.
    if limit >= 0:
        src_videos = src_videos[:limit]

    # Only membership is needed, so keep the destination video IDs in a set.
    dst_video_ids = {video['snippet']['resourceId']['videoId']
                     for video in dst_videos}

    for src_video in src_videos:
        video_id = src_video['snippet']['resourceId']['videoId']
        if video_id not in dst_video_ids:
            if not add_video_to_playlist(yt_api, src_video, dst_playlist, dry_run):
                # The add failed: keep the source entry so the video is not
                # lost from both playlists.
                continue
            # Remember the video so a duplicate source entry is not added
            # to the destination a second time.
            dst_video_ids.add(video_id)
        remove_video_from_playlist(yt_api, src_video, src_playlist, dry_run)
|
|
|
|
|
|
def main():
    """Command-line entry point: move videos between two playlists.

    Parses the source and destination playlist arguments (each is resolved
    through ``get_playlist_id``, so a name from ``playlists.csv`` or a raw
    ID both work), the optional ``--limit`` and ``--dry-run`` flags, builds
    an authenticated YouTube v3 client and calls ``move_all_videos``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('src_playlist', help='Source playlist ID')
    parser.add_argument('dst_playlist', help='Destination playlist ID')
    parser.add_argument(
        '-l', '--limit',
        type=int,
        default=-1,
        help='Limit number of videos to move',
    )
    parser.add_argument(
        '-n', '--dry-run',
        action='store_true',
        help='Dry run, do not send changes to YoutubeAPI',
    )
    args = parser.parse_args()

    # Disable OAuthlib's HTTPS verification when running locally.
    # *DO NOT* leave this option enabled in production.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    src_playlist = get_playlist_id(args.src_playlist)
    dst_playlist = get_playlist_id(args.dst_playlist)

    youtube = build("youtube", "v3", credentials=get_yt_creds())

    move_all_videos(
        youtube,
        src_playlist,
        dst_playlist,
        limit=args.limit,
        dry_run=args.dry_run,
    )
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script, and use
# whatever main() returns as the process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
|