Files
navipy/src/api/client.py
2025-12-16 03:54:17 -05:00

771 lines
23 KiB
Python

"""
Subsonic API client for Navidrome communication
"""
import hashlib
import json
from typing import Optional, List, Dict, Any
from urllib.parse import urlencode
import requests
from .auth import SubsonicAuth
from .models import Artist, Album, Song, Playlist, Genre
class SubsonicError(Exception):
"""Subsonic API error"""
ERROR_CODES = {
0: "Generic error",
10: "Required parameter missing",
20: "Incompatible client version",
30: "Incompatible server version",
40: "Wrong username or password",
41: "Token authentication not supported",
50: "User not authorized",
60: "Trial expired",
70: "Data not found"
}
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"[{code}] {message}")
class SubsonicClient:
"""Subsonic API client for Navidrome communication"""
API_VERSION = '1.16.1'
CLIENT_NAME = 'NaviPy'
def __init__(self, serverUrl: str, username: str, password: str):
"""
Initialize the Subsonic client
Args:
serverUrl: Base URL of the Navidrome server (e.g., https://music.example.com)
username: Navidrome username
password: Navidrome password
"""
self.serverUrl = serverUrl.rstrip('/')
self.username = username
self.password = password
self.session = requests.Session()
self.session.headers.update({
'User-Agent': f'{self.CLIENT_NAME}/1.0'
})
self._cache: Dict[str, Any] = {}
def _cacheKey(self, endpoint: str, params: Optional[dict] = None) -> str:
"""Generate a cache key from endpoint and parameters."""
keyData = {"endpoint": endpoint, "params": params or {}}
keyString = json.dumps(keyData, sort_keys=True)
return hashlib.md5(keyString.encode()).hexdigest()
def _getCached(self, endpoint: str, params: Optional[dict] = None) -> Optional[Any]:
"""Retrieve a cached response if available."""
key = self._cacheKey(endpoint, params)
return self._cache.get(key)
def _setCache(self, endpoint: str, params: Optional[dict], value: Any) -> None:
"""Store a response in the cache."""
key = self._cacheKey(endpoint, params)
self._cache[key] = value
def clearCache(self) -> None:
"""Clear all cached responses. Call this on library refresh."""
self._cache.clear()
def _makeRequest(self, endpoint: str, params: dict = None) -> dict:
"""
Make authenticated API request
Args:
endpoint: API endpoint (e.g., 'ping', 'getArtists')
params: Additional parameters for the request
Returns:
dict: Parsed API response
Raises:
SubsonicError: If the API returns an error
requests.RequestException: If the network request fails
"""
salt = SubsonicAuth.generateSalt()
token = SubsonicAuth.generateToken(self.password, salt)
baseParams = {
'u': self.username,
't': token,
's': salt,
'v': self.API_VERSION,
'c': self.CLIENT_NAME,
'f': 'json'
}
if params:
baseParams.update(params)
url = f"{self.serverUrl}/rest/{endpoint}"
response = self.session.get(url, params=baseParams, timeout=30)
response.raise_for_status()
data = response.json()
# Subsonic wraps responses in 'subsonic-response'
subsonicResponse = data.get('subsonic-response', {})
if subsonicResponse.get('status') == 'failed':
error = subsonicResponse.get('error', {})
raise SubsonicError(
code=error.get('code', 0),
message=error.get('message', 'Unknown error')
)
return subsonicResponse
def ping(self) -> bool:
"""
Test connection to the server
Returns:
bool: True if connection successful
"""
response = self._makeRequest('ping')
return response.get('status') == 'ok'
def getStreamUrl(self, songId: str, format: str = None, maxBitRate: int = None) -> str:
"""
Generate streaming URL for a song
Args:
songId: ID of the song to stream
format: Optional transcoding format (mp3, opus, etc.)
maxBitRate: Optional max bitrate for transcoding
Returns:
str: Full streaming URL with authentication
"""
salt = SubsonicAuth.generateSalt()
token = SubsonicAuth.generateToken(self.password, salt)
params = {
'u': self.username,
't': token,
's': salt,
'v': self.API_VERSION,
'c': self.CLIENT_NAME,
'id': songId
}
if format:
params['format'] = format
if maxBitRate:
params['maxBitRate'] = maxBitRate
return f"{self.serverUrl}/rest/stream?{urlencode(params)}"
def getCoverArtUrl(self, coverId: str, size: int = None) -> str:
"""
Generate cover art URL
Args:
coverId: Cover art ID
size: Optional size in pixels
Returns:
str: Full cover art URL with authentication
"""
salt = SubsonicAuth.generateSalt()
token = SubsonicAuth.generateToken(self.password, salt)
params = {
'u': self.username,
't': token,
's': salt,
'v': self.API_VERSION,
'c': self.CLIENT_NAME,
'id': coverId
}
if size:
params['size'] = size
return f"{self.serverUrl}/rest/getCoverArt?{urlencode(params)}"
# ==================== Browsing ====================
def getArtists(self, musicFolderId: str = None) -> List[Artist]:
"""
Get all artists
Args:
musicFolderId: Optional music folder to filter by
Returns:
List of Artist objects
"""
params = {}
if musicFolderId:
params['musicFolderId'] = musicFolderId
cached = self._getCached('getArtists', params)
if cached is not None:
return cached
response = self._makeRequest('getArtists', params)
artists = []
artistsData = response.get('artists', {})
for index in artistsData.get('index', []):
for artistData in index.get('artist', []):
artists.append(Artist.fromDict(artistData))
self._setCache('getArtists', params, artists)
return artists
def getArtist(self, artistId: str) -> Dict[str, Any]:
"""
Get artist details including albums
Args:
artistId: ID of the artist
Returns:
dict with 'artist' (Artist) and 'albums' (List[Album])
"""
params = {'id': artistId}
cached = self._getCached('getArtist', params)
if cached is not None:
return cached
response = self._makeRequest('getArtist', params)
artistData = response.get('artist', {})
artist = Artist.fromDict(artistData)
albums = [Album.fromDict(a) for a in artistData.get('album', [])]
result = {'artist': artist, 'albums': albums}
self._setCache('getArtist', params, result)
return result
def getAlbum(self, albumId: str) -> Dict[str, Any]:
"""
Get album details including songs
Args:
albumId: ID of the album
Returns:
dict with 'album' (Album) and 'songs' (List[Song])
"""
params = {'id': albumId}
cached = self._getCached('getAlbum', params)
if cached is not None:
return cached
response = self._makeRequest('getAlbum', params)
albumData = response.get('album', {})
album = Album.fromDict(albumData)
songs = [Song.fromDict(s) for s in albumData.get('song', [])]
result = {'album': album, 'songs': songs}
self._setCache('getAlbum', params, result)
return result
def getSong(self, songId: str) -> Song:
"""
Get song details
Args:
songId: ID of the song
Returns:
Song object
"""
params = {'id': songId}
cached = self._getCached('getSong', params)
if cached is not None:
return cached
response = self._makeRequest('getSong', params)
result = Song.fromDict(response.get('song', {}))
self._setCache('getSong', params, result)
return result
def getGenres(self) -> List[Genre]:
"""
Get all genres
Returns:
List of Genre objects
"""
cached = self._getCached('getGenres', None)
if cached is not None:
return cached
response = self._makeRequest('getGenres')
genresData = response.get('genres', {})
result = [Genre.fromDict(g) for g in genresData.get('genre', [])]
self._setCache('getGenres', None, result)
return result
def getAlbumList(self, listType: str, size: int = 20, offset: int = 0,
musicFolderId: str = None, genre: str = None,
fromYear: int = None, toYear: int = None) -> List[Album]:
"""
Get a list of albums by criteria (newest, frequent, recent, random, highest, alphabetical).
Args:
listType: Album list type (e.g., 'newest', 'frequent', 'recent', 'random', 'highest')
size: Number of albums to return
offset: Offset for pagination
musicFolderId: Optional folder filter
genre: Optional genre filter
fromYear: Optional starting year
toYear: Optional ending year
Returns:
List of Album objects
"""
params = {
'type': listType,
'size': size,
'offset': offset
}
if musicFolderId:
params['musicFolderId'] = musicFolderId
if genre:
params['genre'] = genre
if fromYear:
params['fromYear'] = fromYear
if toYear:
params['toYear'] = toYear
cached = self._getCached('getAlbumList2', params)
if cached is not None:
return cached
response = self._makeRequest('getAlbumList2', params)
albumsData = response.get('albumList2', {})
result = [Album.fromDict(a) for a in albumsData.get('album', [])]
self._setCache('getAlbumList2', params, result)
return result
def getSongsByGenre(self, genre: str, count: int = 100, offset: int = 0) -> List[Song]:
"""
Get songs by genre
Args:
genre: Genre name
count: Maximum number of songs to return
offset: Pagination offset
Returns:
List of Song objects
"""
params = {
'genre': genre,
'count': count,
'offset': offset
}
cached = self._getCached('getSongsByGenre', params)
if cached is not None:
return cached
response = self._makeRequest('getSongsByGenre', params)
songsData = response.get('songsByGenre', {})
result = [Song.fromDict(s) for s in songsData.get('song', [])]
self._setCache('getSongsByGenre', params, result)
return result
def getRandomSongs(self, size: int = 10, genre: str = None,
fromYear: int = None, toYear: int = None) -> List[Song]:
"""
Get random songs
Args:
size: Number of songs to return
genre: Optional genre filter
fromYear: Optional start year filter
toYear: Optional end year filter
Returns:
List of Song objects
"""
params = {'size': size}
if genre:
params['genre'] = genre
if fromYear:
params['fromYear'] = fromYear
if toYear:
params['toYear'] = toYear
response = self._makeRequest('getRandomSongs', params)
songsData = response.get('randomSongs', {})
return [Song.fromDict(s) for s in songsData.get('song', [])]
# ==================== Search ====================
def search(self, query: str, artistCount: int = 20,
albumCount: int = 20, songCount: int = 20) -> Dict[str, List]:
"""
Search for artists, albums, and songs
Args:
query: Search query
artistCount: Max artists to return
albumCount: Max albums to return
songCount: Max songs to return
Returns:
dict with 'artists', 'albums', 'songs' lists
"""
params = {
'query': query,
'artistCount': artistCount,
'albumCount': albumCount,
'songCount': songCount
}
cached = self._getCached('search3', params)
if cached is not None:
return cached
response = self._makeRequest('search3', params)
searchResult = response.get('searchResult3', {})
result = {
'artists': [Artist.fromDict(a) for a in searchResult.get('artist', [])],
'albums': [Album.fromDict(a) for a in searchResult.get('album', [])],
'songs': [Song.fromDict(s) for s in searchResult.get('song', [])]
}
self._setCache('search3', params, result)
return result
# ==================== Playlists ====================
def getPlaylists(self) -> List[Playlist]:
"""
Get all playlists
Returns:
List of Playlist objects
"""
cached = self._getCached('getPlaylists', None)
if cached is not None:
return cached
response = self._makeRequest('getPlaylists')
playlistsData = response.get('playlists', {})
result = [Playlist.fromDict(p) for p in playlistsData.get('playlist', [])]
self._setCache('getPlaylists', None, result)
return result
def getPlaylist(self, playlistId: str) -> Playlist:
"""
Get playlist details including songs
Args:
playlistId: ID of the playlist
Returns:
Playlist object with songs populated
"""
params = {'id': playlistId}
cached = self._getCached('getPlaylist', params)
if cached is not None:
return cached
response = self._makeRequest('getPlaylist', params)
result = Playlist.fromDict(response.get('playlist', {}))
self._setCache('getPlaylist', params, result)
return result
def createPlaylist(self, name: str, songIds: List[str] = None) -> Playlist:
"""
Create a new playlist
Args:
name: Playlist name
songIds: Optional list of song IDs to add
Returns:
Created Playlist object
"""
params = {'name': name}
if songIds:
params['songId'] = songIds
response = self._makeRequest('createPlaylist', params)
self._invalidatePlaylistCache()
return Playlist.fromDict(response.get('playlist', {}))
def updatePlaylist(self, playlistId: str, name: str = None,
songIdsToAdd: List[str] = None,
songIndexesToRemove: List[int] = None) -> None:
"""
Update a playlist
Args:
playlistId: ID of the playlist
name: New name (optional)
songIdsToAdd: Song IDs to add (optional)
songIndexesToRemove: Song indexes to remove (optional)
"""
params = {'playlistId': playlistId}
if name:
params['name'] = name
if songIdsToAdd:
params['songIdToAdd'] = songIdsToAdd
if songIndexesToRemove:
params['songIndexToRemove'] = songIndexesToRemove
self._makeRequest('updatePlaylist', params)
self._invalidatePlaylistCache(playlistId)
def deletePlaylist(self, playlistId: str) -> None:
"""
Delete a playlist
Args:
playlistId: ID of the playlist to delete
"""
self._makeRequest('deletePlaylist', {'id': playlistId})
self._invalidatePlaylistCache(playlistId)
def _invalidatePlaylistCache(self, playlistId: str = None) -> None:
"""Invalidate playlist-related cache entries."""
listKey = self._cacheKey('getPlaylists', None)
self._cache.pop(listKey, None)
if playlistId:
detailKey = self._cacheKey('getPlaylist', {'id': playlistId})
self._cache.pop(detailKey, None)
# ==================== User Actions ====================
def star(self, itemId: str, itemType: str = 'song') -> None:
"""
Star (favorite) an item
Args:
itemId: ID of the item
itemType: Type of item ('song', 'album', 'artist')
"""
paramName = {
'song': 'id',
'album': 'albumId',
'artist': 'artistId'
}.get(itemType, 'id')
self._makeRequest('star', {paramName: itemId})
self._invalidateStarredCache()
def unstar(self, itemId: str, itemType: str = 'song') -> None:
"""
Unstar (unfavorite) an item
Args:
itemId: ID of the item
itemType: Type of item ('song', 'album', 'artist')
"""
paramName = {
'song': 'id',
'album': 'albumId',
'artist': 'artistId'
}.get(itemType, 'id')
self._makeRequest('unstar', {paramName: itemId})
self._invalidateStarredCache()
def _invalidateStarredCache(self) -> None:
"""Invalidate the starred/favorites cache."""
key = self._cacheKey('getStarred2', None)
self._cache.pop(key, None)
def setRating(self, itemId: str, rating: int) -> None:
"""
Set rating for an item (1-5 stars, 0 to remove)
Args:
itemId: ID of the item
rating: Rating value (0-5)
"""
self._makeRequest('setRating', {'id': itemId, 'rating': rating})
def scrobble(self, songId: str, submission: bool = True) -> None:
"""
Scrobble a song (register as played)
Args:
songId: ID of the song
submission: True for full scrobble, False for "now playing"
"""
self._makeRequest('scrobble', {'id': songId, 'submission': submission})
# ==================== Play Queue ====================
def getPlayQueue(self) -> Dict[str, Any]:
"""
Get the saved play queue
Returns:
dict with 'songs' (List[Song]), 'current' (str), 'position' (int)
"""
response = self._makeRequest('getPlayQueue')
queueData = response.get('playQueue', {})
return {
'songs': [Song.fromDict(s) for s in queueData.get('entry', [])],
'current': queueData.get('current'),
'position': queueData.get('position', 0)
}
def savePlayQueue(self, songIds: List[str], current: str = None,
position: int = None) -> None:
"""
Save the current play queue
Args:
songIds: List of song IDs in the queue
current: ID of currently playing song
position: Position in current song (milliseconds)
"""
params = {'id': songIds}
if current:
params['current'] = current
if position is not None:
params['position'] = position
self._makeRequest('savePlayQueue', params)
# ==================== Starred/Favorites ====================
def getStarred(self) -> Dict[str, List]:
"""
Get all starred items
Returns:
dict with 'artists', 'albums', 'songs' lists
"""
cached = self._getCached('getStarred2', None)
if cached is not None:
return cached
response = self._makeRequest('getStarred2')
starredData = response.get('starred2', {})
result = {
'artists': [Artist.fromDict(a) for a in starredData.get('artist', [])],
'albums': [Album.fromDict(a) for a in starredData.get('album', [])],
'songs': [Song.fromDict(s) for s in starredData.get('song', [])]
}
self._setCache('getStarred2', None, result)
return result
# ==================== Recommendations ====================
def getTopSongs(self, artist: str, count: int = 50) -> List[Song]:
"""
Get top songs for an artist
Args:
artist: Artist name
count: Number of songs to return
Returns:
List of Song objects
"""
if not artist:
return []
params = {'artist': artist, 'count': count}
cached = self._getCached('getTopSongs', params)
if cached is not None:
return cached
response = self._makeRequest('getTopSongs', params)
topData = response.get('topSongs', {})
result = [Song.fromDict(s) for s in topData.get('song', [])]
self._setCache('getTopSongs', params, result)
return result
def getSimilarSongs(self, songId: str, count: int = 50) -> List[Song]:
"""
Get songs similar to a given song
Args:
songId: ID of the seed song
count: Number of songs to return
"""
if not songId:
return []
params = {'id': songId, 'count': count}
cached = self._getCached('getSimilarSongs2', params)
if cached is not None:
return cached
response = self._makeRequest('getSimilarSongs2', params)
data = response.get('similarSongs2', {})
result = [Song.fromDict(s) for s in data.get('song', [])]
self._setCache('getSimilarSongs2', params, result)
return result
def getNowPlaying(self) -> List[Song]:
"""
Get songs currently being played by all users
Returns:
List of Song objects
"""
response = self._makeRequest('getNowPlaying')
nowPlayingData = response.get('nowPlaying', {})
return [Song.fromDict(s) for s in nowPlayingData.get('entry', [])]
# ==================== Lyrics ====================
def getLyrics(self, artist: str = None, title: str = None) -> Optional[str]:
"""
Get lyrics for a song
Args:
artist: Artist name
title: Song title
Returns:
Lyrics text or None if not found
"""
params = {}
if artist:
params['artist'] = artist
if title:
params['title'] = title
response = self._makeRequest('getLyrics', params)
lyricsData = response.get('lyrics', {})
return lyricsData.get('value')
# ==================== System ====================
def startScan(self) -> None:
"""Start a library scan"""
self._makeRequest('startScan')
def getScanStatus(self) -> Dict[str, Any]:
"""
Get library scan status
Returns:
dict with 'scanning' (bool) and 'count' (int)
"""
response = self._makeRequest('getScanStatus')
scanData = response.get('scanStatus', {})
return {
'scanning': scanData.get('scanning', False),
'count': scanData.get('count', 0)
}