Merge pull request 'label_playlist' (#1) from label_playlist into master
This commit is contained in:
commit
2b25e01ac5
@ -1,46 +1,103 @@
|
|||||||
# pylint: disable=missing-module-docstring, missing-function-docstring
|
# pylint: disable=missing-module-docstring, missing-function-docstring
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
|
from datetime import date, datetime, timedelta
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
from spotipy import Spotify
|
||||||
|
|
||||||
from spotify_actions.album import (
|
from spotify_actions.album import (
|
||||||
album_filter_label,
|
album_filter_label,
|
||||||
|
album_filter_release,
|
||||||
|
album_from_ids,
|
||||||
album_sort_release,
|
album_sort_release,
|
||||||
album_to_simplified,
|
|
||||||
album_to_tracks,
|
album_to_tracks,
|
||||||
)
|
)
|
||||||
|
from spotify_actions.combinator import combinator_join
|
||||||
from spotify_actions.playlist import (
|
from spotify_actions.playlist import (
|
||||||
playlist_current_user_all,
|
playlist_current_user_all,
|
||||||
playlist_current_user_assure,
|
playlist_current_user_assure,
|
||||||
playlist_replace,
|
playlist_replace,
|
||||||
|
playlist_tracks,
|
||||||
)
|
)
|
||||||
from spotify_actions.search import Query, search_albums
|
from spotify_actions.search import Query, search_albums
|
||||||
|
from spotify_actions.track import track_unique_albums
|
||||||
from spotify_actions.util import read_credentials_oauth
|
from spotify_actions.util import read_credentials_oauth
|
||||||
|
|
||||||
|
|
||||||
|
def label_playlist(client: Spotify, label_name: str, playlist_id: str) -> None:
|
||||||
|
# Given a label name, replace all songs in the provided `playlist_id` with the
|
||||||
|
# label's songs, ordered by descending release date.
|
||||||
|
|
||||||
|
albums_search = search_albums(client, Query(label=label_name))
|
||||||
|
albums_unfiltered = album_from_ids(client, albums_search)
|
||||||
|
albums_unsorted = album_filter_label(albums_unfiltered, label_name)
|
||||||
|
albums = album_sort_release(albums_unsorted, descending=True)
|
||||||
|
tracks = album_to_tracks(client, albums)
|
||||||
|
|
||||||
|
playlist_replace(client, playlist_id, tracks)
|
||||||
|
|
||||||
|
|
||||||
|
def label_recent(client: Spotify, label_playlist_ids: Iterable[str], playlist_id: str, released_after: date) -> None:
|
||||||
|
|
||||||
|
album_iterables = []
|
||||||
|
for label_playlist_id in label_playlist_ids:
|
||||||
|
# Get all albums in a playlist released after the provided date
|
||||||
|
tracks = playlist_tracks(client, [label_playlist_id])
|
||||||
|
album_ids = track_unique_albums(tracks)
|
||||||
|
albums = album_from_ids(client, album_ids)
|
||||||
|
|
||||||
|
# Because the playlists were created in descending release date order,
|
||||||
|
# `is_sorted=True` is enabled to reduce the number of API queries needed
|
||||||
|
album_iterables.append(album_filter_release(albums, released_after, is_sorted=True))
|
||||||
|
|
||||||
|
# Merge all the albums from each label playlist
|
||||||
|
recent_albums = combinator_join(*album_iterables)
|
||||||
|
recent_tracks = album_to_tracks(client, recent_albums)
|
||||||
|
|
||||||
|
# Create the recent releases playlist
|
||||||
|
playlist_replace(client, playlist_id, recent_tracks)
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
|
# Intentionally 6 days - if running on a Friday, we don't want to include last Friday's releases
|
||||||
|
one_week_ago = (datetime.now().date() - timedelta(days=6)).strftime("%Y-%m-%d")
|
||||||
|
|
||||||
parser = ArgumentParser()
|
parser = ArgumentParser()
|
||||||
parser.add_argument("-c", "--credentials", required=True)
|
parser.add_argument("-c", "--credentials", required=True)
|
||||||
parser.add_argument("label")
|
parser.add_argument("-r", "--redirect-uri", required=True)
|
||||||
|
parser.add_argument("--recent-release", help='Name of the "recent releases" playlist constructed from all labels.')
|
||||||
|
parser.add_argument(
|
||||||
|
"--released-after", help="YYYY-MM-DD date that albums must be released after", default=one_week_ago
|
||||||
|
)
|
||||||
|
parser.add_argument("label", nargs="+")
|
||||||
|
|
||||||
cmdline = parser.parse_args()
|
cmdline = parser.parse_args()
|
||||||
|
|
||||||
client = read_credentials_oauth(
|
client = read_credentials_oauth(
|
||||||
cmdline.credentials,
|
cmdline.credentials,
|
||||||
redirect_uri="https://speice.io/spotify/",
|
redirect_uri=cmdline.redirect_uri,
|
||||||
scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"],
|
scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"],
|
||||||
)
|
)
|
||||||
|
|
||||||
user_playlists = playlist_current_user_all(client)
|
# Get all user playlists; we'll be iterating over this a couple times
|
||||||
playlists = playlist_current_user_assure(client, user_playlists, cmdline.label)
|
user_playlists = list(playlist_current_user_all(client))
|
||||||
playlist = list(playlists)[0]
|
|
||||||
|
|
||||||
albums_search = search_albums(client, Query(label=cmdline.label))
|
# To simplify, this assumes that the label playlist name is unique for this user
|
||||||
albums_unfiltered = album_to_simplified(client, albums_search)
|
def _locate_playlist(name: str) -> str:
|
||||||
albums_unsorted = album_filter_label(albums_unfiltered, cmdline.label)
|
assured = playlist_current_user_assure(client, user_playlists, name)
|
||||||
albums = album_sort_release(albums_unsorted, descending=True)
|
# The `str()` wrapper is technically unnecessary, but keeps mypy happy
|
||||||
tracks = album_to_tracks(client, albums)
|
return str(list(assured)[0].spotify_id)
|
||||||
|
|
||||||
playlist_replace(client, playlist.spotify_id, tracks)
|
label_ids = {name: _locate_playlist(name) for name in cmdline.label}
|
||||||
|
|
||||||
|
for label_name, playlist_id in label_ids.items():
|
||||||
|
label_playlist(client, label_name, playlist_id)
|
||||||
|
|
||||||
|
if cmdline.recent_release:
|
||||||
|
recent_release = _locate_playlist(cmdline.recent_release)
|
||||||
|
released_after = datetime.strptime(cmdline.released_after, "%Y-%m-%d").date()
|
||||||
|
label_recent(client, label_ids.values(), recent_release, released_after)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -1,35 +0,0 @@
|
|||||||
# pylint: disable=missing-module-docstring, missing-function-docstring
|
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
|
||||||
from datetime import date, timedelta
|
|
||||||
|
|
||||||
from spotify_actions.album import album_to_simplified, album_to_tracks
|
|
||||||
from spotify_actions.combinator import combinator_join
|
|
||||||
from spotify_actions.echo import echo_tracks
|
|
||||||
from spotify_actions.search import Query, search_albums
|
|
||||||
from spotify_actions.temporal import temporal_released_after
|
|
||||||
from spotify_actions.util import read_credentials
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
parser = ArgumentParser()
|
|
||||||
parser.add_argument("-c", "--credentials", required=True)
|
|
||||||
parser.add_argument("label", nargs="+")
|
|
||||||
|
|
||||||
cmdline = parser.parse_args()
|
|
||||||
|
|
||||||
today = date.today()
|
|
||||||
four_weeks = timedelta(days=28)
|
|
||||||
|
|
||||||
client = read_credentials(cmdline.credentials)
|
|
||||||
|
|
||||||
label_albums_search = [search_albums(client, Query(label=l)) for l in cmdline.label]
|
|
||||||
albums_search = combinator_join(*label_albums_search)
|
|
||||||
albums_search_recent = temporal_released_after(albums_search, today - four_weeks)
|
|
||||||
albums_recent = album_to_simplified(client, albums_search_recent)
|
|
||||||
tracks_recent = album_to_tracks(client, albums_recent)
|
|
||||||
echo_tracks(tracks_recent)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,25 +0,0 @@
|
|||||||
# pylint: disable=missing-module-docstring, missing-function-docstring
|
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
|
||||||
|
|
||||||
from spotify_actions.combinator import combinator_take
|
|
||||||
from spotify_actions.echo import echo_tracks
|
|
||||||
from spotify_actions.search import search_tracks
|
|
||||||
from spotify_actions.util import read_credentials
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
parser = ArgumentParser()
|
|
||||||
parser.add_argument("-c", "--credentials", required=True)
|
|
||||||
parser.add_argument("query")
|
|
||||||
|
|
||||||
cmdline = parser.parse_args()
|
|
||||||
|
|
||||||
client = read_credentials(cmdline.credentials)
|
|
||||||
tracks = search_tracks(client, cmdline.query)
|
|
||||||
tracks_take = combinator_take(tracks, count=100)
|
|
||||||
echo_tracks(tracks_take)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,14 +1,14 @@
|
|||||||
"""
|
"""
|
||||||
Selectors for working with albums
|
Selectors for working with albums
|
||||||
"""
|
"""
|
||||||
|
from datetime import date
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import Iterable
|
from typing import Iterable, Union
|
||||||
|
|
||||||
from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack
|
from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack
|
||||||
from spotipy import Spotify
|
from spotipy import Spotify
|
||||||
|
|
||||||
from .temporal import temporal_convert
|
from .util import chunk, exhaust, parse_release_date
|
||||||
from .util import chunk, exhaust
|
|
||||||
|
|
||||||
|
|
||||||
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]:
|
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]:
|
||||||
@ -18,36 +18,71 @@ def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterabl
|
|||||||
yield album
|
yield album
|
||||||
|
|
||||||
|
|
||||||
def album_sort_release(albums: Iterable[SimplifiedAlbum], descending: bool = False) -> Iterable[SimplifiedAlbum]:
|
def album_filter_release(
|
||||||
|
albums: Iterable[SimplifiedAlbum], released_after: date, is_sorted: bool = False, fast_forward: bool = False
|
||||||
|
) -> Iterable[SearchAlbum]:
|
||||||
|
"""
|
||||||
|
Filter albums to those released on or after a provided date.
|
||||||
|
|
||||||
|
If `is_sorted` is True, iteration will stop once the first album released prior to
|
||||||
|
`released_after` is encountered (may be useful to avoid extra API calls when
|
||||||
|
iterating over a pre-sorted playlist).
|
||||||
|
|
||||||
|
See `temporal_convert` for more information on how album release dates are
|
||||||
|
resolved, and usage of `fast_forward`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
for album in albums:
|
||||||
|
effective_release = parse_release_date(
|
||||||
|
album.release_date, album.release_date_precision, fast_forward=fast_forward
|
||||||
|
)
|
||||||
|
|
||||||
|
if effective_release >= released_after:
|
||||||
|
yield album
|
||||||
|
elif is_sorted:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def album_from_ids(
|
||||||
|
client: Spotify, albums: Iterable[Union[str, SearchAlbum]], chunk_size: int = 20
|
||||||
|
) -> Iterable[SimplifiedAlbum]:
|
||||||
|
"""
|
||||||
|
Given a stream of album IDs (or base album objects), retrieve the full album objects
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _to_id() -> Iterable[SimplifiedAlbum]:
|
||||||
|
for album in albums:
|
||||||
|
yield album if isinstance(album, str) else album.spotify_id
|
||||||
|
|
||||||
|
for album_id_chunk in chunk(_to_id(), chunk_size):
|
||||||
|
album_chunk = client.albums(album_id_chunk)
|
||||||
|
|
||||||
|
for album in album_chunk["albums"]:
|
||||||
|
yield SimplifiedAlbum(**album)
|
||||||
|
|
||||||
|
|
||||||
|
def album_sort_release(
|
||||||
|
albums: Iterable[SearchAlbum], descending: bool = False, fast_forward: bool = False
|
||||||
|
) -> Iterable[SearchAlbum]:
|
||||||
"Sort albums by release date"
|
"Sort albums by release date"
|
||||||
all_albums = list(albums)
|
all_albums = list(albums)
|
||||||
for album in sorted(
|
|
||||||
all_albums, key=lambda a: temporal_convert(a.release_date, a.release_date_precision), reverse=descending
|
def _sort_key(album: SearchAlbum) -> date:
|
||||||
):
|
return parse_release_date(album.release_date, album.release_date_precision, fast_forward=fast_forward)
|
||||||
|
|
||||||
|
for album in sorted(all_albums, key=_sort_key, reverse=descending):
|
||||||
yield album
|
yield album
|
||||||
|
|
||||||
|
|
||||||
def album_to_simplified(
|
|
||||||
client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20
|
|
||||||
) -> Iterable[SimplifiedAlbum]:
|
|
||||||
"Retrieve the actual album objects associated with the albums received from searching"
|
|
||||||
|
|
||||||
for album_chunk in chunk(albums, chunk_size):
|
|
||||||
album_ids = [a.spotify_id for a in album_chunk]
|
|
||||||
|
|
||||||
for album in client.albums(albums=album_ids)["albums"]:
|
|
||||||
yield SimplifiedAlbum(**album)
|
|
||||||
|
|
||||||
|
|
||||||
def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]:
|
def album_to_tracks(client: Spotify, albums: Iterable[SimplifiedAlbum]) -> Iterable[SimplifiedTrack]:
|
||||||
"Convert an album stream to the tracks on that album"
|
"Convert an album stream to the tracks on that album"
|
||||||
|
|
||||||
def _album_tracks(album_id: str, limit: int, offset: int) -> Paging:
|
def _album_tracks(album_id: str, limit: int, offset: int) -> Paging:
|
||||||
return Paging(**client.album_tracks(album_id=album_id, limit=limit, offset=offset))
|
return Paging(**client.album_tracks(album_id, limit=limit, offset=offset))
|
||||||
|
|
||||||
# Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent
|
# Because most album tracklists don't need to use paging, it's expected that API calls are relatively infrequent
|
||||||
for album in albums:
|
for album in albums:
|
||||||
tracks_function = partial(_album_tracks, album_id=album.spotify_id)
|
tracks_function = partial(_album_tracks, album_id=album.spotify_id)
|
||||||
|
|
||||||
for track in exhaust(tracks_function, album.tracks):
|
for track in exhaust(tracks_function, initial=album.tracks):
|
||||||
yield SimplifiedTrack(**track)
|
yield SimplifiedTrack(**track)
|
||||||
|
@ -1,21 +0,0 @@
|
|||||||
"""
|
|
||||||
Methods for printing results to console; primarily useful when developing/debugging pipelines to
|
|
||||||
check results before committing.
|
|
||||||
"""
|
|
||||||
from typing import Iterable
|
|
||||||
|
|
||||||
from spotify_model import SearchAlbum, SimplifiedTrack
|
|
||||||
|
|
||||||
|
|
||||||
def echo_albums(albums: Iterable[SearchAlbum]) -> None:
|
|
||||||
"Print album metadata"
|
|
||||||
|
|
||||||
for album in albums:
|
|
||||||
print(album.name)
|
|
||||||
|
|
||||||
|
|
||||||
def echo_tracks(tracks: Iterable[SimplifiedTrack]) -> None:
|
|
||||||
"Print track metadata"
|
|
||||||
|
|
||||||
for track in tracks:
|
|
||||||
print(track.name)
|
|
@ -1,9 +1,10 @@
|
|||||||
"""
|
"""
|
||||||
Selectors for querying and modifying playlists
|
Selectors for querying and modifying playlists
|
||||||
"""
|
"""
|
||||||
|
from functools import partial
|
||||||
from typing import Iterable
|
from typing import Iterable
|
||||||
|
|
||||||
from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack
|
from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack
|
||||||
from spotipy import Spotify
|
from spotipy import Spotify
|
||||||
|
|
||||||
from .user import user_current
|
from .user import user_current
|
||||||
@ -74,3 +75,18 @@ def playlist_replace(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
client.playlist_add_items(playlist_id, track_chunk)
|
client.playlist_add_items(playlist_id, track_chunk)
|
||||||
|
|
||||||
|
|
||||||
|
def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[SimplifiedTrack]:
|
||||||
|
"""
|
||||||
|
Given a playlist_id, fetch all songs currently in the playlist
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _playlist_tracks(playlist_id: str, limit: int, offset: int) -> Paging:
|
||||||
|
return Paging(**client.playlist_items(playlist_id, limit=limit, offset=offset))
|
||||||
|
|
||||||
|
for playlist_id in playlist_ids:
|
||||||
|
playlist_function = partial(_playlist_tracks, playlist_id=playlist_id)
|
||||||
|
|
||||||
|
for track in exhaust(playlist_function):
|
||||||
|
yield PlaylistTrack(**track).track
|
||||||
|
@ -1,61 +0,0 @@
|
|||||||
"""
|
|
||||||
Actions for filtering based on temporal information
|
|
||||||
"""
|
|
||||||
|
|
||||||
from calendar import monthrange
|
|
||||||
from datetime import date, datetime
|
|
||||||
from logging import getLogger
|
|
||||||
from typing import Iterable
|
|
||||||
|
|
||||||
from spotify_model import ReleaseDatePrecision, SearchAlbum
|
|
||||||
|
|
||||||
|
|
||||||
def temporal_convert(date_str: str, precision: ReleaseDatePrecision) -> date:
|
|
||||||
"""
|
|
||||||
For date strings that don't have date-level precision, the date is treated as the final day within that period;
|
|
||||||
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
|
|
||||||
treated as "1981-07-31"
|
|
||||||
"""
|
|
||||||
|
|
||||||
if precision == ReleaseDatePrecision.YEAR:
|
|
||||||
actual = datetime.strptime(date_str, "%Y")
|
|
||||||
effective = date(actual.year, 12, 31)
|
|
||||||
elif precision == ReleaseDatePrecision.MONTH:
|
|
||||||
actual = datetime.strptime(date_str, "%Y-%m")
|
|
||||||
final_day = monthrange(actual.year, actual.month)[1] - 1
|
|
||||||
effective = date(actual.year, actual.month, final_day)
|
|
||||||
else:
|
|
||||||
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
|
|
||||||
|
|
||||||
return effective
|
|
||||||
|
|
||||||
|
|
||||||
def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) -> Iterable[SearchAlbum]:
|
|
||||||
"""
|
|
||||||
Filter albums to after a specific release date.
|
|
||||||
|
|
||||||
For albums that don't have date-level precision, the release date is treated as the final day within that period;
|
|
||||||
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
|
|
||||||
treated as "1981-07-31"
|
|
||||||
"""
|
|
||||||
|
|
||||||
logger = getLogger(__name__)
|
|
||||||
|
|
||||||
for album in albums:
|
|
||||||
effective_release = temporal_convert(album.release_date, album.release_date_precision)
|
|
||||||
|
|
||||||
if effective_release >= released_after:
|
|
||||||
logger.debug(
|
|
||||||
"Including album=%s released on date=%s (prior to date=%s)",
|
|
||||||
album.name,
|
|
||||||
effective_release,
|
|
||||||
released_after,
|
|
||||||
)
|
|
||||||
yield album
|
|
||||||
else:
|
|
||||||
logger.debug(
|
|
||||||
"Skipping album=%s released on date=%s (prior to date=%s)",
|
|
||||||
album.name,
|
|
||||||
effective_release,
|
|
||||||
released_after,
|
|
||||||
)
|
|
43
spotify_actions/track.py
Normal file
43
spotify_actions/track.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
"""
|
||||||
|
Selectors for working with individual tracks
|
||||||
|
"""
|
||||||
|
from typing import Iterable, Union
|
||||||
|
|
||||||
|
from spotify_model import SimplifiedTrack, Track
|
||||||
|
from spotipy import Spotify
|
||||||
|
|
||||||
|
from .util import chunk
|
||||||
|
|
||||||
|
|
||||||
|
def track_from_ids(
|
||||||
|
client: Spotify, tracks: Union[Iterable[str], Iterable[SimplifiedTrack]], chunk_size: int = 50
|
||||||
|
) -> Iterable[Track]:
|
||||||
|
"""
|
||||||
|
Given a stream of track IDs (or simplified tracks), retrieve the full track objects
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _to_id() -> Iterable[Track]:
|
||||||
|
for track in tracks:
|
||||||
|
yield track if isinstance(track, str) else track.spotify_id
|
||||||
|
|
||||||
|
for track_id_chunk in chunk(_to_id(), chunk_size):
|
||||||
|
track_chunk = client.tracks(track_id_chunk)
|
||||||
|
|
||||||
|
for track in track_chunk:
|
||||||
|
yield Track(**track)
|
||||||
|
|
||||||
|
|
||||||
|
def track_unique_albums(tracks: Iterable[Track]) -> Iterable[str]:
|
||||||
|
"""
|
||||||
|
Given a stream of tracks, yield all unique album IDs
|
||||||
|
"""
|
||||||
|
|
||||||
|
album_ids = set()
|
||||||
|
|
||||||
|
for track in tracks:
|
||||||
|
album_id = track.album.spotify_id
|
||||||
|
if album_id in album_ids:
|
||||||
|
continue
|
||||||
|
|
||||||
|
album_ids.add(album_id)
|
||||||
|
yield album_id
|
@ -1,12 +1,14 @@
|
|||||||
"""
|
"""
|
||||||
Utility methods for working with the Spotify API
|
Utility methods for working with the Spotify API
|
||||||
"""
|
"""
|
||||||
|
from calendar import monthrange
|
||||||
|
from datetime import date, datetime
|
||||||
from math import ceil
|
from math import ceil
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
|
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from spotify_model import Paging
|
from spotify_model import Paging, ReleaseDatePrecision
|
||||||
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
|
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
|
||||||
|
|
||||||
DEFAULT_LIMIT = 50
|
DEFAULT_LIMIT = 50
|
||||||
@ -14,6 +16,13 @@ DEFAULT_LIMIT = 50
|
|||||||
T = TypeVar("T") # pylint: disable=invalid-name
|
T = TypeVar("T") # pylint: disable=invalid-name
|
||||||
|
|
||||||
|
|
||||||
|
def echo(elements: Iterable[T]) -> Iterable[T]:
|
||||||
|
"Echo the elements of an iterable and re-yield them"
|
||||||
|
for element in elements:
|
||||||
|
print(element)
|
||||||
|
yield element
|
||||||
|
|
||||||
|
|
||||||
def read_credentials_server(path: Path) -> Spotify:
|
def read_credentials_server(path: Path) -> Spotify:
|
||||||
"Read credentials from a YAML file and construct a Spotify client using the server workflow"
|
"Read credentials from a YAML file and construct a Spotify client using the server workflow"
|
||||||
|
|
||||||
@ -71,3 +80,33 @@ def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
|
|||||||
|
|
||||||
if return_items:
|
if return_items:
|
||||||
yield return_items
|
yield return_items
|
||||||
|
|
||||||
|
|
||||||
|
def parse_release_date(date_str: str, precision: ReleaseDatePrecision, fast_forward: bool = False) -> date:
|
||||||
|
"""
|
||||||
|
Parse a date string with provided precision to a concrete date.
|
||||||
|
|
||||||
|
`fast_forward` controls how precision is resolved:
|
||||||
|
- If `False` (default), dates are assumed to be at the start of the period
|
||||||
|
(e.g. "1970" becomes "1970-01-01", "1970-08" becomes "1907-08-01")
|
||||||
|
- If `True`, dates are "fast-forwarded" to the end of the given period
|
||||||
|
(e.g. "1970" becomes "1970-12-31", "1970-08" becomes "1970-08-31")
|
||||||
|
"""
|
||||||
|
|
||||||
|
if precision == ReleaseDatePrecision.YEAR:
|
||||||
|
effective = datetime.strptime(date_str, "%Y").date()
|
||||||
|
|
||||||
|
if fast_forward:
|
||||||
|
effective = date(effective.year, 12, 31)
|
||||||
|
|
||||||
|
elif precision == ReleaseDatePrecision.MONTH:
|
||||||
|
effective = datetime.strptime(date_str, "%Y-%m").date()
|
||||||
|
|
||||||
|
if fast_forward:
|
||||||
|
final_day = monthrange(effective.year, effective.month)[1] - 1
|
||||||
|
effective = date(effective.year, effective.month, final_day)
|
||||||
|
|
||||||
|
else:
|
||||||
|
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
|
||||||
|
|
||||||
|
return effective
|
||||||
|
Loading…
Reference in New Issue
Block a user