Get the label playlist example working
This is the first relatively big milestone for me.
This commit is contained in:
parent
8d12f9dd1b
commit
21adb3acdc
47
examples/label_playlist.py
Normal file
47
examples/label_playlist.py
Normal file
@ -0,0 +1,47 @@
|
||||
# pylint: disable=missing-module-docstring, missing-function-docstring
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from spotify_actions.album import (
|
||||
album_filter_label,
|
||||
album_sort_release,
|
||||
album_to_simplified,
|
||||
album_to_tracks,
|
||||
)
|
||||
from spotify_actions.playlist import (
|
||||
playlist_current_user_all,
|
||||
playlist_current_user_assure,
|
||||
playlist_replace,
|
||||
)
|
||||
from spotify_actions.search import Query, search_albums
|
||||
from spotify_actions.util import read_credentials_oauth
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument("-c", "--credentials", required=True)
|
||||
parser.add_argument("label")
|
||||
|
||||
cmdline = parser.parse_args()
|
||||
|
||||
client = read_credentials_oauth(
|
||||
cmdline.credentials,
|
||||
redirect_uri="https://speice.io/spotify/",
|
||||
scopes=["playlist-read-private", "playlist-modify-private", "playlist-modify-public"],
|
||||
)
|
||||
|
||||
user_playlists = playlist_current_user_all(client)
|
||||
playlists = playlist_current_user_assure(client, user_playlists, cmdline.label)
|
||||
playlist = list(playlists)[0]
|
||||
|
||||
albums_search = search_albums(client, Query(label=cmdline.label))
|
||||
albums_unfiltered = album_to_simplified(client, albums_search)
|
||||
albums_unsorted = album_filter_label(albums_unfiltered, cmdline.label)
|
||||
albums = album_sort_release(albums_unsorted, descending=True)
|
||||
tracks = album_to_tracks(client, albums)
|
||||
|
||||
playlist_replace(client, playlist.spotify_id, tracks)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,3 +1,7 @@
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
multi_line_output = 3
|
||||
|
||||
[tool.poetry]
|
||||
name = "spotify_actions"
|
||||
version = "0.1.0"
|
||||
|
@ -7,9 +7,26 @@ from typing import Iterable
|
||||
from spotify_model import Paging, SearchAlbum, SimplifiedAlbum, SimplifiedTrack
|
||||
from spotipy import Spotify
|
||||
|
||||
from .temporal import temporal_convert
|
||||
from .util import chunk, exhaust
|
||||
|
||||
|
||||
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]:
|
||||
"Filter albums that match an exact label string"
|
||||
for album in albums:
|
||||
if album.label == label:
|
||||
yield album
|
||||
|
||||
|
||||
def album_sort_release(albums: Iterable[SimplifiedAlbum], descending: bool = False) -> Iterable[SimplifiedAlbum]:
|
||||
"Sort albums by release date"
|
||||
all_albums = list(albums)
|
||||
for album in sorted(
|
||||
all_albums, key=lambda a: temporal_convert(a.release_date, a.release_date_precision), reverse=descending
|
||||
):
|
||||
yield album
|
||||
|
||||
|
||||
def album_to_simplified(
|
||||
client: Spotify, albums: Iterable[SearchAlbum], chunk_size: int = 20
|
||||
) -> Iterable[SimplifiedAlbum]:
|
||||
|
76
spotify_actions/playlist.py
Normal file
76
spotify_actions/playlist.py
Normal file
@ -0,0 +1,76 @@
|
||||
"""
|
||||
Selectors for querying and modifying playlists
|
||||
"""
|
||||
from typing import Iterable
|
||||
|
||||
from spotify_model import Paging, SimplifiedPlaylist, SimplifiedTrack
|
||||
from spotipy import Spotify
|
||||
|
||||
from .user import user_current
|
||||
from .util import chunk, exhaust
|
||||
|
||||
|
||||
def playlist_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlaylist:
|
||||
"""
|
||||
Create a playlist for a user
|
||||
|
||||
https://developer.spotify.com/documentation/web-api/reference/#endpoint-create-playlist
|
||||
"""
|
||||
return SimplifiedPlaylist(**client.user_playlist_create(user_id, name))
|
||||
|
||||
|
||||
def playlist_current_user_all(client: Spotify) -> Iterable[SimplifiedPlaylist]:
|
||||
"""
|
||||
Get all playlists that belong to the current user
|
||||
"""
|
||||
|
||||
def _playlists(limit: int, offset: int) -> Paging:
|
||||
return Paging(**client.current_user_playlists(limit, offset))
|
||||
|
||||
for playlist in exhaust(_playlists):
|
||||
yield SimplifiedPlaylist(**playlist)
|
||||
|
||||
|
||||
def playlist_current_user_find(playlists: Iterable[SimplifiedPlaylist], name: str) -> Iterable[str]:
|
||||
"""
|
||||
Find all playlists for the current user that match a specific name
|
||||
"""
|
||||
for playlist in playlists:
|
||||
if playlist.name == name:
|
||||
yield playlist
|
||||
|
||||
|
||||
def playlist_current_user_assure(
|
||||
client: Spotify, playlists: Iterable[SimplifiedPlaylist], name: str
|
||||
) -> Iterable[SimplifiedPlaylist]:
|
||||
"""
|
||||
Find (or create if it doesn't exist) all playlists for the current user that match a specific name
|
||||
"""
|
||||
found = False
|
||||
|
||||
for playlist in playlist_current_user_find(playlists, name):
|
||||
found = True
|
||||
yield playlist
|
||||
|
||||
if not found:
|
||||
current_user_id = user_current(client).spotify_id
|
||||
playlist_create(client, current_user_id, name)
|
||||
|
||||
|
||||
def playlist_replace(
|
||||
client: Spotify, playlist_id: str, tracks: Iterable[SimplifiedTrack], chunk_size: int = 100
|
||||
) -> None:
|
||||
"""
|
||||
Replace all tracks in a playlist with the new provided tracks
|
||||
"""
|
||||
all_tracks = [t.spotify_id for t in tracks]
|
||||
|
||||
first = True
|
||||
|
||||
for track_chunk in chunk(all_tracks, chunk_size):
|
||||
if first:
|
||||
client.playlist_replace_items(playlist_id, track_chunk)
|
||||
first = False
|
||||
continue
|
||||
|
||||
client.playlist_add_items(playlist_id, track_chunk)
|
@ -10,6 +10,26 @@ from typing import Iterable
|
||||
from spotify_model import ReleaseDatePrecision, SearchAlbum
|
||||
|
||||
|
||||
def temporal_convert(date_str: str, precision: ReleaseDatePrecision) -> date:
|
||||
"""
|
||||
For date strings that don't have date-level precision, the date is treated as the final day within that period;
|
||||
thus, albums released in "1981" are effectively released on 1981-12-31, and albums released in "1981-07" are
|
||||
treated as "1981-07-31"
|
||||
"""
|
||||
|
||||
if precision == ReleaseDatePrecision.YEAR:
|
||||
actual = datetime.strptime(date_str, "%Y")
|
||||
effective = date(actual.year, 12, 31)
|
||||
elif precision == ReleaseDatePrecision.MONTH:
|
||||
actual = datetime.strptime(date_str, "%Y-%m")
|
||||
final_day = monthrange(actual.year, actual.month)[1] - 1
|
||||
effective = date(actual.year, actual.month, final_day)
|
||||
else:
|
||||
effective = datetime.strptime(date_str, "%Y-%m-%d").date()
|
||||
|
||||
return effective
|
||||
|
||||
|
||||
def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date) -> Iterable[SearchAlbum]:
|
||||
"""
|
||||
Filter albums to after a specific release date.
|
||||
@ -22,15 +42,7 @@ def temporal_released_after(albums: Iterable[SearchAlbum], released_after: date)
|
||||
logger = getLogger(__name__)
|
||||
|
||||
for album in albums:
|
||||
if album.release_date_precision == ReleaseDatePrecision.YEAR:
|
||||
actual_release = datetime.strptime(album.release_date, "%Y")
|
||||
effective_release = date(actual_release.year, 12, 31)
|
||||
elif album.release_date_precision == ReleaseDatePrecision.MONTH:
|
||||
actual_release = datetime.strptime(album.release_date, "%Y-%m")
|
||||
final_day = monthrange(actual_release.year, actual_release.month)[1] - 1
|
||||
effective_release = date(actual_release.year, actual_release.month, final_day)
|
||||
else:
|
||||
effective_release = datetime.strptime(album.release_date, "%Y-%m-%d").date()
|
||||
effective_release = temporal_convert(album.release_date, album.release_date_precision)
|
||||
|
||||
if effective_release >= released_after:
|
||||
logger.debug(
|
||||
|
11
spotify_actions/user.py
Normal file
11
spotify_actions/user.py
Normal file
@ -0,0 +1,11 @@
|
||||
"""
|
||||
Actions related to user information
|
||||
"""
|
||||
|
||||
from spotify_model import PrivateUser
|
||||
from spotipy import Spotify
|
||||
|
||||
|
||||
def user_current(client: Spotify) -> PrivateUser:
|
||||
"Get all details of the current user"
|
||||
return PrivateUser(**client.current_user())
|
@ -7,15 +7,15 @@ from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
|
||||
|
||||
import yaml
|
||||
from spotify_model import Paging
|
||||
from spotipy import Spotify, SpotifyClientCredentials
|
||||
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
|
||||
|
||||
DEFAULT_LIMIT = 50
|
||||
|
||||
T = TypeVar("T") # pylint: disable=invalid-name
|
||||
|
||||
|
||||
def read_credentials(path: Path) -> Spotify:
|
||||
"Read credentials from a YAML file and construct a Spotify client"
|
||||
def read_credentials_server(path: Path) -> Spotify:
|
||||
"Read credentials from a YAML file and construct a Spotify client using the server workflow"
|
||||
|
||||
with open(path, "r") as credentials_file:
|
||||
credentials = yaml.safe_load(credentials_file)
|
||||
@ -23,6 +23,17 @@ def read_credentials(path: Path) -> Spotify:
|
||||
return Spotify(client_credentials_manager=credentials_manager)
|
||||
|
||||
|
||||
def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str]) -> Spotify:
|
||||
"Read credentials from a YAML file and authorize a user"
|
||||
|
||||
with open(path, "r") as credentials_file:
|
||||
credentials = yaml.safe_load(credentials_file)
|
||||
credentials_manager = SpotifyOAuth(
|
||||
credentials["client_id"], credentials["client_secret"], redirect_uri, scope=scopes
|
||||
)
|
||||
return Spotify(client_credentials_manager=credentials_manager)
|
||||
|
||||
|
||||
class Paginated(Protocol):
|
||||
"Protocol definition for functions that will be provided to the `exhaust` handler"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user