diff --git a/examples/recent_albums.py b/examples/recent_albums.py new file mode 100644 index 0000000..2cdae6f --- /dev/null +++ b/examples/recent_albums.py @@ -0,0 +1,35 @@ +# pylint: disable=missing-module-docstring, missing-function-docstring + +from argparse import ArgumentParser +from datetime import date, timedelta + +from spotify_actions.album import album_to_simplified, album_to_tracks +from spotify_actions.combinator import combinator_join +from spotify_actions.echo import echo_tracks +from spotify_actions.search import Query, search_albums +from spotify_actions.temporal import temporal_released_after +from spotify_actions.util import read_credentials + + +def main() -> None: + parser = ArgumentParser() + parser.add_argument("-c", "--credentials", required=True) + parser.add_argument("label", nargs="+") + + cmdline = parser.parse_args() + + today = date.today() + four_weeks = timedelta(days=28) + + client = read_credentials(cmdline.credentials) + + label_albums_search = [search_albums(client, Query(label=l)) for l in cmdline.label] + albums_search = combinator_join(*label_albums_search) + albums_search_recent = temporal_released_after(albums_search, today - four_weeks) + albums_recent = album_to_simplified(client, albums_search_recent) + tracks_recent = album_to_tracks(client, albums_recent) + echo_tracks(tracks_recent) + + +if __name__ == "__main__": + main() diff --git a/examples/recent_releases.py b/examples/recent_releases.py deleted file mode 100644 index f607969..0000000 --- a/examples/recent_releases.py +++ /dev/null @@ -1,32 +0,0 @@ -# pylint: disable=missing-module-docstring, missing-function-docstring - -from argparse import ArgumentParser -from datetime import date, timedelta - -from spotify_actions.echo import echo_album -from spotify_actions.join import join_albums -from spotify_actions.search import search_albums -from spotify_actions.temporal import temporal_released_after -from spotify_actions.util import read_credentials - - -def main() -> None: - parser = ArgumentParser() - parser.add_argument("-c", "--credentials", required=True) - parser.add_argument("label", nargs="+") - - cmdline = parser.parse_args() - - today = date.today() - four_weeks = timedelta(days=28) - - client = read_credentials(cmdline.credentials) - - label_albums = [search_albums(client, label=l) for l in cmdline.label] - albums = join_albums(*label_albums) - albums_recent = temporal_released_after(albums, today - four_weeks) - echo_album(albums_recent) - - -if __name__ == "__main__": - main() diff --git a/examples/search_tracks.py b/examples/search_tracks.py new file mode 100644 index 0000000..ebb385e --- /dev/null +++ b/examples/search_tracks.py @@ -0,0 +1,25 @@ +# pylint: disable=missing-module-docstring, missing-function-docstring + +from argparse import ArgumentParser + +from spotify_actions.combinator import combinator_take +from spotify_actions.echo import echo_tracks +from spotify_actions.search import search_tracks +from spotify_actions.util import read_credentials + + +def main() -> None: + parser = ArgumentParser() + parser.add_argument("-c", "--credentials", required=True) + parser.add_argument("query") + + cmdline = parser.parse_args() + + client = read_credentials(cmdline.credentials) + tracks = search_tracks(client, cmdline.query) + tracks_take = combinator_take(tracks, count=100) + echo_tracks(tracks_take) + + +if __name__ == "__main__": + main() diff --git a/spotify_actions/album.py b/spotify_actions/album.py new file mode 100644 index 0000000..bc3520b --- /dev/null +++ b/spotify_actions/album.py @@ -0,0 +1,36 @@ +""" +Selectors for working with albums +""" +from functools import partial +from typing import Iterable + +from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack +from spotipy import Spotify + +from .util import chunk, exhaust + + +def album_to_simplified( + client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20 +) -> Iterable[SimplifiedAlbum]: + "Retrieve the actual album objects associated with the albums received from searching" + + for album_chunk in chunk(albums, chunk_size): + album_ids = [a.spotify_id for a in album_chunk] + + for album in client.albums(albums=album_ids)["albums"]: + yield SimplifiedAlbum(**album) + + +def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]: + "Convert an album stream to the tracks on that album" + + def _album_tracks(album_id: str, limit: int, offset: int) -> Paging: + return Paging(**client.album_tracks(album_id=album_id, limit=limit, offset=offset)) + + # Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent + for album in albums: + tracks_function = partial(client.album_tracks, album_id=album.spotify_id) + + for track in exhaust(tracks_function, album.tracks): + yield SimplifiedTrack(**track) diff --git a/spotify_actions/combinator.py b/spotify_actions/combinator.py new file mode 100644 index 0000000..3859ce2 --- /dev/null +++ b/spotify_actions/combinator.py @@ -0,0 +1,29 @@ +""" +Join objects from multiple sources into a single stream +""" +from functools import partial +from typing import Iterable, TypeVar + +T = TypeVar("T") # pylint: disable=invalid-name + + +def combinator_take(items: Iterable[T], count: int) -> Iterable[T]: + "Retrieve the first `count` items from an iterator" + observed = 0 + for item in items: + observed += 1 + yield item + + if observed >= count: + break + + +combinator_first = partial(combinator_take, count=1) + + +def combinator_join(*args: Iterable[T]) -> Iterable[T]: + "Join the results of many album producers by exhausting all albums from each producer" + + for arg in args: + for item in arg: + yield item diff --git a/spotify_actions/echo.py b/spotify_actions/echo.py index f89215e..d11cfcf 100644 --- a/spotify_actions/echo.py +++ b/spotify_actions/echo.py @@ -4,10 +4,18 @@ check results before committing. """ from typing import Iterable -from spotify_model.album import SearchAlbum +from spotify_model import SearchAlbum, SimplifiedTrack -def echo_album(albums: Iterable[SearchAlbum]) -> None: +def echo_albums(albums: Iterable[SearchAlbum]) -> None: "Print album metadata" + for album in albums: print(album.name) + + +def echo_tracks(tracks: Iterable[SimplifiedTrack]) -> None: + "Print track metadata" + + for track in tracks: + print(track.name) diff --git a/spotify_actions/join.py b/spotify_actions/join.py deleted file mode 100644 index 2dd4e94..0000000 --- a/spotify_actions/join.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -Join objects from multiple sources into a single stream -""" -from typing import Iterable - -from spotify_model import SearchAlbum - - -def join_albums(*args: Iterable[SearchAlbum]) -> Iterable[SearchAlbum]: - "Join the results of many album producers by exhausting all albums from each producer" - - for arg in args: - for album in arg: - yield album diff --git a/spotify_actions/search.py b/spotify_actions/search.py index 371ea50..9d301e0 100644 --- a/spotify_actions/search.py +++ b/spotify_actions/search.py @@ -1,35 +1,60 @@ """ Utility methods for the Spotify query API """ -from typing import Iterable, Optional +from functools import partial +from typing import Iterable, Optional, Union -from spotify_model import Paging, SearchAlbum +from spotify_model import Paging, SearchAlbum, SimplifiedTrack from spotipy import Spotify from .util import exhaust -def _search_albums(client: Spotify, search_str: str) -> Paging: - def _search(limit: int, offset: int) -> Paging: - return Paging(**client.search(search_str, limit=limit, offset=offset, type="album")["albums"]) +class Query: + "Query builder for Spotify search API" - return _search + def __init__(self, query: Optional[str] = None, artist: Optional[str] = None, label: Optional[str] = None) -> None: + self.query = query + self.artist = artist + self.label = label + + def __str__(self) -> str: + query_items = [ + self.query, + f'artist:"{self.artist}"' if self.artist is not None else None, + f'label:"{self.label}"' if self.label is not None else None, + ] + return " ".join([i for i in query_items if i is not None]) -def search_albums( - client: Spotify, search_str: Optional[str] = None, artist: Optional[str] = None, label: Optional[str] = None -) -> Iterable[SearchAlbum]: - "Display albums from a search string" +# pylint: disable=too-many-arguments +def _search(client: Spotify, query_str: str, query_type: str, item_key: str, limit: int, offset: int) -> Paging: + return Paging(**client.search(query_str, type=query_type, limit=limit, offset=offset)[item_key]) - query_items = [ - search_str, - f'artist:"{artist}"' if artist is not None else None, - f'label:"{label}"' if label is not None else None, - ] - query_str = " ".join([i for i in query_items if i is not None]) + +def search_albums(client: Spotify, query: Union[str, Query]) -> Iterable[SearchAlbum]: + "Retrieve albums from a search string" + + query = query if isinstance(query, Query) else Query(query) + query_str = str(query) if not query_str: return - for item in exhaust(_search_albums(client, query_str)): + search_function = partial(_search, client, query_str, "album", "albums") + for item in exhaust(search_function): yield SearchAlbum(**item) + + +def search_tracks(client: Spotify, query: Union[str, Query]) -> Iterable[SimplifiedTrack]: + "Retrieve tracks from a search string" + + query = query if isinstance(query, Query) else Query(query) + query_str = str(query) + + if not query_str: + return + + search_function = partial(_search, client, query_str, "track", "tracks") + for item in exhaust(search_function): + yield SimplifiedTrack(**item) diff --git a/spotify_actions/util.py b/spotify_actions/util.py index e9b88e8..1021c58 100644 --- a/spotify_actions/util.py +++ b/spotify_actions/util.py @@ -3,7 +3,7 @@ Utility methods for working with the Spotify API """ from math import ceil from pathlib import Path -from typing import Any, Dict, Iterable, Protocol +from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar import yaml from spotify_model import Paging @@ -11,6 +11,8 @@ from spotipy import Spotify, SpotifyClientCredentials DEFAULT_LIMIT = 50 +T = TypeVar("T") # pylint: disable=invalid-name + def read_credentials(path: Path) -> Spotify: "Read credentials from a YAML file and construct a Spotify client" @@ -28,9 +30,11 @@ class Paginated(Protocol): ... -def exhaust(function: Paginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[str, Any]]: +def exhaust( + function: Paginated, initial: Optional[Paging] = None, limit: int = DEFAULT_LIMIT +) -> Iterable[Dict[str, Any]]: "Exhaust a function that returns a pagination object" - response = function(limit=limit, offset=0) + response = initial if initial is not None else function(limit=limit, offset=0) for item in response.items: yield item @@ -39,3 +43,20 @@ def exhaust(function: Paginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[st response = function(limit=response.limit, offset=response.limit * i) for item in response.items: yield item + + +def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]: + "Split an iterable into smaller chunks" + + assert size >= 1 + + return_items = [] + for item in items: + return_items.append(item) + + if len(return_items) == size: + yield return_items + return_items = [] + + if return_items: + yield return_items