Test that label playlists are working

This commit is contained in:
Bradlee Speice 2021-09-10 22:10:59 -04:00
parent ce8b8a7e00
commit f764e5a8e8
5 changed files with 34 additions and 51 deletions

View File

@ -2,9 +2,8 @@
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from typing import Dict, Iterable from typing import Iterable
from spotify_model import SimplifiedAlbum
from spotipy import Spotify from spotipy import Spotify
from spotify_actions.album import ( from spotify_actions.album import (
@ -39,43 +38,38 @@ def label_playlist(client: Spotify, label_name: str, playlist_id: str) -> None:
playlist_replace(client, playlist_id, tracks) playlist_replace(client, playlist_id, tracks)
def label_recent(client: Spotify, playlist_id: str, released_after: date) -> Iterable[SimplifiedAlbum]: def label_recent(client: Spotify, label_playlist_ids: Iterable[str], playlist_id: str, released_after: date) -> None:
album_iterables = []
for label_playlist_id in label_playlist_ids:
# Get all albums in a playlist released after the provided date # Get all albums in a playlist released after the provided date
tracks = playlist_tracks(client, [playlist_id]) tracks = playlist_tracks(client, [label_playlist_id])
albums = track_unique_albums(tracks) album_ids = track_unique_albums(tracks)
albums = album_from_ids(client, album_ids)
# Because the playlists were created in descending release date order, # Because the playlists were created in descending release date order,
# `is_sorted=True` is enabled to reduce the number of API queries needed # `is_sorted=True` is enabled to reduce the number of API queries needed
return album_filter_release(albums, released_after, is_sorted=True) album_iterables.append(album_filter_release(albums, released_after, is_sorted=True))
def run(client: Spotify, recent_releases_id: str, label_ids: Dict[str, str], released_after: date) -> None:
# Create the individual label playlists
for label_name, playlist_id in label_ids.items():
label_playlist(client, label_name, playlist_id)
# Get albums from the playlists we just created
album_iterables = [label_recent(client, playlist_id, released_after) for _, playlist_id in label_ids.items()]
# Merge all the albums from each label playlist # Merge all the albums from each label playlist
recent_albums = combinator_join(*album_iterables) recent_albums = combinator_join(*album_iterables)
recent_tracks = album_to_tracks(client, recent_albums) recent_tracks = album_to_tracks(client, recent_albums)
# Create the recent releases playlist # Create the recent releases playlist
playlist_replace(client, recent_releases_id, recent_tracks) playlist_replace(client, playlist_id, recent_tracks)
def main() -> None: def main() -> None:
one_week_ago = (datetime.now().date() - timedelta(days=7)).strftime("%Y-%m-%d") # Intentionally 6 days - if running on a Friday, we don't want to include last Friday's releases
one_week_ago = (datetime.now().date() - timedelta(days=6)).strftime("%Y-%m-%d")
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True) parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("-r", "--redirect-uri", required=True) parser.add_argument("-r", "--redirect-uri", required=True)
parser.add_argument("--recent-release", help='Name of the "recent releases" playlist constructed from all labels.')
parser.add_argument( parser.add_argument(
"-d", "--released-after", help="YYYY-MM-DD date that albums must be released after", default=one_week_ago "--released-after", help="YYYY-MM-DD date that albums must be released after", default=one_week_ago
) )
parser.add_argument("recent_release", help='Name of the "recent releases" playlist constructed from all labels')
parser.add_argument("label", nargs="+") parser.add_argument("label", nargs="+")
cmdline = parser.parse_args() cmdline = parser.parse_args()
@ -86,8 +80,6 @@ def main() -> None:
scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"], scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"],
) )
released_after = datetime.strptime(cmdline.released_after, "%Y-%m-%d")
# Get all user playlists; we'll be iterating over this a couple times # Get all user playlists; we'll be iterating over this a couple times
user_playlists = list(playlist_current_user_all(client)) user_playlists = list(playlist_current_user_all(client))
@ -97,10 +89,15 @@ def main() -> None:
# The `str()` wrapper is technically unnecessary, but keeps mypy happy # The `str()` wrapper is technically unnecessary, but keeps mypy happy
return str(list(assured)[0].spotify_id) return str(list(assured)[0].spotify_id)
recent_releases = _locate_playlist(cmdline.recent_release) label_ids = {name: _locate_playlist(name) for name in cmdline.label}
label_playlists = {label: _locate_playlist(label) for label in cmdline.label}
run(client, recent_releases, label_playlists, released_after) for label_name, playlist_id in label_ids.items():
label_playlist(client, label_name, playlist_id)
if cmdline.recent_release:
recent_release = _locate_playlist(cmdline.recent_release)
released_after = datetime.strptime(cmdline.released_after, "%Y-%m-%d").date()
label_recent(client, label_ids.values(), recent_release, released_after)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -57,7 +57,7 @@ def album_from_ids(
for album_id_chunk in chunk(_to_id(), chunk_size): for album_id_chunk in chunk(_to_id(), chunk_size):
album_chunk = client.albums(album_id_chunk) album_chunk = client.albums(album_id_chunk)
for album in album_chunk: for album in album_chunk["albums"]:
yield SimplifiedAlbum(**album) yield SimplifiedAlbum(**album)

View File

@ -1,21 +0,0 @@
"""
Methods for printing results to console; primarily useful when developing/debugging pipelines to
check results before committing.
"""
from typing import Iterable
from spotify_model import SearchAlbum, SimplifiedTrack
def echo_albums(albums: Iterable[SearchAlbum]) -> None:
"Print album metadata"
for album in albums:
print(album.name)
def echo_tracks(tracks: Iterable[SimplifiedTrack]) -> None:
"Print track metadata"
for track in tracks:
print(track.name)

View File

@ -4,7 +4,7 @@ Selectors for querying and modifying playlists
from functools import partial from functools import partial
from typing import Iterable from typing import Iterable
from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack
from spotipy import Spotify from spotipy import Spotify
from .user import user_current from .user import user_current
@ -89,4 +89,4 @@ def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[Si
playlist_function = partial(_playlist_tracks, playlist_id=playlist_id) playlist_function = partial(_playlist_tracks, playlist_id=playlist_id)
for track in exhaust(playlist_function): for track in exhaust(playlist_function):
yield SimplifiedTrack(**track) yield PlaylistTrack(**track).track

View File

@ -16,6 +16,13 @@ DEFAULT_LIMIT = 50
T = TypeVar("T") # pylint: disable=invalid-name T = TypeVar("T") # pylint: disable=invalid-name
def echo(elements: Iterable[T]) -> Iterable[T]:
"Echo the elements of an iterable and re-yield them"
for element in elements:
print(element)
yield element
def read_credentials_server(path: Path) -> Spotify: def read_credentials_server(path: Path) -> Spotify:
"Read credentials from a YAML file and construct a Spotify client using the server workflow" "Read credentials from a YAML file and construct a Spotify client using the server workflow"