label_playlist #1

Merged
bspeice merged 2 commits from label_playlist into master 2021-09-10 22:17:32 -04:00
9 changed files with 225 additions and 177 deletions

View File

@ -1,46 +1,103 @@
# pylint: disable=missing-module-docstring, missing-function-docstring # pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import date, datetime, timedelta
from typing import Iterable
from spotipy import Spotify
from spotify_actions.album import ( from spotify_actions.album import (
album_filter_label, album_filter_label,
album_filter_release,
album_from_ids,
album_sort_release, album_sort_release,
album_to_simplified,
album_to_tracks, album_to_tracks,
) )
from spotify_actions.combinator import combinator_join
from spotify_actions.playlist import ( from spotify_actions.playlist import (
playlist_current_user_all, playlist_current_user_all,
playlist_current_user_assure, playlist_current_user_assure,
playlist_replace, playlist_replace,
playlist_tracks,
) )
from spotify_actions.search import Query, search_albums from spotify_actions.search import Query, search_albums
from spotify_actions.track import track_unique_albums
from spotify_actions.util import read_credentials_oauth from spotify_actions.util import read_credentials_oauth
def label_playlist(client: Spotify, label_name: str, playlist_id: str) -> None:
# Given a label name, replace all songs in the provided `playlist_id` with the
# label's songs, ordered by descending release date.
albums_search = search_albums(client, Query(label=label_name))
albums_unfiltered = album_from_ids(client, albums_search)
albums_unsorted = album_filter_label(albums_unfiltered, label_name)
albums = album_sort_release(albums_unsorted, descending=True)
tracks = album_to_tracks(client, albums)
playlist_replace(client, playlist_id, tracks)
def label_recent(client: Spotify, label_playlist_ids: Iterable[str], playlist_id: str, released_after: date) -> None:
album_iterables = []
for label_playlist_id in label_playlist_ids:
# Get all albums in a playlist released after the provided date
tracks = playlist_tracks(client, [label_playlist_id])
album_ids = track_unique_albums(tracks)
albums = album_from_ids(client, album_ids)
# Because the playlists were created in descending release date order,
# `is_sorted=True` is enabled to reduce the number of API queries needed
album_iterables.append(album_filter_release(albums, released_after, is_sorted=True))
# Merge all the albums from each label playlist
recent_albums = combinator_join(*album_iterables)
recent_tracks = album_to_tracks(client, recent_albums)
# Create the recent releases playlist
playlist_replace(client, playlist_id, recent_tracks)
def main() -> None: def main() -> None:
# Intentionally 6 days - if running on a Friday, we don't want to include last Friday's releases
one_week_ago = (datetime.now().date() - timedelta(days=6)).strftime("%Y-%m-%d")
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True) parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("label") parser.add_argument("-r", "--redirect-uri", required=True)
parser.add_argument("--recent-release", help='Name of the "recent releases" playlist constructed from all labels.')
parser.add_argument(
"--released-after", help="YYYY-MM-DD date that albums must be released after", default=one_week_ago
)
parser.add_argument("label", nargs="+")
cmdline = parser.parse_args() cmdline = parser.parse_args()
client = read_credentials_oauth( client = read_credentials_oauth(
cmdline.credentials, cmdline.credentials,
redirect_uri="https://speice.io/spotify/", redirect_uri=cmdline.redirect_uri,
scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"], scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"],
) )
user_playlists = playlist_current_user_all(client) # Get all user playlists; we'll be iterating over this a couple times
playlists = playlist_current_user_assure(client, user_playlists, cmdline.label) user_playlists = list(playlist_current_user_all(client))
playlist = list(playlists)[0]
albums_search = search_albums(client, Query(label=cmdline.label)) # To simplify, this assumes that the label playlist name is unique for this user
albums_unfiltered = album_to_simplified(client, albums_search) def _locate_playlist(name: str) -> str:
albums_unsorted = album_filter_label(albums_unfiltered, cmdline.label) assured = playlist_current_user_assure(client, user_playlists, name)
albums = album_sort_release(albums_unsorted, descending=True) # The `str()` wrapper is technically unnecessary, but keeps mypy happy
tracks = album_to_tracks(client, albums) return str(list(assured)[0].spotify_id)
playlist_replace(client, playlist.spotify_id, tracks) label_ids = {name: _locate_playlist(name) for name in cmdline.label}
for label_name, playlist_id in label_ids.items():
label_playlist(client, label_name, playlist_id)
if cmdline.recent_release:
recent_release = _locate_playlist(cmdline.recent_release)
released_after = datetime.strptime(cmdline.released_after, "%Y-%m-%d").date()
label_recent(client, label_ids.values(), recent_release, released_after)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,35 +0,0 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from datetime import date, timedelta
from spotify_actions.album import album_to_simplified, album_to_tracks
from spotify_actions.combinator import combinator_join
from spotify_actions.echo import echo_tracks
from spotify_actions.search import Query, search_albums
from spotify_actions.temporal import temporal_released_after
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("label", nargs="+")
cmdline = parser.parse_args()
today = date.today()
four_weeks = timedelta(days=28)
client = read_credentials(cmdline.credentials)
label_albums_search = [search_albums(client, Query(label=l)) for l in cmdline.label]
albums_search = combinator_join(*label_albums_search)
albums_search_recent = temporal_released_after(albums_search, today - four_weeks)
albums_recent = album_to_simplified(client, albums_search_recent)
tracks_recent = album_to_tracks(client, albums_recent)
echo_tracks(tracks_recent)
if __name__ == "__main__":
main()

View File

@ -1,25 +0,0 @@
# pylint: disable=missing-module-docstring, missing-function-docstring
from argparse import ArgumentParser
from spotify_actions.combinator import combinator_take
from spotify_actions.echo import echo_tracks
from spotify_actions.search import search_tracks
from spotify_actions.util import read_credentials
def main() -> None:
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("query")
cmdline = parser.parse_args()
client = read_credentials(cmdline.credentials)
tracks = search_tracks(client, cmdline.query)
tracks_take = combinator_take(tracks, count=100)
echo_tracks(tracks_take)
if __name__ == "__main__":
main()

View File

@ -1,14 +1,14 @@
""" """
Selectors for working with albums Selectors for working with albums
""" """
from datetime import date
from functools import partial from functools import partial
from typing import Iterable from typing import Iterable, Union
from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack
from spotipy import Spotify from spotipy import Spotify
from .temporal import temporal_convert from .util import chunk, exhaust, parse_release_date
from .util import chunk, exhaust
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]: def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]:
@ -18,36 +18,71 @@ def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterabl
yield album yield album
def album_sort_release(albums: Iterable[SimplifiedAlbum], descending: bool = False) -> Iterable[SimplifiedAlbum]: def album_filter_release(
albums: Iterable[SimplifiedAlbum], released_after: date, is_sorted: bool = False, fast_forward: bool = False
) -> Iterable[SearchAlbum]:
"""
Filter albums to those released on or after a provided date.
If `is_sorted` is True, iteration will stop once the first album released prior to
`released_after` is encountered (may be useful to avoid extra API calls when
iterating over a pre-sorted playlist).
See `temporal_convert` for more information on how album release dates are
resolved, and usage of `fast_forward`.
"""
for album in albums:
effective_release = parse_release_date(
album.release_date, album.release_date_precision, fast_forward=fast_forward
)
if effective_release >= released_after:
yield album
elif is_sorted:
return
def album_from_ids(
client: Spotify, albums: Iterable[Union[str, SearchAlbum]], chunk_size: int = 20
) -> Iterable[SimplifiedAlbum]:
"""
Given a stream of album IDs (or base album objects), retrieve the full album objects
"""
def _to_id() -> Iterable[SimplifiedAlbum]:
for album in albums:
yield album if isinstance(album, str) else album.spotify_id
for album_id_chunk in chunk(_to_id(), chunk_size):
album_chunk = client.albums(album_id_chunk)
for album in album_chunk["albums"]:
yield SimplifiedAlbum(**album)
def album_sort_release(
albums: Iterable[SearchAlbum], descending: bool = False, fast_forward: bool = False
) -> Iterable[SearchAlbum]:
"Sort albums by release date" "Sort albums by release date"
all_albums = list(albums) all_albums = list(albums)
for album in sorted(
all_albums, key=lambda a: temporal_convert(a.release_date, a.release_date_precision), reverse=descending def _sort_key(album: SearchAlbum) -> date:
): return parse_release_date(album.release_date, album.release_date_precision, fast_forward=fast_forward)
for album in sorted(all_albums, key=_sort_key, reverse=descending):
yield album yield album
def album_to_simplified(
client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20
) -> Iterable[SimplifiedAlbum]:
"Retrieve the actual album objects associated with the albums received from searching"
for album_chunk in chunk(albums, chunk_size):
album_ids = [a.spotify_id for a in album_chunk]
for album in client.albums(albums=album_ids)["albums"]:
yield SimplifiedAlbum(**album)
def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]: def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]:
"Convert an album stream to the tracks on that album" "Convert an album stream to the tracks on that album"
def _album_tracks(album_id: str, limit: int, offset: int) -> Paging: def _album_tracks(album_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.album_tracks(album_id=album_id, limit=limit, offset=offset)) return Paging(**client.album_tracks(album_id, limit=limit, offset=offset))
# Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent # Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent
for album in albums: for album in albums:
tracks_function = partial(_album_tracks, album_id=album.spotify_id) tracks_function = partial(_album_tracks, album_id=album.spotify_id)
for track in exhaust(tracks_function, album.tracks): for track in exhaust(tracks_function, initial=album.tracks):
yield SimplifiedTrack(**track) yield SimplifiedTrack(**track)

View File

@ -1,21 +0,0 @@
"""
Methods for printing results to console; primarily useful when developing/debugging pipelines to
check results before committing.
"""
from typing import Iterable
from spotify_model import SearchAlbum, SimplifiedTrack
def echo_albums(albums: Iterable[SearchAlbum]) -> None:
"Print album metadata"
for album in albums:
print(album.name)
def echo_tracks(tracks: Iterable[SimplifiedTrack]) -> None:
"Print track metadata"
for track in tracks:
print(track.name)

View File

@ -1,9 +1,10 @@
""" """
Selectors for querying and modifying playlists Selectors for querying and modifying playlists
""" """
from functools import partial
from typing import Iterable from typing import Iterable
from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack
from spotipy import Spotify from spotipy import Spotify
from .user import user_current from .user import user_current
@ -74,3 +75,18 @@ def playlist_replace(
continue continue
client.playlist_add_items(playlist_id, track_chunk) client.playlist_add_items(playlist_id, track_chunk)
def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[SimplifiedTrack]:
"""
Given a playlist_id, fetch all songs currently in the playlist
"""
def _playlist_tracks(playlist_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.playlist_items(playlist_id, limit=limit, offset=offset))
for playlist_id in playlist_ids:
playlist_function = partial(_playlist_tracks, playlist_id=playlist_id)
for track in exhaust(playlist_function):
yield PlaylistTrack(**track).track

View File

@ -1,61 +0,0 @@
"""
Actions for filtering based on temporal information
"""
from calendar import monthrange
from datetime import date, datetime
from logging import getLogger
from typing import Iterable
from spotify_model import ReleaseDatePrecision, SearchAlbum
def temporal_convert(date_str: str, precision: ReleaseDatePrecision) -> date:
"""
For date strings that don't have date-level precision, the date is treated as the final day within that period;
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
treated as "1981-07-31"
"""
if precision == ReleaseDatePrecision.YEAR:
actual = datetime.strptime(date_str, "%Y")
effective = date(actual.year, 12, 31)
elif precision == ReleaseDatePrecision.MONTH:
actual = datetime.strptime(date_str, "%Y-%m")
final_day = monthrange(actual.year, actual.month)[1] - 1
effective = date(actual.year, actual.month, final_day)
else:
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
return effective
def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) -> Iterable[SearchAlbum]:
"""
Filter albums to after a specific release date.
For albums that don't have date-level precision, the release date is treated as the final day within that period;
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
treated as "1981-07-31"
"""
logger = getLogger(__name__)
for album in albums:
effective_release = temporal_convert(album.release_date, album.release_date_precision)
if effective_release >= released_after:
logger.debug(
"Including album=%s released on date=%s (prior to date=%s)",
album.name,
effective_release,
released_after,
)
yield album
else:
logger.debug(
"Skipping album=%s released on date=%s (prior to date=%s)",
album.name,
effective_release,
released_after,
)

43
spotify_actions/track.py Normal file
View File

@ -0,0 +1,43 @@
"""
Selectors for working with individual tracks
"""
from typing import Iterable, Union
from spotify_model import SimplifiedTrack, Track
from spotipy import Spotify
from .util import chunk
def track_from_ids(
client: Spotify, tracks: Union[Iterable[str], Iterable[SimplifiedTrack]], chunk_size: int = 50
) -> Iterable[Track]:
"""
Given a stream of track IDs (or simplified tracks), retrieve the full track objects
"""
def _to_id() -> Iterable[Track]:
for track in tracks:
yield track if isinstance(track, str) else track.spotify_id
for track_id_chunk in chunk(_to_id(), chunk_size):
track_chunk = client.tracks(track_id_chunk)
for track in track_chunk:
yield Track(**track)
def track_unique_albums(tracks: Iterable[Track]) -> Iterable[str]:
"""
Given a stream of tracks, yield all unique album IDs
"""
album_ids = set()
for track in tracks:
album_id = track.album.spotify_id
if album_id in album_ids:
continue
album_ids.add(album_id)
yield album_id

View File

@ -1,12 +1,14 @@
""" """
Utility methods for working with the Spotify API Utility methods for working with the Spotify API
""" """
from calendar import monthrange
from datetime import date, datetime
from math import ceil from math import ceil
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
import yaml import yaml
from spotify_model import Paging from spotify_model import Paging, ReleaseDatePrecision
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
DEFAULT_LIMIT = 50 DEFAULT_LIMIT = 50
@ -14,6 +16,13 @@ DEFAULT_LIMIT = 50
T = TypeVar("T") # pylint: disable=invalid-name T = TypeVar("T") # pylint: disable=invalid-name
def echo(elements: Iterable[T]) -> Iterable[T]:
"Echo the elements of an iterable and re-yield them"
for element in elements:
print(element)
yield element
def read_credentials_server(path: Path) -> Spotify: def read_credentials_server(path: Path) -> Spotify:
"Read credentials from a YAML file and construct a Spotify client using the server workflow" "Read credentials from a YAML file and construct a Spotify client using the server workflow"
@ -71,3 +80,33 @@ def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
if return_items: if return_items:
yield return_items yield return_items
def parse_release_date(date_str: str, precision: ReleaseDatePrecision, fast_forward: bool = False) -> date:
"""
Parse a date string with provided precision to a concrete date.
`fast_forward` controls how precision is resolved:
- If `False` (default), dates are assumed to be at the start of the period
(e.g. "1970" becomes "1970-01-01", "1970-08" becomes "1907-08-01")
- If `True`, dates are "fast-forwarded" to the end of the given period
(e.g. "1970" becomes "1970-12-31", "1970-08" becomes "1970-08-31")
"""
if precision == ReleaseDatePrecision.YEAR:
effective = datetime.strptime(date_str, "%Y").date()
if fast_forward:
effective = date(effective.year, 12, 31)
elif precision == ReleaseDatePrecision.MONTH:
effective = datetime.strptime(date_str, "%Y-%m").date()
if fast_forward:
final_day = monthrange(effective.year, effective.month)[1] - 1
effective = date(effective.year, effective.month, final_day)
else:
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
return effective