diff --git a/examples/label_playlist.py b/examples/label_playlist.py index 6b76420..7315849 100644 --- a/examples/label_playlist.py +++ b/examples/label_playlist.py @@ -1,46 +1,103 @@ # pylint: disable=missing-module-docstring, missing-function-docstring from argparse import ArgumentParser +from datetime import date, datetime, timedelta +from typing import Iterable + +from spotipy import Spotify from spotify_actions.album import ( album_filter_label, + album_filter_release, + album_from_ids, album_sort_release, - album_to_simplified, album_to_tracks, ) +from spotify_actions.combinator import combinator_join from spotify_actions.playlist import ( playlist_current_user_all, playlist_current_user_assure, playlist_replace, + playlist_tracks, ) from spotify_actions.search import Query, search_albums +from spotify_actions.track import track_unique_albums from spotify_actions.util import read_credentials_oauth +def label_playlist(client: Spotify, label_name: str, playlist_id: str) -> None: + # Given a label name, replace all songs in the provided `playlist_id` with the + # label's songs, ordered by descending release date. + + albums_search = search_albums(client, Query(label=label_name)) + albums_unfiltered = album_from_ids(client, albums_search) + albums_unsorted = album_filter_label(albums_unfiltered, label_name) + albums = album_sort_release(albums_unsorted, descending=True) + tracks = album_to_tracks(client, albums) + + playlist_replace(client, playlist_id, tracks) + + +def label_recent(client: Spotify, label_playlist_ids: Iterable[str], playlist_id: str, released_after: date) -> None: + + album_iterables = [] + for label_playlist_id in label_playlist_ids: + # Get all albums in a playlist released after the provided date + tracks = playlist_tracks(client, [label_playlist_id]) + album_ids = track_unique_albums(tracks) + albums = album_from_ids(client, album_ids) + + # Because the playlists were created in descending release date order, + # `is_sorted=True` is enabled to reduce the number of API queries needed + album_iterables.append(album_filter_release(albums, released_after, is_sorted=True)) + + # Merge all the albums from each label playlist + recent_albums = combinator_join(*album_iterables) + recent_tracks = album_to_tracks(client, recent_albums) + + # Create the recent releases playlist + playlist_replace(client, playlist_id, recent_tracks) + + def main() -> None: + # Intentionally 6 days - if running on a Friday, we don't want to include last Friday's releases + one_week_ago = (datetime.now().date() - timedelta(days=6)).strftime("%Y-%m-%d") + parser = ArgumentParser() parser.add_argument("-c", "--credentials", required=True) - parser.add_argument("label") + parser.add_argument("-r", "--redirect-uri", required=True) + parser.add_argument("--recent-release", help='Name of the "recent releases" playlist constructed from all labels.') + parser.add_argument( + "--released-after", help="YYYY-MM-DD date that albums must be released after", default=one_week_ago + ) + parser.add_argument("label", nargs="+") cmdline = parser.parse_args() client = read_credentials_oauth( cmdline.credentials, - redirect_uri="https://speice.io/spotify/", + redirect_uri=cmdline.redirect_uri, scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"], ) - user_playlists = playlist_current_user_all(client) - playlists = playlist_current_user_assure(client, user_playlists, cmdline.label) - playlist = list(playlists)[0] + # Get all user playlists; we'll be iterating over this a couple times + user_playlists = list(playlist_current_user_all(client)) - albums_search = search_albums(client, Query(label=cmdline.label)) - albums_unfiltered = album_to_simplified(client, albums_search) - albums_unsorted = album_filter_label(albums_unfiltered, cmdline.label) - albums = album_sort_release(albums_unsorted, descending=True) - tracks = album_to_tracks(client, albums) + # To simplify, this assumes that the label playlist name is unique for this user + def _locate_playlist(name: str) -> str: + assured = playlist_current_user_assure(client, user_playlists, name) + # The `str()` wrapper is technically unnecessary, but keeps mypy happy + return str(list(assured)[0].spotify_id) - playlist_replace(client, playlist.spotify_id, tracks) + label_ids = {name: _locate_playlist(name) for name in cmdline.label} + + for label_name, playlist_id in label_ids.items(): + label_playlist(client, label_name, playlist_id) + + if cmdline.recent_release: + recent_release = _locate_playlist(cmdline.recent_release) + released_after = datetime.strptime(cmdline.released_after, "%Y-%m-%d").date() + label_recent(client, label_ids.values(), recent_release, released_after) if __name__ == "__main__": diff --git a/examples/recent_albums.py b/examples/recent_albums.py deleted file mode 100644 index 2cdae6f..0000000 --- a/examples/recent_albums.py +++ /dev/null @@ -1,35 +0,0 @@ -# pylint: disable=missing-module-docstring, missing-function-docstring - -from argparse import ArgumentParser -from datetime import date, timedelta - -from spotify_actions.album import album_to_simplified, album_to_tracks -from spotify_actions.combinator import combinator_join -from spotify_actions.echo import echo_tracks -from spotify_actions.search import Query, search_albums -from spotify_actions.temporal import temporal_released_after -from spotify_actions.util import read_credentials - - -def main() -> None: - parser = ArgumentParser() - parser.add_argument("-c", "--credentials", required=True) - parser.add_argument("label", nargs="+") - - cmdline = parser.parse_args() - - today = date.today() - four_weeks = timedelta(days=28) - - client = read_credentials(cmdline.credentials) - - label_albums_search = [search_albums(client, Query(label=l)) for l in cmdline.label] - albums_search = combinator_join(*label_albums_search) - albums_search_recent = temporal_released_after(albums_search, today - four_weeks) - albums_recent = album_to_simplified(client, albums_search_recent) - tracks_recent = album_to_tracks(client, albums_recent) - echo_tracks(tracks_recent) - - -if __name__ == "__main__": - main() diff --git a/examples/search_tracks.py b/examples/search_tracks.py deleted file mode 100644 index ebb385e..0000000 --- a/examples/search_tracks.py +++ /dev/null @@ -1,25 +0,0 @@ -# pylint: disable=missing-module-docstring, missing-function-docstring - -from argparse import ArgumentParser - -from spotify_actions.combinator import combinator_take -from spotify_actions.echo import echo_tracks -from spotify_actions.search import search_tracks -from spotify_actions.util import read_credentials - - -def main() -> None: - parser = ArgumentParser() - parser.add_argument("-c", "--credentials", required=True) - parser.add_argument("query") - - cmdline = parser.parse_args() - - client = read_credentials(cmdline.credentials) - tracks = search_tracks(client, cmdline.query) - tracks_take = combinator_take(tracks, count=100) - echo_tracks(tracks_take) - - -if __name__ == "__main__": - main() diff --git a/spotify_actions/album.py b/spotify_actions/album.py index 7d8104a..aebde6c 100644 --- a/spotify_actions/album.py +++ b/spotify_actions/album.py @@ -1,14 +1,14 @@ """ Selectors for working with albums """ +from datetime import date from functools import partial -from typing import Iterable +from typing import Iterable, Union from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack from spotipy import Spotify -from .temporal import temporal_convert -from .util import chunk, exhaust +from .util import chunk, exhaust, parse_release_date def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]: @@ -18,36 +18,71 @@ def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterabl yield album -def album_sort_release(albums: Iterable[SimplifiedAlbum], descending: bool = False) -> Iterable[SimplifiedAlbum]: +def album_filter_release( + albums: Iterable[SimplifiedAlbum], released_after: date, is_sorted: bool = False, fast_forward: bool = False +) -> Iterable[SearchAlbum]: + """ + Filter albums to those released on or after a provided date. + + If `is_sorted` is True, iteration will stop once the first album released prior to + `released_after` is encountered (may be useful to avoid extra API calls when + iterating over a pre-sorted playlist). + + See `temporal_convert` for more information on how album release dates are + resolved, and usage of `fast_forward`. + """ + + for album in albums: + effective_release = parse_release_date( + album.release_date, album.release_date_precision, fast_forward=fast_forward + ) + + if effective_release >= released_after: + yield album + elif is_sorted: + return + + +def album_from_ids( + client: Spotify, albums: Iterable[Union[str, SearchAlbum]], chunk_size: int = 20 +) -> Iterable[SimplifiedAlbum]: + """ + Given a stream of album IDs (or base album objects), retrieve the full album objects + """ + + def _to_id() -> Iterable[SimplifiedAlbum]: + for album in albums: + yield album if isinstance(album, str) else album.spotify_id + + for album_id_chunk in chunk(_to_id(), chunk_size): + album_chunk = client.albums(album_id_chunk) + + for album in album_chunk["albums"]: + yield SimplifiedAlbum(**album) + + +def album_sort_release( + albums: Iterable[SearchAlbum], descending: bool = False, fast_forward: bool = False +) -> Iterable[SearchAlbum]: "Sort albums by release date" all_albums = list(albums) - for album in sorted( - all_albums, key=lambda a: temporal_convert(a.release_date, a.release_date_precision), reverse=descending - ): + + def _sort_key(album: SearchAlbum) -> date: + return parse_release_date(album.release_date, album.release_date_precision, fast_forward=fast_forward) + + for album in sorted(all_albums, key=_sort_key, reverse=descending): yield album -def album_to_simplified( - client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20 -) -> Iterable[SimplifiedAlbum]: - "Retrieve the actual album objects associated with the albums received from searching" - - for album_chunk in chunk(albums, chunk_size): - album_ids = [a.spotify_id for a in album_chunk] - - for album in client.albums(albums=album_ids)["albums"]: - yield SimplifiedAlbum(**album) - - def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]: "Convert an album stream to the tracks on that album" def _album_tracks(album_id: str, limit: int, offset: int) -> Paging: - return Paging(**client.album_tracks(album_id=album_id, limit=limit, offset=offset)) + return Paging(**client.album_tracks(album_id, limit=limit, offset=offset)) # Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent for album in albums: tracks_function = partial(_album_tracks, album_id=album.spotify_id) - for track in exhaust(tracks_function, album.tracks): + for track in exhaust(tracks_function, initial=album.tracks): yield SimplifiedTrack(**track) diff --git a/spotify_actions/echo.py b/spotify_actions/echo.py deleted file mode 100644 index d11cfcf..0000000 --- a/spotify_actions/echo.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Methods for printing results to console; primarily useful when developing/debugging pipelines to -check results before committing. -""" -from typing import Iterable - -from spotify_model import SearchAlbum, SimplifiedTrack - - -def echo_albums(albums: Iterable[SearchAlbum]) -> None: - "Print album metadata" - - for album in albums: - print(album.name) - - -def echo_tracks(tracks: Iterable[SimplifiedTrack]) -> None: - "Print track metadata" - - for track in tracks: - print(track.name) diff --git a/spotify_actions/playlist.py b/spotify_actions/playlist.py index de5a10f..898a06d 100644 --- a/spotify_actions/playlist.py +++ b/spotify_actions/playlist.py @@ -1,9 +1,10 @@ """ Selectors for querying and modifying playlists """ +from functools import partial from typing import Iterable -from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack +from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack from spotipy import Spotify from .user import user_current @@ -74,3 +75,18 @@ def playlist_replace( continue client.playlist_add_items(playlist_id, track_chunk) + + +def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[SimplifiedTrack]: + """ + Given a playlist_id, fetch all songs currently in the playlist + """ + + def _playlist_tracks(playlist_id: str, limit: int, offset: int) -> Paging: + return Paging(**client.playlist_items(playlist_id, limit=limit, offset=offset)) + + for playlist_id in playlist_ids: + playlist_function = partial(_playlist_tracks, playlist_id=playlist_id) + + for track in exhaust(playlist_function): + yield PlaylistTrack(**track).track diff --git a/spotify_actions/temporal.py b/spotify_actions/temporal.py deleted file mode 100644 index 127c879..0000000 --- a/spotify_actions/temporal.py +++ /dev/null @@ -1,61 +0,0 @@ -""" -Actions for filtering based on temporal information -""" - -from calendar import monthrange -from datetime import date, datetime -from logging import getLogger -from typing import Iterable - -from spotify_model import ReleaseDatePrecision, SearchAlbum - - -def temporal_convert(date_str: str, precision: ReleaseDatePrecision) -> date: - """ - For date strings that don't have date-level precision, the date is treated as the final day within that period; - thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are - treated as "1981-07-31" - """ - - if precision == ReleaseDatePrecision.YEAR: - actual = datetime.strptime(date_str, "%Y") - effective = date(actual.year, 12, 31) - elif precision == ReleaseDatePrecision.MONTH: - actual = datetime.strptime(date_str, "%Y-%m") - final_day = monthrange(actual.year, actual.month)[1] - 1 - effective = date(actual.year, actual.month, final_day) - else: - effective = datetime.strptime(date_str, "%Y-%m-%d").date() - - return effective - - -def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) -> Iterable[SearchAlbum]: - """ - Filter albums to after a specific release date. - - For albums that don't have date-level precision, the release date is treated as the final day within that period; - thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are - treated as "1981-07-31" - """ - - logger = getLogger(__name__) - - for album in albums: - effective_release = temporal_convert(album.release_date, album.release_date_precision) - - if effective_release >= released_after: - logger.debug( - "Including album=%s released on date=%s (prior to date=%s)", - album.name, - effective_release, - released_after, - ) - yield album - else: - logger.debug( - "Skipping album=%s released on date=%s (prior to date=%s)", - album.name, - effective_release, - released_after, - ) diff --git a/spotify_actions/track.py b/spotify_actions/track.py new file mode 100644 index 0000000..dc73bc8 --- /dev/null +++ b/spotify_actions/track.py @@ -0,0 +1,43 @@ +""" +Selectors for working with individual tracks +""" +from typing import Iterable, Union + +from spotify_model import SimplifiedTrack, Track +from spotipy import Spotify + +from .util import chunk + + +def track_from_ids( + client: Spotify, tracks: Union[Iterable[str], Iterable[SimplifiedTrack]], chunk_size: int = 50 +) -> Iterable[Track]: + """ + Given a stream of track IDs (or simplified tracks), retrieve the full track objects + """ + + def _to_id() -> Iterable[Track]: + for track in tracks: + yield track if isinstance(track, str) else track.spotify_id + + for track_id_chunk in chunk(_to_id(), chunk_size): + track_chunk = client.tracks(track_id_chunk) + + for track in track_chunk: + yield Track(**track) + + +def track_unique_albums(tracks: Iterable[Track]) -> Iterable[str]: + """ + Given a stream of tracks, yield all unique album IDs + """ + + album_ids = set() + + for track in tracks: + album_id = track.album.spotify_id + if album_id in album_ids: + continue + + album_ids.add(album_id) + yield album_id diff --git a/spotify_actions/util.py b/spotify_actions/util.py index 11a8405..6a4a946 100644 --- a/spotify_actions/util.py +++ b/spotify_actions/util.py @@ -1,12 +1,14 @@ """ Utility methods for working with the Spotify API """ +from calendar import monthrange +from datetime import date, datetime from math import ceil from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar import yaml -from spotify_model import Paging +from spotify_model import Paging, ReleaseDatePrecision from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth DEFAULT_LIMIT = 50 @@ -14,6 +16,13 @@ DEFAULT_LIMIT = 50 T = TypeVar("T") # pylint: disable=invalid-name +def echo(elements: Iterable[T]) -> Iterable[T]: + "Echo the elements of an iterable and re-yield them" + for element in elements: + print(element) + yield element + + def read_credentials_server(path: Path) -> Spotify: "Read credentials from a YAML file and construct a Spotify client using the server workflow" @@ -71,3 +80,33 @@ def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]: if return_items: yield return_items + + +def parse_release_date(date_str: str, precision: ReleaseDatePrecision, fast_forward: bool = False) -> date: + """ + Parse a date string with provided precision to a concrete date. + + `fast_forward` controls how precision is resolved: + - If `False` (default), dates are assumed to be at the start of the period + (e.g. "1970" becomes "1970-01-01", "1970-08" becomes "1907-08-01") + - If `True`, dates are "fast-forwarded" to the end of the given period + (e.g. "1970" becomes "1970-12-31", "1970-08" becomes "1970-08-31") + """ + + if precision == ReleaseDatePrecision.YEAR: + effective = datetime.strptime(date_str, "%Y").date() + + if fast_forward: + effective = date(effective.year, 12, 31) + + elif precision == ReleaseDatePrecision.MONTH: + effective = datetime.strptime(date_str, "%Y-%m").date() + + if fast_forward: + final_day = monthrange(effective.year, effective.month)[1] - 1 + effective = date(effective.year, effective.month, final_day) + + else: + effective = datetime.strptime(date_str, "%Y-%m-%d").date() + + return effective