Start adding tracks functionality

Next step is turning those tracks into a playlist
This commit is contained in:
Bradlee Speice 2021-07-04 02:28:24 -04:00
parent 78009dfbe6
commit 8d12f9dd1b
9 changed files with 201 additions and 68 deletions

35
examples/recent_albums.py Normal file
View File

@ -0,0 +1,35 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from datetime import date, timedelta
from spotify_actions.album import album_to_simplified, album_to_tracks
from spotify_actions.combinator import combinator_join
from spotify_actions.echo import echo_tracks
from spotify_actions.search import Query, search_albums
from spotify_actions.temporal import temporal_released_after
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("label", nargs="+")
cmdline = parser.parse_args()
today = date.today()
four_weeks = timedelta(days=28)
client = read_credentials(cmdline.credentials)
label_albums_search = [search_albums(client, Query(label=l)) for l in cmdline.label]
albums_search = combinator_join(*label_albums_search)
albums_search_recent = temporal_released_after(albums_search, today - four_weeks)
albums_recent = album_to_simplified(client, albums_search_recent)
tracks_recent = album_to_tracks(client, albums_recent)
echo_tracks(tracks_recent)
if __name__ == "__main__":
main()

View File

@ -1,32 +0,0 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from datetime import date, timedelta
from spotify_actions.echo import echo_album
from spotify_actions.join import join_albums
from spotify_actions.search import search_albums
from spotify_actions.temporal import temporal_released_after
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("label", nargs="+")
cmdline = parser.parse_args()
today = date.today()
four_weeks = timedelta(days=28)
client = read_credentials(cmdline.credentials)
label_albums = [search_albums(client, label=l) for l in cmdline.label]
albums = join_albums(*label_albums)
albums_recent = temporal_released_after(albums, today - four_weeks)
echo_album(albums_recent)
if __name__ == "__main__":
main()

25
examples/search_tracks.py Normal file
View File

@ -0,0 +1,25 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from spotify_actions.combinator import combinator_take
from spotify_actions.echo import echo_tracks
from spotify_actions.search import search_tracks
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("query")
cmdline = parser.parse_args()
client = read_credentials(cmdline.credentials)
tracks = search_tracks(client, cmdline.query)
tracks_take = combinator_take(tracks, count=100)
echo_tracks(tracks_take)
if __name__ == "__main__":
main()

36
spotify_actions/album.py Normal file
View File

@ -0,0 +1,36 @@
"""
Selectors for working with albums
"""
from functools import partial
from typing import Iterable
from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack
from spotipy import Spotify
from .util import chunk, exhaust
def album_to_simplified(
client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20
) -> Iterable[SimplifiedAlbum]:
"Retrieve the actual album objects associated with the albums received from searching"
for album_chunk in chunk(albums, chunk_size):
album_ids = [a.spotify_id for a in album_chunk]
for album in client.albums(albums=album_ids)["albums"]:
yield SimplifiedAlbum(**album)
def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]:
"Convert an album stream to the tracks on that album"
def _album_tracks(album_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.album_tracks(album_id=album_id, limit=limit, offset=offset))
# Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent
for album in albums:
tracks_function = partial(client.album_tracks, album_id=album.spotify_id)
for track in exhaust(tracks_function, album.tracks):
yield SimplifiedTrack(**track)

View File

@ -0,0 +1,29 @@
"""
Join objects from multiple sources into a single stream
"""
from functools import partial
from typing import Iterable, TypeVar
T = TypeVar("T") # pylint: disable=invalid-name
def combinator_take(items: Iterable[T], count: int) -> Iterable[T]:
"Retrieve the first `count` items from an iterator"
observed = 0
for item in items:
observed += 1
yield item
if observed >= count:
break
combinator_first = partial(combinator_take, count=1)
def combinator_join(*args: Iterable[T]) -> Iterable[T]:
"Join the results of many album producers by exhausting all albums from each producer"
for arg in args:
for item in arg:
yield item

View File

@ -4,10 +4,18 @@ check results before committing.
""" """
from typing import Iterable from typing import Iterable
from spotify_model.album import SearchAlbum from spotify_model import SearchAlbum, SimplifiedTrack
def echo_album(albums: Iterable[SearchAlbum]) -> None: def echo_albums(albums: Iterable[SearchAlbum]) -> None:
"Print album metadata" "Print album metadata"
for album in albums: for album in albums:
print(album.name) print(album.name)
def echo_tracks(tracks: Iterable[SimplifiedTrack]) -> None:
"Print track metadata"
for track in tracks:
print(track.name)

View File

@ -1,14 +0,0 @@
"""
Join objects from multiple sources into a single stream
"""
from typing import Iterable
from spotify_model import SearchAlbum
def join_albums(*args: Iterable[SearchAlbum]) -> Iterable[SearchAlbum]:
"Join the results of many album producers by exhausting all albums from each producer"
for arg in args:
for album in arg:
yield album

View File

@ -1,35 +1,60 @@
""" """
Utility methods for the Spotify query API Utility methods for the Spotify query API
""" """
from typing import Iterable, Optional from functools import partial
from typing import Iterable, Optional, Union
from spotify_model import Paging, SearchAlbum from spotify_model import Paging, SearchAlbum, SimplifiedTrack
from spotipy import Spotify from spotipy import Spotify
from .util import exhaust from .util import exhaust
def _search_albums(client: Spotify, search_str: str) -> Paging: class Query:
def _search(limit: int, offset: int) -> Paging: "Query builder for Spotify search API"
return Paging(**client.search(search_str, limit=limit, offset=offset, type="album")["albums"])
return _search def __init__(self, query: Optional[str] = None, artist: Optional[str] = None, label: Optional[str] = None) -> None:
self.query = query
self.artist = artist
def search_albums( self.label = label
client: Spotify, search_str: Optional[str] = None, artist: Optional[str] = None, label: Optional[str] = None
) -> Iterable[SearchAlbum]:
"Display albums from a search string"
def __str__(self) -> str:
query_items = [ query_items = [
search_str, self.query,
f'artist:"{artist}"' if artist is not None else None, f'artist:"{self.artist}"' if self.artist is not None else None,
f'label:"{label}"' if label is not None else None, f'label:"{self.label}"' if self.label is not None else None,
] ]
query_str = " ".join([i for i in query_items if i is not None]) return " ".join([i for i in query_items if i is not None])
# pylint: disable=too-many-arguments
def _search(client: Spotify, query_str: str, query_type: str, item_key: str, limit: int, offset: int) -> Paging:
return Paging(**client.search(query_str, type=query_type, limit=limit, offset=offset)[item_key])
def search_albums(client: Spotify, query: Union[str, Query]) -> Iterable[SearchAlbum]:
"Retrieve albums from a search string"
query = query if isinstance(query, Query) else Query(query)
query_str = str(query)
if not query_str: if not query_str:
return return
for item in exhaust(_search_albums(client, query_str)): search_function = partial(_search, client, query_str, "album", "albums")
for item in exhaust(search_function):
yield SearchAlbum(**item) yield SearchAlbum(**item)
def search_tracks(client: Spotify, query: Union[str, Query]) -> Iterable[SimplifiedTrack]:
"Retrieve tracks from a search string"
query = query if isinstance(query, Query) else Query(query)
query_str = str(query)
if not query_str:
return
search_function = partial(_search, client, query_str, "track", "tracks")
for item in exhaust(search_function):
yield SimplifiedTrack(**item)

View File

@ -3,7 +3,7 @@ Utility methods for working with the Spotify API
""" """
from math import ceil from math import ceil
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, Protocol from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
import yaml import yaml
from spotify_model import Paging from spotify_model import Paging
@ -11,6 +11,8 @@ from spotipy import Spotify, SpotifyClientCredentials
DEFAULT_LIMIT = 50 DEFAULT_LIMIT = 50
T = TypeVar("T") # pylint: disable=invalid-name
def read_credentials(path: Path) -> Spotify: def read_credentials(path: Path) -> Spotify:
"Read credentials from a YAML file and construct a Spotify client" "Read credentials from a YAML file and construct a Spotify client"
@ -28,9 +30,11 @@ class Paginated(Protocol):
... ...
def exhaust(function: Paginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[str, Any]]: def exhaust(
function: Paginated, initial: Optional[Paging] = None, limit: int = DEFAULT_LIMIT
) -> Iterable[Dict[str, Any]]:
"Exhaust a function that returns a pagination object" "Exhaust a function that returns a pagination object"
response = function(limit=limit, offset=0) response = initial if initial is not None else function(limit=limit, offset=0)
for item in response.items: for item in response.items:
yield item yield item
@ -39,3 +43,20 @@ def exhaust(function: Paginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[st
response = function(limit=response.limit, offset=response.limit * i) response = function(limit=response.limit, offset=response.limit * i)
for item in response.items: for item in response.items:
yield item yield item
def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
"Split an iterable into smaller chunks"
assert size >= 1
return_items = []
for item in items:
return_items.append(item)
if len(return_items) == size:
yield return_items
return_items = []
if return_items:
yield return_items