Reorganization and features to support label_playlist

First practical-scale usecase!
This commit is contained in:
Bradlee Speice 2021-09-10 00:38:10 -04:00
parent b8ab83ae63
commit ce8b8a7e00
8 changed files with 220 additions and 155 deletions

View File

@ -1,46 +1,106 @@
# pylint: disable=missing-module-docstring, missing-function-docstring # pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import date, datetime, timedelta
from typing import Dict, Iterable
from spotify_model import SimplifiedAlbum
from spotipy import Spotify
from spotify_actions.album import ( from spotify_actions.album import (
album_filter_label, album_filter_label,
album_filter_release,
album_from_ids,
album_sort_release, album_sort_release,
album_to_simplified,
album_to_tracks, album_to_tracks,
) )
from spotify_actions.combinator import combinator_join
from spotify_actions.playlist import ( from spotify_actions.playlist import (
playlist_current_user_all, playlist_current_user_all,
playlist_current_user_assure, playlist_current_user_assure,
playlist_replace, playlist_replace,
playlist_tracks,
) )
from spotify_actions.search import Query, search_albums from spotify_actions.search import Query, search_albums
from spotify_actions.track import track_unique_albums
from spotify_actions.util import read_credentials_oauth from spotify_actions.util import read_credentials_oauth
def label_playlist(client: Spotify, label_name: str, playlist_id: str) -> None:
# Given a label name, replace all songs in the provided `playlist_id` with the
# label's songs, ordered by descending release date.
albums_search = search_albums(client, Query(label=label_name))
albums_unfiltered = album_from_ids(client, albums_search)
albums_unsorted = album_filter_label(albums_unfiltered, label_name)
albums = album_sort_release(albums_unsorted, descending=True)
tracks = album_to_tracks(client, albums)
playlist_replace(client, playlist_id, tracks)
def label_recent(client: Spotify, playlist_id: str, released_after: date) -> Iterable[SimplifiedAlbum]:
# Get all albums in a playlist released after the provided date
tracks = playlist_tracks(client, [playlist_id])
albums = track_unique_albums(tracks)
# Because the playlists were created in descending release date order,
# `is_sorted=True` is enabled to reduce the number of API queries needed
return album_filter_release(albums, released_after, is_sorted=True)
def run(client: Spotify, recent_releases_id: str, label_ids: Dict[str, str], released_after: date) -> None:
# Create the individual label playlists
for label_name, playlist_id in label_ids.items():
label_playlist(client, label_name, playlist_id)
# Get albums from the playlists we just created
album_iterables = [label_recent(client, playlist_id, released_after) for _, playlist_id in label_ids.items()]
# Merge all the albums from each label playlist
recent_albums = combinator_join(*album_iterables)
recent_tracks = album_to_tracks(client, recent_albums)
# Create the recent releases playlist
playlist_replace(client, recent_releases_id, recent_tracks)
def main() -> None: def main() -> None:
one_week_ago = (datetime.now().date() - timedelta(days=7)).strftime("%Y-%m-%d")
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True) parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("label") parser.add_argument("-r", "--redirect-uri", required=True)
parser.add_argument(
"-d", "--released-after", help="YYYY-MM-DD date that albums must be released after", default=one_week_ago
)
parser.add_argument("recent_release", help='Name of the "recent releases" playlist constructed from all labels')
parser.add_argument("label", nargs="+")
cmdline = parser.parse_args() cmdline = parser.parse_args()
client = read_credentials_oauth( client = read_credentials_oauth(
cmdline.credentials, cmdline.credentials,
redirect_uri="https://speice.io/spotify/", redirect_uri=cmdline.redirect_uri,
scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"], scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"],
) )
user_playlists = playlist_current_user_all(client) released_after = datetime.strptime(cmdline.released_after, "%Y-%m-%d")
playlists = playlist_current_user_assure(client, user_playlists, cmdline.label)
playlist = list(playlists)[0]
albums_search = search_albums(client, Query(label=cmdline.label)) # Get all user playlists; we'll be iterating over this a couple times
albums_unfiltered = album_to_simplified(client, albums_search) user_playlists = list(playlist_current_user_all(client))
albums_unsorted = album_filter_label(albums_unfiltered, cmdline.label)
albums = album_sort_release(albums_unsorted, descending=True)
tracks = album_to_tracks(client, albums)
playlist_replace(client, playlist.spotify_id, tracks) # To simplify, this assumes that the label playlist name is unique for this user
def _locate_playlist(name: str) -> str:
assured = playlist_current_user_assure(client, user_playlists, name)
# The `str()` wrapper is technically unnecessary, but keeps mypy happy
return str(list(assured)[0].spotify_id)
recent_releases = _locate_playlist(cmdline.recent_release)
label_playlists = {label: _locate_playlist(label) for label in cmdline.label}
run(client, recent_releases, label_playlists, released_after)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,35 +0,0 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from datetime import date, timedelta
from spotify_actions.album import album_to_simplified, album_to_tracks
from spotify_actions.combinator import combinator_join
from spotify_actions.echo import echo_tracks
from spotify_actions.search import Query, search_albums
from spotify_actions.temporal import temporal_released_after
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("label", nargs="+")
cmdline = parser.parse_args()
today = date.today()
four_weeks = timedelta(days=28)
client = read_credentials(cmdline.credentials)
label_albums_search = [search_albums(client, Query(label=l)) for l in cmdline.label]
albums_search = combinator_join(*label_albums_search)
albums_search_recent = temporal_released_after(albums_search, today - four_weeks)
albums_recent = album_to_simplified(client, albums_search_recent)
tracks_recent = album_to_tracks(client, albums_recent)
echo_tracks(tracks_recent)
if __name__ == "__main__":
main()

View File

@ -1,25 +0,0 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from spotify_actions.combinator import combinator_take
from spotify_actions.echo import echo_tracks
from spotify_actions.search import search_tracks
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("query")
cmdline = parser.parse_args()
client = read_credentials(cmdline.credentials)
tracks = search_tracks(client, cmdline.query)
tracks_take = combinator_take(tracks, count=100)
echo_tracks(tracks_take)
if __name__ == "__main__":
main()

View File

@ -1,14 +1,14 @@
""" """
Selectors for working with albums Selectors for working with albums
""" """
from datetime import date
from functools import partial from functools import partial
from typing import Iterable from typing import Iterable, Union
from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack
from spotipy import Spotify from spotipy import Spotify
from .temporal import temporal_convert from .util import chunk, exhaust, parse_release_date
from .util import chunk, exhaust
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]: def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]:
@ -18,36 +18,71 @@ def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterabl
yield album yield album
def album_sort_release(albums: Iterable[SimplifiedAlbum], descending: bool = False) -> Iterable[SimplifiedAlbum]: def album_filter_release(
albums: Iterable[SimplifiedAlbum], released_after: date, is_sorted: bool = False, fast_forward: bool = False
) -> Iterable[SearchAlbum]:
"""
Filter albums to those released on or after a provided date.
If `is_sorted` is True, iteration will stop once the first album released prior to
`released_after` is encountered (may be useful to avoid extra API calls when
iterating over a pre-sorted playlist).
See `temporal_convert` for more information on how album release dates are
resolved, and usage of `fast_forward`.
"""
for album in albums:
effective_release = parse_release_date(
album.release_date, album.release_date_precision, fast_forward=fast_forward
)
if effective_release >= released_after:
yield album
elif is_sorted:
return
def album_from_ids(
client: Spotify, albums: Iterable[Union[str, SearchAlbum]], chunk_size: int = 20
) -> Iterable[SimplifiedAlbum]:
"""
Given a stream of album IDs (or base album objects), retrieve the full album objects
"""
def _to_id() -> Iterable[SimplifiedAlbum]:
for album in albums:
yield album if isinstance(album, str) else album.spotify_id
for album_id_chunk in chunk(_to_id(), chunk_size):
album_chunk = client.albums(album_id_chunk)
for album in album_chunk:
yield SimplifiedAlbum(**album)
def album_sort_release(
albums: Iterable[SearchAlbum], descending: bool = False, fast_forward: bool = False
) -> Iterable[SearchAlbum]:
"Sort albums by release date" "Sort albums by release date"
all_albums = list(albums) all_albums = list(albums)
for album in sorted(
all_albums, key=lambda a: temporal_convert(a.release_date, a.release_date_precision), reverse=descending def _sort_key(album: SearchAlbum) -> date:
): return parse_release_date(album.release_date, album.release_date_precision, fast_forward=fast_forward)
for album in sorted(all_albums, key=_sort_key, reverse=descending):
yield album yield album
def album_to_simplified(
client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20
) -> Iterable[SimplifiedAlbum]:
"Retrieve the actual album objects associated with the albums received from searching"
for album_chunk in chunk(albums, chunk_size):
album_ids = [a.spotify_id for a in album_chunk]
for album in client.albums(albums=album_ids)["albums"]:
yield SimplifiedAlbum(**album)
def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]: def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]:
"Convert an album stream to the tracks on that album" "Convert an album stream to the tracks on that album"
def _album_tracks(album_id: str, limit: int, offset: int) -> Paging: def _album_tracks(album_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.album_tracks(album_id=album_id, limit=limit, offset=offset)) return Paging(**client.album_tracks(album_id, limit=limit, offset=offset))
# Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent # Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent
for album in albums: for album in albums:
tracks_function = partial(_album_tracks, album_id=album.spotify_id) tracks_function = partial(_album_tracks, album_id=album.spotify_id)
for track in exhaust(tracks_function, album.tracks): for track in exhaust(tracks_function, initial=album.tracks):
yield SimplifiedTrack(**track) yield SimplifiedTrack(**track)

View File

@ -1,6 +1,7 @@
""" """
Selectors for querying and modifying playlists Selectors for querying and modifying playlists
""" """
from functools import partial
from typing import Iterable from typing import Iterable
from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack
@ -74,3 +75,18 @@ def playlist_replace(
continue continue
client.playlist_add_items(playlist_id, track_chunk) client.playlist_add_items(playlist_id, track_chunk)
def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[SimplifiedTrack]:
"""
Given a playlist_id, fetch all songs currently in the playlist
"""
def _playlist_tracks(playlist_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.playlist_items(playlist_id, limit=limit, offset=offset))
for playlist_id in playlist_ids:
playlist_function = partial(_playlist_tracks, playlist_id=playlist_id)
for track in exhaust(playlist_function):
yield SimplifiedTrack(**track)

View File

@ -1,61 +0,0 @@
"""
Actions for filtering based on temporal information
"""
from calendar import monthrange
from datetime import date, datetime
from logging import getLogger
from typing import Iterable
from spotify_model import ReleaseDatePrecision, SearchAlbum
def temporal_convert(date_str: str, precision: ReleaseDatePrecision) -> date:
"""
For date strings that don't have date-level precision, the date is treated as the final day within that period;
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
treated as "1981-07-31"
"""
if precision == ReleaseDatePrecision.YEAR:
actual = datetime.strptime(date_str, "%Y")
effective = date(actual.year, 12, 31)
elif precision == ReleaseDatePrecision.MONTH:
actual = datetime.strptime(date_str, "%Y-%m")
final_day = monthrange(actual.year, actual.month)[1] - 1
effective = date(actual.year, actual.month, final_day)
else:
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
return effective
def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) -> Iterable[SearchAlbum]:
"""
Filter albums to after a specific release date.
For albums that don't have date-level precision, the release date is treated as the final day within that period;
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
treated as "1981-07-31"
"""
logger = getLogger(__name__)
for album in albums:
effective_release = temporal_convert(album.release_date, album.release_date_precision)
if effective_release >= released_after:
logger.debug(
"Including album=%s released on date=%s (prior to date=%s)",
album.name,
effective_release,
released_after,
)
yield album
else:
logger.debug(
"Skipping album=%s released on date=%s (prior to date=%s)",
album.name,
effective_release,
released_after,
)

43
spotify_actions/track.py Normal file
View File

@ -0,0 +1,43 @@
"""
Selectors for working with individual tracks
"""
from typing import Iterable, Union
from spotify_model import SimplifiedTrack, Track
from spotipy import Spotify
from .util import chunk
def track_from_ids(
client: Spotify, tracks: Union[Iterable[str], Iterable[SimplifiedTrack]], chunk_size: int = 50
) -> Iterable[Track]:
"""
Given a stream of track IDs (or simplified tracks), retrieve the full track objects
"""
def _to_id() -> Iterable[Track]:
for track in tracks:
yield track if isinstance(track, str) else track.spotify_id
for track_id_chunk in chunk(_to_id(), chunk_size):
track_chunk = client.tracks(track_id_chunk)
for track in track_chunk:
yield Track(**track)
def track_unique_albums(tracks: Iterable[Track]) -> Iterable[str]:
"""
Given a stream of tracks, yield all unique album IDs
"""
album_ids = set()
for track in tracks:
album_id = track.album.spotify_id
if album_id in album_ids:
continue
album_ids.add(album_id)
yield album_id

View File

@ -1,12 +1,14 @@
""" """
Utility methods for working with the Spotify API Utility methods for working with the Spotify API
""" """
from calendar import monthrange
from datetime import date, datetime
from math import ceil from math import ceil
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
import yaml import yaml
from spotify_model import Paging from spotify_model import Paging, ReleaseDatePrecision
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
DEFAULT_LIMIT = 50 DEFAULT_LIMIT = 50
@ -71,3 +73,33 @@ def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
if return_items: if return_items:
yield return_items yield return_items
def parse_release_date(date_str: str, precision: ReleaseDatePrecision, fast_forward: bool = False) -> date:
"""
Parse a date string with provided precision to a concrete date.
`fast_forward` controls how precision is resolved:
- If `False` (default), dates are assumed to be at the start of the period
(e.g. "1970" becomes "1970-01-01", "1970-08" becomes "1907-08-01")
- If `True`, dates are "fast-forwarded" to the end of the given period
(e.g. "1970" becomes "1970-12-31", "1970-08" becomes "1970-08-31")
"""
if precision == ReleaseDatePrecision.YEAR:
effective = datetime.strptime(date_str, "%Y").date()
if fast_forward:
effective = date(effective.year, 12, 31)
elif precision == ReleaseDatePrecision.MONTH:
effective = datetime.strptime(date_str, "%Y-%m").date()
if fast_forward:
final_day = monthrange(effective.year, effective.month)[1] - 1
effective = date(effective.year, effective.month, final_day)
else:
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
return effective