Compare commits

..

No commits in common. "master" and "label_playlist" have entirely different histories.

9 changed files with 60 additions and 196 deletions

4
poetry.lock generated
View File

@ -193,7 +193,7 @@ toml = ">=0.7.1"
name = "pyyaml"
version = "5.4.1"
description = "YAML parser and emitter for Python"
category = "main"
category = "dev"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
@ -346,7 +346,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes
[metadata]
lock-version = "1.1"
python-versions = "^3.7"
content-hash = "adf01d1ee8cf03745f2ba22cebfe7740d0460f4cbe120e5a6c9404563fc382e8"
content-hash = "deade799321bffd9f9db8e7e77a17dca75b04473b6d0133c5adb929c8fc2057e"
[metadata.files]
appdirs = [

View File

@ -13,7 +13,6 @@ license = "MIT"
python = "^3.7"
spotipy = "^2.18.0"
spotify-model = {path = "../spotify_model", develop = true}
PyYAML = "=5.3.1"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"

View File

@ -11,17 +11,10 @@ from spotipy import Spotify
from .util import chunk, exhaust, parse_release_date
def album_filter_label(
albums: Iterable[SimplifiedAlbum], label: str, include: bool = True
) -> Iterable[SimplifiedAlbum]:
"""
Filter albums based on the label.
If `include`, yield labels that match the provided string (typical filter behavior).
Otherwise, yield only those albums that don't match the label.
"""
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]:
"Filter albums that match an exact label string"
for album in albums:
if (album.label == label) == include:
if album.label == label:
yield album

View File

@ -1,45 +0,0 @@
"""
Selectors for querying artist information
"""
from argparse import ArgumentParser
from functools import partial
from typing import Iterable
from spotify_model import Paging, SearchAlbum
from spotipy import Spotify
from .util import exhaust, read_credentials_server
def artist_albums(client: Spotify, artist_ids: Iterable[str]) -> Iterable[SearchAlbum]:
"""
https://developer.spotify.com/documentation/web-api/reference/#category-artists
"""
def _artist_albums(artist_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.artist_albums(artist_id, limit=limit, offset=offset))
for artist_id in artist_ids:
albums_function = partial(_artist_albums, artist_id)
for album in exhaust(albums_function):
yield SearchAlbum(**album)
def main() -> None:
"Simple runner for quickly retrieving artist info"
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("artist_ids", nargs="+")
cmdline = parser.parse_args()
client = read_credentials_server(cmdline.credentials)
print("Artist albums:")
for album in artist_albums(client, cmdline.artist_ids):
print(album)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,29 @@
"""
Join objects from multiple sources into a single stream
"""
from functools import partial
from typing import Iterable, TypeVar
T = TypeVar("T") # pylint: disable=invalid-name
def combinator_take(items: Iterable[T], count: int) -> Iterable[T]:
"Retrieve the first `count` items from an iterator"
observed = 0
for item in items:
observed += 1
yield item
if observed >= count:
break
combinator_first = partial(combinator_take, count=1)
def combinator_join(*args: Iterable[T]) -> Iterable[T]:
"Join the results of many album producers by exhausting all albums from each producer"
for arg in args:
for item in arg:
yield item

View File

@ -7,11 +7,11 @@ from typing import Iterable
from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack
from spotipy import Spotify
from .user import current_user
from .user import user_current
from .util import chunk, exhaust
def playlist_current_user_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlaylist:
def playlist_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlaylist:
"""
Create a playlist for a user
@ -20,6 +20,18 @@ def playlist_current_user_create(client: Spotify, user_id: str, name: str) -> Si
return SimplifiedPlaylist(**client.user_playlist_create(user_id, name))
def playlist_current_user_all(client: Spotify) -> Iterable[SimplifiedPlaylist]:
"""
Get all playlists that belong to the current user
"""
def _playlists(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_playlists(limit, offset))
for playlist in exhaust(_playlists):
yield SimplifiedPlaylist(**playlist)
def playlist_current_user_find(playlists: Iterable[SimplifiedPlaylist], name: str) -> Iterable[str]:
"""
Find all playlists for the current user that match a specific name
@ -42,11 +54,11 @@ def playlist_current_user_assure(
yield playlist
if not found:
current_user_id = current_user(client).spotify_id
yield playlist_current_user_create(client, current_user_id, name)
current_user_id = user_current(client).spotify_id
yield playlist_create(client, current_user_id, name)
def playlist_current_user_replace(
def playlist_replace(
client: Spotify, playlist_id: str, tracks: Iterable[SimplifiedTrack], chunk_size: int = 100
) -> None:
"""
@ -65,7 +77,7 @@ def playlist_current_user_replace(
client.playlist_add_items(playlist_id, track_chunk)
def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[PlaylistTrack]:
def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[SimplifiedTrack]:
"""
Given a playlist_id, fetch all songs currently in the playlist
"""

View File

@ -21,7 +21,7 @@ def track_from_ids(
yield track if isinstance(track, str) else track.spotify_id
for track_id_chunk in chunk(_to_id(), chunk_size):
track_chunk = client.tracks(track_id_chunk)["tracks"]
track_chunk = client.tracks(track_id_chunk)
for track in track_chunk:
yield Track(**track)

View File

@ -1,114 +1,11 @@
"""
Actions related to user information
"""
from argparse import ArgumentParser
from typing import Iterable, Optional
from spotify_model import (
Artist,
CursorPaging,
Paging,
PublicUser,
SavedAlbum,
SavedTrack,
SimplifiedPlaylist,
)
from spotify_model import PrivateUser
from spotipy import Spotify
from .util import exhaust, exhaust_cursor, read_credentials_oauth
def current_user(client: Spotify) -> PublicUser:
"""
Get all details of the current user
Required scopes:
- user-read-private
Optional scopes:
- user-read-email
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-current-users-profile
"""
return PublicUser(**client.current_user())
def current_user_library_albums(client: Spotify) -> Iterable[SavedAlbum]:
"""
Get the albums a user has saved to their library
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-users-saved-albums
"""
def _library_albums(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_saved_albums(limit, offset))
for album in exhaust(_library_albums):
yield SavedAlbum(**album)
def current_user_library_tracks(client: Spotify) -> Iterable[SavedTrack]:
"""
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-users-saved-tracks
"""
def _library_tracks(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_saved_tracks(limit, offset))
for track in exhaust(_library_tracks):
yield SavedTrack(**track)
def current_user_playlists(client: Spotify) -> Iterable[SimplifiedPlaylist]:
"""
Get all playlists that belong to the current user
"""
def _playlists(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_playlists(limit, offset))
for playlist in exhaust(_playlists):
yield SimplifiedPlaylist(**playlist)
def current_user_followed_artists(client: Spotify) -> Iterable[Artist]:
"""
Required scopes:
- user-follow-read
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-followed
"""
def _followed_artists(limit: int, after: Optional[str]) -> CursorPaging:
# This one is a little different; rather than using an offset counter,
# we inspect the response to get the "last"
return CursorPaging(**client.current_user_followed_artists(limit=limit, after=after)["artists"])
for artist in exhaust_cursor(_followed_artists):
yield Artist(**artist)
def main() -> None:
"Simple runner for retrieving current user information"
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("-r", "--redirect-uri", required=True)
cmdline = parser.parse_args()
client = read_credentials_oauth(
cmdline.credentials,
redirect_uri=cmdline.redirect_uri,
scopes=["user-read-private", "user-follow-read"],
)
print("Current user:")
print(current_user(client))
print()
print("Current user followed artists:")
for artist in current_user_followed_artists(client):
print(artist)
if __name__ == "__main__":
main()
def user_current(client: Spotify) -> PrivateUser:
"Get all details of the current user"
return PrivateUser(**client.current_user())

View File

@ -8,7 +8,7 @@ from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
import yaml
from spotify_model import CursorPaging, Paging, ReleaseDatePrecision
from spotify_model import Paging, ReleaseDatePrecision
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
DEFAULT_LIMIT = 50
@ -32,13 +32,13 @@ def read_credentials_server(path: Path) -> Spotify:
return Spotify(client_credentials_manager=credentials_manager)
def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str], open_browser: bool = True) -> Spotify:
def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str]) -> Spotify:
"Read credentials from a YAML file and authorize a user"
with open(path, "r") as credentials_file:
credentials = yaml.safe_load(credentials_file)
credentials_manager = SpotifyOAuth(
credentials["client_id"], credentials["client_secret"], redirect_uri, scope=scopes, open_browser=open_browser
credentials["client_id"], credentials["client_secret"], redirect_uri, scope=scopes
)
return Spotify(client_credentials_manager=credentials_manager)
@ -65,27 +65,6 @@ def exhaust(
yield item
class CursorPaginated(Protocol):
"Protocol definition for functions that will be provided to the `exhaust_cursor` handler"
def __call__(self, limit: int, after: Optional[str]) -> CursorPaging:
...
def exhaust_cursor(function: CursorPaginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[str, Any]]:
"""Exhaust all items provided by a paging object"""
response = function(limit=limit, after=None)
for item in response.items:
yield item
while response.next_href is not None:
response = function(limit=limit, after=response.cursors.after)
for item in response.items:
yield item
def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
"Split an iterable into smaller chunks"