spotify_actions/spotify_actions/util.py

134 lines
4.3 KiB
Python

"""
Utility methods for working with the Spotify API
"""
from calendar import monthrange
from datetime import date, datetime
from math import ceil
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Protocol, TypeVar
import yaml
from spotify_model import CursorPaging, Paging, ReleaseDatePrecision
from spotipy import Spotify, SpotifyClientCredentials, SpotifyOAuth
# Page size that `exhaust` and `exhaust_cursor` request when the caller passes no `limit`
DEFAULT_LIMIT = 50
# Generic element type used by the iterable helpers `echo` and `chunk`
T = TypeVar("T") # pylint: disable=invalid-name
def echo(elements: Iterable[T]) -> Iterable[T]:
    """
    Print each element of an iterable and pass it through unchanged.

    The result is a lazy generator: an element is printed only at the moment
    the consumer asks for it, immediately before it is yielded.
    """
    for current in elements:
        print(current)
        yield current
def read_credentials_server(path: Path) -> Spotify:
    """
    Build a Spotify client for the server workflow from a YAML credentials file.

    The YAML document at `path` must provide the keys `client_id` and
    `client_secret`; a missing key raises `KeyError`.
    """
    with open(path, "r") as handle:
        settings = yaml.safe_load(handle)

    client_id = settings["client_id"]
    client_secret = settings["client_secret"]
    manager = SpotifyClientCredentials(client_id, client_secret)
    return Spotify(client_credentials_manager=manager)
def read_credentials_oauth(path: Path, redirect_uri: str, scopes: List[str]) -> Spotify:
    """
    Build a Spotify client that authorizes a user, from a YAML credentials file.

    The YAML document at `path` must provide the keys `client_id` and
    `client_secret`; a missing key raises `KeyError`. `redirect_uri` and
    `scopes` are handed to `SpotifyOAuth` as given.
    """
    with open(path, "r") as handle:
        settings = yaml.safe_load(handle)

    client_id = settings["client_id"]
    client_secret = settings["client_secret"]
    manager = SpotifyOAuth(client_id, client_secret, redirect_uri, scope=scopes)
    # NOTE(review): the OAuth manager is passed through the
    # `client_credentials_manager` keyword - confirm this is the intended
    # keyword for the spotipy version in use.
    return Spotify(client_credentials_manager=manager)
class Paginated(Protocol):
    """Call signature of the offset-paginated functions accepted by `exhaust`."""

    def __call__(self, limit: int, offset: int) -> Paging:
        """Return the `Paging` object for the given `limit` and `offset`."""
def exhaust(
    function: Paginated, initial: Optional[Paging] = None, limit: int = DEFAULT_LIMIT
) -> Iterable[Dict[str, Any]]:
    """
    Exhaust a function that returns a pagination object.

    Yields every item of the first page, then requests each remaining page in
    order and yields its items.

    - `function` is called with the keyword arguments `limit` and `offset` and
      must return an object exposing `items`, `total` and `limit`.
    - `initial` is an already-fetched first page. When given, `function` is not
      called for the first page; the page is assumed to start at offset 0.
    - `limit` is the page size requested for the first page when `initial` is
      not given.

    The page size reported by the first page is used both to work out how many
    pages remain and to compute their offsets. Mixing the `limit` argument with
    the reported page size would skip items (or issue extra requests) whenever
    the two differ, e.g. when `initial` was fetched with another page size.
    """
    first_page = initial if initial is not None else function(limit=limit, offset=0)
    yield from first_page.items

    # Prefer the page size the response reports; fall back to the requested
    # size only when the response does not carry a usable one.
    page_size = first_page.limit or limit
    page_count = ceil(first_page.total / page_size)
    for page_index in range(1, page_count):
        page = function(limit=page_size, offset=page_size * page_index)
        yield from page.items
class CursorPaginated(Protocol):
    """Call signature of the cursor-paginated functions accepted by `exhaust_cursor`."""

    def __call__(self, limit: int, after: Optional[str]) -> CursorPaging:
        """Return the `CursorPaging` object for the given `limit` and `after` cursor."""
def exhaust_cursor(function: CursorPaginated, limit: int = DEFAULT_LIMIT) -> Iterable[Dict[str, Any]]:
    """
    Yield every item served by a cursor-paginated function.

    `function` is first called with `after=None`. As long as the latest page
    has a non-`None` `next_href`, it is called again with that page's
    `cursors.after` value, always using the same `limit`.
    """
    cursor = None
    while True:
        page = function(limit=limit, after=cursor)
        for entry in page.items:
            yield entry
        if page.next_href is None:
            break
        cursor = page.cursors.after
def chunk(items: Iterable[T], size: int) -> Iterable[List[T]]:
    """
    Lazily group an iterable into lists of `size` elements.

    Every yielded list has exactly `size` elements, except possibly the last
    one, which holds whatever is left over. An empty input yields nothing.
    `size` must be at least 1.
    """
    assert size >= 1
    batch: List[T] = []
    for entry in items:
        batch.append(entry)
        if len(batch) != size:
            continue
        yield batch
        # Start a fresh list so the one just yielded is never mutated again
        batch = []
    if batch:
        yield batch
def parse_release_date(date_str: str, precision: ReleaseDatePrecision, fast_forward: bool = False) -> date:
    """
    Parse a date string with provided precision to a concrete date.

    `fast_forward` controls how precision is resolved:
    - If `False` (default), dates are assumed to be at the start of the period
      (e.g. "1970" becomes "1970-01-01", "1970-08" becomes "1970-08-01")
    - If `True`, dates are "fast-forwarded" to the end of the given period
      (e.g. "1970" becomes "1970-12-31", "1970-08" becomes "1970-08-31")

    Any precision other than `YEAR` or `MONTH` is parsed as a full
    "%Y-%m-%d" date, for which `fast_forward` has no effect.

    Raises `ValueError` (from `strptime`) when `date_str` does not match the
    format implied by `precision`.
    """
    if precision == ReleaseDatePrecision.YEAR:
        effective = datetime.strptime(date_str, "%Y").date()
        if fast_forward:
            effective = date(effective.year, 12, 31)
    elif precision == ReleaseDatePrecision.MONTH:
        effective = datetime.strptime(date_str, "%Y-%m").date()
        if fast_forward:
            # monthrange() returns (weekday of the 1st, number of days in the
            # month); the day count is itself the last day of the month.
            final_day = monthrange(effective.year, effective.month)[1]
            effective = date(effective.year, effective.month, final_day)
    else:
        effective = datetime.strptime(date_str, "%Y-%m-%d").date()
    return effective