""" Subsonic API client for Navidrome communication """ import hashlib import json from typing import Optional, List, Dict, Any from urllib.parse import urlencode import requests from .auth import SubsonicAuth from .models import Artist, Album, Song, Playlist, Genre class SubsonicError(Exception): """Subsonic API error""" ERROR_CODES = { 0: "Generic error", 10: "Required parameter missing", 20: "Incompatible client version", 30: "Incompatible server version", 40: "Wrong username or password", 41: "Token authentication not supported", 50: "User not authorized", 60: "Trial expired", 70: "Data not found" } def __init__(self, code: int, message: str): self.code = code self.message = message super().__init__(f"[{code}] {message}") class SubsonicClient: """Subsonic API client for Navidrome communication""" API_VERSION = '1.16.1' CLIENT_NAME = 'NaviPy' def __init__(self, serverUrl: str, username: str, password: str): """ Initialize the Subsonic client Args: serverUrl: Base URL of the Navidrome server (e.g., https://music.example.com) username: Navidrome username password: Navidrome password """ self.serverUrl = serverUrl.rstrip('/') self.username = username self.password = password self.session = requests.Session() self.session.headers.update({ 'User-Agent': f'{self.CLIENT_NAME}/1.0' }) self._cache: Dict[str, Any] = {} def _cacheKey(self, endpoint: str, params: Optional[dict] = None) -> str: """Generate a cache key from endpoint and parameters.""" keyData = {"endpoint": endpoint, "params": params or {}} keyString = json.dumps(keyData, sort_keys=True) return hashlib.md5(keyString.encode()).hexdigest() def _getCached(self, endpoint: str, params: Optional[dict] = None) -> Optional[Any]: """Retrieve a cached response if available.""" key = self._cacheKey(endpoint, params) return self._cache.get(key) def _setCache(self, endpoint: str, params: Optional[dict], value: Any) -> None: """Store a response in the cache.""" key = self._cacheKey(endpoint, params) self._cache[key] = value def clearCache(self) -> None: """Clear all cached responses. Call this on library refresh.""" self._cache.clear() def _makeRequest(self, endpoint: str, params: dict = None) -> dict: """ Make authenticated API request Args: endpoint: API endpoint (e.g., 'ping', 'getArtists') params: Additional parameters for the request Returns: dict: Parsed API response Raises: SubsonicError: If the API returns an error requests.RequestException: If the network request fails """ salt = SubsonicAuth.generateSalt() token = SubsonicAuth.generateToken(self.password, salt) baseParams = { 'u': self.username, 't': token, 's': salt, 'v': self.API_VERSION, 'c': self.CLIENT_NAME, 'f': 'json' } if params: baseParams.update(params) url = f"{self.serverUrl}/rest/{endpoint}" response = self.session.get(url, params=baseParams, timeout=30) response.raise_for_status() data = response.json() # Subsonic wraps responses in 'subsonic-response' subsonicResponse = data.get('subsonic-response', {}) if subsonicResponse.get('status') == 'failed': error = subsonicResponse.get('error', {}) raise SubsonicError( code=error.get('code', 0), message=error.get('message', 'Unknown error') ) return subsonicResponse def ping(self) -> bool: """ Test connection to the server Returns: bool: True if connection successful """ response = self._makeRequest('ping') return response.get('status') == 'ok' def getStreamUrl(self, songId: str, format: str = None, maxBitRate: int = None) -> str: """ Generate streaming URL for a song Args: songId: ID of the song to stream format: Optional transcoding format (mp3, opus, etc.) maxBitRate: Optional max bitrate for transcoding Returns: str: Full streaming URL with authentication """ salt = SubsonicAuth.generateSalt() token = SubsonicAuth.generateToken(self.password, salt) params = { 'u': self.username, 't': token, 's': salt, 'v': self.API_VERSION, 'c': self.CLIENT_NAME, 'id': songId } if format: params['format'] = format if maxBitRate: params['maxBitRate'] = maxBitRate return f"{self.serverUrl}/rest/stream?{urlencode(params)}" def getCoverArtUrl(self, coverId: str, size: int = None) -> str: """ Generate cover art URL Args: coverId: Cover art ID size: Optional size in pixels Returns: str: Full cover art URL with authentication """ salt = SubsonicAuth.generateSalt() token = SubsonicAuth.generateToken(self.password, salt) params = { 'u': self.username, 't': token, 's': salt, 'v': self.API_VERSION, 'c': self.CLIENT_NAME, 'id': coverId } if size: params['size'] = size return f"{self.serverUrl}/rest/getCoverArt?{urlencode(params)}" # ==================== Browsing ==================== def getArtists(self, musicFolderId: str = None) -> List[Artist]: """ Get all artists Args: musicFolderId: Optional music folder to filter by Returns: List of Artist objects """ params = {} if musicFolderId: params['musicFolderId'] = musicFolderId cached = self._getCached('getArtists', params) if cached is not None: return cached response = self._makeRequest('getArtists', params) artists = [] artistsData = response.get('artists', {}) for index in artistsData.get('index', []): for artistData in index.get('artist', []): artists.append(Artist.fromDict(artistData)) self._setCache('getArtists', params, artists) return artists def getArtist(self, artistId: str) -> Dict[str, Any]: """ Get artist details including albums Args: artistId: ID of the artist Returns: dict with 'artist' (Artist) and 'albums' (List[Album]) """ params = {'id': artistId} cached = self._getCached('getArtist', params) if cached is not None: return cached response = self._makeRequest('getArtist', params) artistData = response.get('artist', {}) artist = Artist.fromDict(artistData) albums = [Album.fromDict(a) for a in artistData.get('album', [])] result = {'artist': artist, 'albums': albums} self._setCache('getArtist', params, result) return result def getAlbum(self, albumId: str) -> Dict[str, Any]: """ Get album details including songs Args: albumId: ID of the album Returns: dict with 'album' (Album) and 'songs' (List[Song]) """ params = {'id': albumId} cached = self._getCached('getAlbum', params) if cached is not None: return cached response = self._makeRequest('getAlbum', params) albumData = response.get('album', {}) album = Album.fromDict(albumData) songs = [Song.fromDict(s) for s in albumData.get('song', [])] result = {'album': album, 'songs': songs} self._setCache('getAlbum', params, result) return result def getSong(self, songId: str) -> Song: """ Get song details Args: songId: ID of the song Returns: Song object """ params = {'id': songId} cached = self._getCached('getSong', params) if cached is not None: return cached response = self._makeRequest('getSong', params) result = Song.fromDict(response.get('song', {})) self._setCache('getSong', params, result) return result def getGenres(self) -> List[Genre]: """ Get all genres Returns: List of Genre objects """ cached = self._getCached('getGenres', None) if cached is not None: return cached response = self._makeRequest('getGenres') genresData = response.get('genres', {}) result = [Genre.fromDict(g) for g in genresData.get('genre', [])] self._setCache('getGenres', None, result) return result def getAlbumList(self, listType: str, size: int = 20, offset: int = 0, musicFolderId: str = None, genre: str = None, fromYear: int = None, toYear: int = None) -> List[Album]: """ Get a list of albums by criteria (newest, frequent, recent, random, highest, alphabetical). Args: listType: Album list type (e.g., 'newest', 'frequent', 'recent', 'random', 'highest') size: Number of albums to return offset: Offset for pagination musicFolderId: Optional folder filter genre: Optional genre filter fromYear: Optional starting year toYear: Optional ending year Returns: List of Album objects """ params = { 'type': listType, 'size': size, 'offset': offset } if musicFolderId: params['musicFolderId'] = musicFolderId if genre: params['genre'] = genre if fromYear: params['fromYear'] = fromYear if toYear: params['toYear'] = toYear cached = self._getCached('getAlbumList2', params) if cached is not None: return cached response = self._makeRequest('getAlbumList2', params) albumsData = response.get('albumList2', {}) result = [Album.fromDict(a) for a in albumsData.get('album', [])] self._setCache('getAlbumList2', params, result) return result def getSongsByGenre(self, genre: str, count: int = 100, offset: int = 0) -> List[Song]: """ Get songs by genre Args: genre: Genre name count: Maximum number of songs to return offset: Pagination offset Returns: List of Song objects """ params = { 'genre': genre, 'count': count, 'offset': offset } cached = self._getCached('getSongsByGenre', params) if cached is not None: return cached response = self._makeRequest('getSongsByGenre', params) songsData = response.get('songsByGenre', {}) result = [Song.fromDict(s) for s in songsData.get('song', [])] self._setCache('getSongsByGenre', params, result) return result def getRandomSongs(self, size: int = 10, genre: str = None, fromYear: int = None, toYear: int = None) -> List[Song]: """ Get random songs Args: size: Number of songs to return genre: Optional genre filter fromYear: Optional start year filter toYear: Optional end year filter Returns: List of Song objects """ params = {'size': size} if genre: params['genre'] = genre if fromYear: params['fromYear'] = fromYear if toYear: params['toYear'] = toYear response = self._makeRequest('getRandomSongs', params) songsData = response.get('randomSongs', {}) return [Song.fromDict(s) for s in songsData.get('song', [])] # ==================== Search ==================== def search(self, query: str, artistCount: int = 20, albumCount: int = 20, songCount: int = 20) -> Dict[str, List]: """ Search for artists, albums, and songs Args: query: Search query artistCount: Max artists to return albumCount: Max albums to return songCount: Max songs to return Returns: dict with 'artists', 'albums', 'songs' lists """ params = { 'query': query, 'artistCount': artistCount, 'albumCount': albumCount, 'songCount': songCount } cached = self._getCached('search3', params) if cached is not None: return cached response = self._makeRequest('search3', params) searchResult = response.get('searchResult3', {}) result = { 'artists': [Artist.fromDict(a) for a in searchResult.get('artist', [])], 'albums': [Album.fromDict(a) for a in searchResult.get('album', [])], 'songs': [Song.fromDict(s) for s in searchResult.get('song', [])] } self._setCache('search3', params, result) return result # ==================== Playlists ==================== def getPlaylists(self) -> List[Playlist]: """ Get all playlists Returns: List of Playlist objects """ cached = self._getCached('getPlaylists', None) if cached is not None: return cached response = self._makeRequest('getPlaylists') playlistsData = response.get('playlists', {}) result = [Playlist.fromDict(p) for p in playlistsData.get('playlist', [])] self._setCache('getPlaylists', None, result) return result def getPlaylist(self, playlistId: str) -> Playlist: """ Get playlist details including songs Args: playlistId: ID of the playlist Returns: Playlist object with songs populated """ params = {'id': playlistId} cached = self._getCached('getPlaylist', params) if cached is not None: return cached response = self._makeRequest('getPlaylist', params) result = Playlist.fromDict(response.get('playlist', {})) self._setCache('getPlaylist', params, result) return result def createPlaylist(self, name: str, songIds: List[str] = None) -> Playlist: """ Create a new playlist Args: name: Playlist name songIds: Optional list of song IDs to add Returns: Created Playlist object """ params = {'name': name} if songIds: params['songId'] = songIds response = self._makeRequest('createPlaylist', params) self._invalidatePlaylistCache() return Playlist.fromDict(response.get('playlist', {})) def updatePlaylist(self, playlistId: str, name: str = None, songIdsToAdd: List[str] = None, songIndexesToRemove: List[int] = None) -> None: """ Update a playlist Args: playlistId: ID of the playlist name: New name (optional) songIdsToAdd: Song IDs to add (optional) songIndexesToRemove: Song indexes to remove (optional) """ params = {'playlistId': playlistId} if name: params['name'] = name if songIdsToAdd: params['songIdToAdd'] = songIdsToAdd if songIndexesToRemove: params['songIndexToRemove'] = songIndexesToRemove self._makeRequest('updatePlaylist', params) self._invalidatePlaylistCache(playlistId) def deletePlaylist(self, playlistId: str) -> None: """ Delete a playlist Args: playlistId: ID of the playlist to delete """ self._makeRequest('deletePlaylist', {'id': playlistId}) self._invalidatePlaylistCache(playlistId) def _invalidatePlaylistCache(self, playlistId: str = None) -> None: """Invalidate playlist-related cache entries.""" listKey = self._cacheKey('getPlaylists', None) self._cache.pop(listKey, None) if playlistId: detailKey = self._cacheKey('getPlaylist', {'id': playlistId}) self._cache.pop(detailKey, None) # ==================== User Actions ==================== def star(self, itemId: str, itemType: str = 'song') -> None: """ Star (favorite) an item Args: itemId: ID of the item itemType: Type of item ('song', 'album', 'artist') """ paramName = { 'song': 'id', 'album': 'albumId', 'artist': 'artistId' }.get(itemType, 'id') self._makeRequest('star', {paramName: itemId}) self._invalidateStarredCache() def unstar(self, itemId: str, itemType: str = 'song') -> None: """ Unstar (unfavorite) an item Args: itemId: ID of the item itemType: Type of item ('song', 'album', 'artist') """ paramName = { 'song': 'id', 'album': 'albumId', 'artist': 'artistId' }.get(itemType, 'id') self._makeRequest('unstar', {paramName: itemId}) self._invalidateStarredCache() def _invalidateStarredCache(self) -> None: """Invalidate the starred/favorites cache.""" key = self._cacheKey('getStarred2', None) self._cache.pop(key, None) def setRating(self, itemId: str, rating: int) -> None: """ Set rating for an item (1-5 stars, 0 to remove) Args: itemId: ID of the item rating: Rating value (0-5) """ self._makeRequest('setRating', {'id': itemId, 'rating': rating}) def scrobble(self, songId: str, submission: bool = True) -> None: """ Scrobble a song (register as played) Args: songId: ID of the song submission: True for full scrobble, False for "now playing" """ self._makeRequest('scrobble', {'id': songId, 'submission': submission}) # ==================== Play Queue ==================== def getPlayQueue(self) -> Dict[str, Any]: """ Get the saved play queue Returns: dict with 'songs' (List[Song]), 'current' (str), 'position' (int) """ response = self._makeRequest('getPlayQueue') queueData = response.get('playQueue', {}) return { 'songs': [Song.fromDict(s) for s in queueData.get('entry', [])], 'current': queueData.get('current'), 'position': queueData.get('position', 0) } def savePlayQueue(self, songIds: List[str], current: str = None, position: int = None) -> None: """ Save the current play queue Args: songIds: List of song IDs in the queue current: ID of currently playing song position: Position in current song (milliseconds) """ params = {'id': songIds} if current: params['current'] = current if position is not None: params['position'] = position self._makeRequest('savePlayQueue', params) # ==================== Starred/Favorites ==================== def getStarred(self) -> Dict[str, List]: """ Get all starred items Returns: dict with 'artists', 'albums', 'songs' lists """ cached = self._getCached('getStarred2', None) if cached is not None: return cached response = self._makeRequest('getStarred2') starredData = response.get('starred2', {}) result = { 'artists': [Artist.fromDict(a) for a in starredData.get('artist', [])], 'albums': [Album.fromDict(a) for a in starredData.get('album', [])], 'songs': [Song.fromDict(s) for s in starredData.get('song', [])] } self._setCache('getStarred2', None, result) return result # ==================== Recommendations ==================== def getTopSongs(self, artist: str, count: int = 50) -> List[Song]: """ Get top songs for an artist Args: artist: Artist name count: Number of songs to return Returns: List of Song objects """ if not artist: return [] params = {'artist': artist, 'count': count} cached = self._getCached('getTopSongs', params) if cached is not None: return cached response = self._makeRequest('getTopSongs', params) topData = response.get('topSongs', {}) result = [Song.fromDict(s) for s in topData.get('song', [])] self._setCache('getTopSongs', params, result) return result def getSimilarSongs(self, songId: str, count: int = 50) -> List[Song]: """ Get songs similar to a given song Args: songId: ID of the seed song count: Number of songs to return """ if not songId: return [] params = {'id': songId, 'count': count} cached = self._getCached('getSimilarSongs2', params) if cached is not None: return cached response = self._makeRequest('getSimilarSongs2', params) data = response.get('similarSongs2', {}) result = [Song.fromDict(s) for s in data.get('song', [])] self._setCache('getSimilarSongs2', params, result) return result def getNowPlaying(self) -> List[Song]: """ Get songs currently being played by all users Returns: List of Song objects """ response = self._makeRequest('getNowPlaying') nowPlayingData = response.get('nowPlaying', {}) return [Song.fromDict(s) for s in nowPlayingData.get('entry', [])] # ==================== Lyrics ==================== def getLyrics(self, artist: str = None, title: str = None) -> Optional[str]: """ Get lyrics for a song Args: artist: Artist name title: Song title Returns: Lyrics text or None if not found """ params = {} if artist: params['artist'] = artist if title: params['title'] = title response = self._makeRequest('getLyrics', params) lyricsData = response.get('lyrics', {}) return lyricsData.get('value') # ==================== System ==================== def startScan(self) -> None: """Start a library scan""" self._makeRequest('startScan') def getScanStatus(self) -> Dict[str, Any]: """ Get library scan status Returns: dict with 'scanning' (bool) and 'count' (int) """ response = self._makeRequest('getScanStatus') scanData = response.get('scanStatus', {}) return { 'scanning': scanData.get('scanning', False), 'count': scanData.get('count', 0) }