From 21adb3acdc74f24f753c0360ffcc1191bd9e7895 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 8 Sep 2021 21:46:22 -0400 Subject: [PATCH] Get the label playlist example working This is the first relatively big milestone for me. --- examples/label_playlist.py | 47 +++++++++++++++++++++++ pyproject.toml | 4 ++ spotify_actions/album.py | 17 +++++++++ spotify_actions/playlist.py | 76 +++++++++++++++++++++++++++++++++++++ spotify_actions/temporal.py | 30 ++++++++++----- spotify_actions/user.py | 11 ++++++ spotify_actions/util.py | 17 +++++++-- 7 files changed, 190 insertions(+), 12 deletions(-) create mode 100644 examples/label_playlist.py create mode 100644 spotify_actions/playlist.py create mode 100644 spotify_actions/user.py diff --git a/examples/label_playlist.py b/examples/label_playlist.py new file mode 100644 index 0000000..6b76420 --- /dev/null +++ b/examples/label_playlist.py @@ -0,0 +1,47 @@ +# pylint: disable=missing-module-docstring, missing-function-docstring + +from argparse import ArgumentParser + +from spotify_actions.album import ( + album_filter_label, + album_sort_release, + album_to_simplified, + album_to_tracks, +) +from spotify_actions.playlist import ( + playlist_current_user_all, + playlist_current_user_assure, + playlist_replace, +) +from spotify_actions.search import Query, search_albums +from spotify_actions.util import read_credentials_oauth + + +def main() -> None: + parser = ArgumentParser() + parser.add_argument("-c", "--credentials", required=True) + parser.add_argument("label") + + cmdline = parser.parse_args() + + client = read_credentials_oauth( + cmdline.credentials, + redirect_uri="https://speice.io/spotify/", + scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"], + ) + + user_playlists = playlist_current_user_all(client) + playlists = playlist_current_user_assure(client, user_playlists, cmdline.label) + playlist = list(playlists)[0] + + albums_search = search_albums(client, Query(label=cmdline.label)) + albums_unfiltered = album_to_simplified(client, albums_search) + albums_unsorted = album_filter_label(albums_unfiltered, cmdline.label) + albums = album_sort_release(albums_unsorted, descending=True) + tracks = album_to_tracks(client, albums) + + playlist_replace(client, playlist.spotify_id, tracks) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 21e0ec5..9b53490 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,7 @@ +[tool.isort] +profile = "black" +multi_line_output = 3 + [tool.poetry] name = "spotify_actions" version = "0.1.0" diff --git a/spotify_actions/album.py b/spotify_actions/album.py index bc3520b..ddf6bd6 100644 --- a/spotify_actions/album.py +++ b/spotify_actions/album.py @@ -7,9 +7,26 @@ from typing import Iterable from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack from spotipy import Spotify +from .temporal import temporal_convert from .util import chunk, exhaust +def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]: + "Filter albums that match an exact label string" + for album in albums: + if album.label == label: + yield album + + +def album_sort_release(albums: Iterable[SimplifiedAlbum], descending: bool = False) -> Iterable[SimplifiedAlbum]: + "Sort albums by release date" + all_albums = list(albums) + for album in sorted( + all_albums, key=lambda a: temporal_convert(a.release_date, a.release_date_precision), reverse=descending + ): + yield album + + def album_to_simplified( client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20 ) -> Iterable[SimplifiedAlbum]: diff --git a/spotify_actions/playlist.py b/spotify_actions/playlist.py new file mode 100644 index 0000000..ba787d8 --- /dev/null +++ b/spotify_actions/playlist.py @@ -0,0 +1,76 @@ +""" +Selectors for querying and modifying playlists +""" +from typing import Iterable + +from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack +from spotipy import Spotify + +from .user import user_current +from .util import chunk, exhaust + + +def playlist_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlaylist: + """ + Create a playlist for a user + + https://developer.spotify.com/documentation/web-api/reference/#endpoint-create-playlist + """ + return SimplifiedPlaylist(**client.user_playlist_create(user_id, name)) + + +def playlist_current_user_all(client: Spotify) -> Iterable[SimplifiedPlaylist]: + """ + Get all playlists that belong to the current user + """ + + def _playlists(limit: int, offset: int) -> Paging: + return Paging(**client.current_user_playlists(limit, offset)) + + for playlist in exhaust(_playlists): + yield SimplifiedPlaylist(**playlist) + + +def playlist_current_user_find(playlists: Iterable[SimplifiedPlaylist], name: str) -> Iterable[str]: + """ + Find all playlists for the current user that match a specific name + """ + for playlist in playlists: + if playlist.name == name: + yield playlist + + +def playlist_current_user_assure( + client: Spotify, playlists: Iterable[SimplifiedPlaylist], name: str +) -> Iterable[SimplifiedPlaylist]: + """ + Find (or create if it doesn't exist) all playlists for the current user that match a specific name + """ + found = False + + for playlist in playlist_current_user_find(playlists, name): + found = True + yield playlist + + if not found: + current_user_id = user_current(client).spotify_id + playlist_create(client, current_user_id, name) + + +def playlist_replace( + client: Spotify, playlist_id: str, tracks: Iterable[SimplifiedTrack], chunk_size: int = 100 +) -> None: + """ + Replace all tracks in a playlist with the new provided tracks + """ + all_tracks = [t.spotify_id for t in tracks] + + first = True + + for track_chunk in chunk(all_tracks, chunk_size): + if first: + client.playlist_replace_items(playlist_id, track_chunk) + first = False + continue + + client.playlist_add_items(playlist_id, track_chunk) diff --git a/spotify_actions/temporal.py b/spotify_actions/temporal.py index df929f3..127c879 100644 --- a/spotify_actions/temporal.py +++ b/spotify_actions/temporal.py @@ -10,6 +10,26 @@ from typing import Iterable from spotify_model import ReleaseDatePrecision, SearchAlbum +def temporal_convert(date_str: str, precision: ReleaseDatePrecision) -> date: + """ + For date strings that don't have date-level precision, the date is treated as the final day within that period; + thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are + treated as "1981-07-31" + """ + + if precision == ReleaseDatePrecision.YEAR: + actual = datetime.strptime(date_str, "%Y") + effective = date(actual.year, 12, 31) + elif precision == ReleaseDatePrecision.MONTH: + actual = datetime.strptime(date_str, "%Y-%m") + final_day = monthrange(actual.year, actual.month)[1] - 1 + effective = date(actual.year, actual.month, final_day) + else: + effective = datetime.strptime(date_str, "%Y-%m-%d").date() + + return effective + + def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) -> Iterable[SearchAlbum]: """ Filter albums to after a specific release date. @@ -22,15 +42,7 @@ def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) logger = getLogger(__name__) for album in albums: - if album.release_date_precision == ReleaseDatePrecision.YEAR: - actual_release = datetime.strptime(album.release_date, "%Y") - effective_release = date(actual_release.year, 12, 31) - elif album.release_date_precision == ReleaseDatePrecision.MONTH: - actual_release = datetime.strptime(album.release_date, "%Y-%m") - final_day = monthrange(actual_release.year, actual_release.month)[1] - 1 - effective_release = date(actual_release.year, actual_release.month, final_day) - else: - effective_release = datetime.strptime(album.release_date, "%Y-%m-%d").date() + effective_release = temporal_convert(album.release_date, album.release_date_precision) if effective_release >= released_after: logger.debug( diff --git a/spotify_actions/user.py b/spotify_actions/user.py new file mode 100644 index 0000000..47589eb --- /dev/null +++ b/spotify_actions/user.py @@ -0,0 +1,11 @@ +""" +Actions related to user information +""" + +from spotify_model import PrivateUser +from spotipy import Spotify + + +def user_current(client: Spotify) -> PrivateUser: + "Get all details of the current user" + return PrivateUser(**client.current_user()) diff --git a/spotify_actions/util.py b/spotify_actions/util.py index 1021c58..11a8405 100644 --- a/spotify_actions/util.py +++ b/spotify_actions/util.py @@ -7,15 +7,15 @@ from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar import yaml from spotify_model import Paging -from spotipy import Spotify, SpotifyClientCredentials +from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth DEFAULT_LIMIT = 50 T = TypeVar("T") # pylint: disable=invalid-name -def read_credentials(path: Path) -> Spotify: - "Read credentials from a YAML file and construct a Spotify client" +def read_credentials_server(path: Path) -> Spotify: + "Read credentials from a YAML file and construct a Spotify client using the server workflow" with open(path, "r") as credentials_file: credentials = yaml.safe_load(credentials_file) @@ -23,6 +23,17 @@ def read_credentials(path: Path) -> Spotify: return Spotify(client_credentials_manager=credentials_manager) +def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str]) -> Spotify: + "Read credentials from a YAML file and authorize a user" + + with open(path, "r") as credentials_file: + credentials = yaml.safe_load(credentials_file) + credentials_manager = SpotifyOAuth( + credentials["client_id"], credentials["client_secret"], redirect_uri, scope=scopes + ) + return Spotify(client_credentials_manager=credentials_manager) + + class Paginated(Protocol): "Protocol definition for functions that will be provided to the `exhaust` handler"