Compare commits

..

7 Commits

9 changed files with 196 additions and 60 deletions

4
poetry.lock generated
View File

@ -193,7 +193,7 @@ toml = ">=0.7.1"
name = "pyyaml" name = "pyyaml"
version = "5.4.1" version = "5.4.1"
description = "YAML parser and emitter for Python" description = "YAML parser and emitter for Python"
category = "dev" category = "main"
optional = false optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
@ -346,7 +346,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = "^3.7" python-versions = "^3.7"
content-hash = "deade799321bffd9f9db8e7e77a17dca75b04473b6d0133c5adb929c8fc2057e" content-hash = "adf01d1ee8cf03745f2ba22cebfe7740d0460f4cbe120e5a6c9404563fc382e8"
[metadata.files] [metadata.files]
appdirs = [ appdirs = [

View File

@ -13,6 +13,7 @@ license = "MIT"
python = "^3.7" python = "^3.7"
spotipy = "^2.18.0" spotipy = "^2.18.0"
spotify-model = {path = "../spotify_model", develop = true} spotify-model = {path = "../spotify_model", develop = true}
PyYAML = "=5.3.1"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pre-commit = "^2.13.0" pre-commit = "^2.13.0"

View File

@ -11,10 +11,17 @@ from spotipy import Spotify
from .util import chunk, exhaust, parse_release_date from .util import chunk, exhaust, parse_release_date
def album_filter_label(albums: Iterable[SimplifiedAlbum], label: str) -> Iterable[SearchAlbum]: def album_filter_label(
"Filter albums that match an exact label string" albums: Iterable[SimplifiedAlbum], label: str, include: bool = True
) -> Iterable[SimplifiedAlbum]:
"""
Filter albums based on the label.
If `include`, yield labels that match the provided string (typical filter behavior).
Otherwise, yield only those albums that don't match the label.
"""
for album in albums: for album in albums:
if album.label == label: if (album.label == label) == include:
yield album yield album

45
spotify_actions/artist.py Normal file
View File

@ -0,0 +1,45 @@
"""
Selectors for querying artist information
"""
from argparse import ArgumentParser
from functools import partial
from typing import Iterable
from spotify_model import Paging, SearchAlbum
from spotipy import Spotify
from .util import exhaust, read_credentials_server
def artist_albums(client: Spotify, artist_ids: Iterable[str]) -> Iterable[SearchAlbum]:
"""
https://developer.spotify.com/documentation/web-api/reference/#category-artists
"""
def _artist_albums(artist_id: str, limit: int, offset: int) -> Paging:
return Paging(**client.artist_albums(artist_id, limit=limit, offset=offset))
for artist_id in artist_ids:
albums_function = partial(_artist_albums, artist_id)
for album in exhaust(albums_function):
yield SearchAlbum(**album)
def main() -> None:
"Simple runner for quickly retrieving artist info"
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("artist_ids", nargs="+")
cmdline = parser.parse_args()
client = read_credentials_server(cmdline.credentials)
print("Artist albums:")
for album in artist_albums(client, cmdline.artist_ids):
print(album)
if __name__ == "__main__":
main()

View File

@ -1,29 +0,0 @@
"""
Join objects from multiple sources into a single stream
"""
from functools import partial
from typing import Iterable, TypeVar
T = TypeVar("T") # pylint: disable=invalid-name
def combinator_take(items: Iterable[T], count: int) -> Iterable[T]:
"Retrieve the first `count` items from an iterator"
observed = 0
for item in items:
observed += 1
yield item
if observed >= count:
break
combinator_first = partial(combinator_take, count=1)
def combinator_join(*args: Iterable[T]) -> Iterable[T]:
"Join the results of many album producers by exhausting all albums from each producer"
for arg in args:
for item in arg:
yield item

View File

@ -7,11 +7,11 @@ from typing import Iterable
from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack from spotify_model import Paging, PlaylistTrack, SimplifiedPlaylist, SimplifiedTrack
from spotipy import Spotify from spotipy import Spotify
from .user import user_current from .user import current_user
from .util import chunk, exhaust from .util import chunk, exhaust
def playlist_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlaylist: def playlist_current_user_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlaylist:
""" """
Create a playlist for a user Create a playlist for a user
@ -20,18 +20,6 @@ def playlist_create(client: Spotify, user_id: str, name: str) -> SimplifiedPlayl
return SimplifiedPlaylist(**client.user_playlist_create(user_id, name)) return SimplifiedPlaylist(**client.user_playlist_create(user_id, name))
def playlist_current_user_all(client: Spotify) -> Iterable[SimplifiedPlaylist]:
"""
Get all playlists that belong to the current user
"""
def _playlists(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_playlists(limit, offset))
for playlist in exhaust(_playlists):
yield SimplifiedPlaylist(**playlist)
def playlist_current_user_find(playlists: Iterable[SimplifiedPlaylist], name: str) -> Iterable[str]: def playlist_current_user_find(playlists: Iterable[SimplifiedPlaylist], name: str) -> Iterable[str]:
""" """
Find all playlists for the current user that match a specific name Find all playlists for the current user that match a specific name
@ -54,11 +42,11 @@ def playlist_current_user_assure(
yield playlist yield playlist
if not found: if not found:
current_user_id = user_current(client).spotify_id current_user_id = current_user(client).spotify_id
yield playlist_create(client, current_user_id, name) yield playlist_current_user_create(client, current_user_id, name)
def playlist_replace( def playlist_current_user_replace(
client: Spotify, playlist_id: str, tracks: Iterable[SimplifiedTrack], chunk_size: int = 100 client: Spotify, playlist_id: str, tracks: Iterable[SimplifiedTrack], chunk_size: int = 100
) -> None: ) -> None:
""" """
@ -77,7 +65,7 @@ def playlist_replace(
client.playlist_add_items(playlist_id, track_chunk) client.playlist_add_items(playlist_id, track_chunk)
def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[SimplifiedTrack]: def playlist_tracks(client: Spotify, playlist_ids: Iterable[str]) -> Iterable[PlaylistTrack]:
""" """
Given a playlist_id, fetch all songs currently in the playlist Given a playlist_id, fetch all songs currently in the playlist
""" """

View File

@ -21,7 +21,7 @@ def track_from_ids(
yield track if isinstance(track, str) else track.spotify_id yield track if isinstance(track, str) else track.spotify_id
for track_id_chunk in chunk(_to_id(), chunk_size): for track_id_chunk in chunk(_to_id(), chunk_size):
track_chunk = client.tracks(track_id_chunk) track_chunk = client.tracks(track_id_chunk)["tracks"]
for track in track_chunk: for track in track_chunk:
yield Track(**track) yield Track(**track)

View File

@ -1,11 +1,114 @@
""" """
Actions related to user information Actions related to user information
""" """
from argparse import ArgumentParser
from typing import Iterable, Optional
from spotify_model import PrivateUser from spotify_model import (
Artist,
CursorPaging,
Paging,
PublicUser,
SavedAlbum,
SavedTrack,
SimplifiedPlaylist,
)
from spotipy import Spotify from spotipy import Spotify
from .util import exhaust, exhaust_cursor, read_credentials_oauth
def user_current(client: Spotify) -> PrivateUser:
"Get all details of the current user" def current_user(client: Spotify) -> PublicUser:
return PrivateUser(**client.current_user()) """
Get all details of the current user
Required scopes:
- user-read-private
Optional scopes:
- user-read-email
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-current-users-profile
"""
return PublicUser(**client.current_user())
def current_user_library_albums(client: Spotify) -> Iterable[SavedAlbum]:
"""
Get the albums a user has saved to their library
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-users-saved-albums
"""
def _library_albums(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_saved_albums(limit, offset))
for album in exhaust(_library_albums):
yield SavedAlbum(**album)
def current_user_library_tracks(client: Spotify) -> Iterable[SavedTrack]:
"""
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-users-saved-tracks
"""
def _library_tracks(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_saved_tracks(limit, offset))
for track in exhaust(_library_tracks):
yield SavedTrack(**track)
def current_user_playlists(client: Spotify) -> Iterable[SimplifiedPlaylist]:
"""
Get all playlists that belong to the current user
"""
def _playlists(limit: int, offset: int) -> Paging:
return Paging(**client.current_user_playlists(limit, offset))
for playlist in exhaust(_playlists):
yield SimplifiedPlaylist(**playlist)
def current_user_followed_artists(client: Spotify) -> Iterable[Artist]:
"""
Required scopes:
- user-follow-read
https://developer.spotify.com/documentation/web-api/reference/#endpoint-get-followed
"""
def _followed_artists(limit: int, after: Optional[str]) -> CursorPaging:
# This one is a little different; rather than using an offset counter,
# we inspect the response to get the "last"
return CursorPaging(**client.current_user_followed_artists(limit=limit, after=after)["artists"])
for artist in exhaust_cursor(_followed_artists):
yield Artist(**artist)
def main() -> None:
"Simple runner for retrieving current user information"
parser = ArgumentParser()
parser.add_argument("-c", "--credentials", required=True)
parser.add_argument("-r", "--redirect-uri", required=True)
cmdline = parser.parse_args()
client = read_credentials_oauth(
cmdline.credentials,
redirect_uri=cmdline.redirect_uri,
scopes=["user-read-private", "user-follow-read"],
)
print("Current user:")
print(current_user(client))
print()
print("Current user followed artists:")
for artist in current_user_followed_artists(client):
print(artist)
if __name__ == "__main__":
main()

View File

@ -8,7 +8,7 @@ from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
import yaml import yaml
from spotify_model import Paging, ReleaseDatePrecision from spotify_model import CursorPaging, Paging, ReleaseDatePrecision
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
DEFAULT_LIMIT = 50 DEFAULT_LIMIT = 50
@ -32,13 +32,13 @@ def read_credentials_server(path: Path) -> Spotify:
return Spotify(client_credentials_manager=credentials_manager) return Spotify(client_credentials_manager=credentials_manager)
def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str]) -> Spotify: def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str], open_browser: bool = True) -> Spotify:
"Read credentials from a YAML file and authorize a user" "Read credentials from a YAML file and authorize a user"
with open(path, "r") as credentials_file: with open(path, "r") as credentials_file:
credentials = yaml.safe_load(credentials_file) credentials = yaml.safe_load(credentials_file)
credentials_manager = SpotifyOAuth( credentials_manager = SpotifyOAuth(
credentials["client_id"], credentials["client_secret"], redirect_uri, scope=scopes credentials["client_id"], credentials["client_secret"], redirect_uri, scope=scopes, open_browser=open_browser
) )
return Spotify(client_credentials_manager=credentials_manager) return Spotify(client_credentials_manager=credentials_manager)
@ -65,6 +65,27 @@ def exhaust(
yield item yield item
class CursorPaginated(Protocol):
"Protocol definition for functions that will be provided to the `exhaust_cursor` handler"
def __call__(self, limit: int, after: Optional[str]) -> CursorPaging:
...
def exhaust_cursor(function: CursorPaginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[str, Any]]:
"""Exhaust all items provided by a paging object"""
response = function(limit=limit, after=None)
for item in response.items:
yield item
while response.next_href is not None:
response = function(limit=limit, after=response.cursors.after)
for item in response.items:
yield item
def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]: def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
"Split an iterable into smaller chunks" "Split an iterable into smaller chunks"