Initial commit

This commit is contained in:
Storm Dragon
2025-12-15 04:09:55 -05:00
commit 555ca0bba9
26 changed files with 4532 additions and 0 deletions

562
src/api/client.py Normal file
View File

@@ -0,0 +1,562 @@
"""
Subsonic API client for Navidrome communication
"""
from typing import Optional, List, Dict, Any
from urllib.parse import urlencode
import requests
from .auth import SubsonicAuth
from .models import Artist, Album, Song, Playlist, Genre
class SubsonicError(Exception):
"""Subsonic API error"""
ERROR_CODES = {
0: "Generic error",
10: "Required parameter missing",
20: "Incompatible client version",
30: "Incompatible server version",
40: "Wrong username or password",
41: "Token authentication not supported",
50: "User not authorized",
60: "Trial expired",
70: "Data not found"
}
def __init__(self, code: int, message: str):
self.code = code
self.message = message
super().__init__(f"[{code}] {message}")
class SubsonicClient:
"""Subsonic API client for Navidrome communication"""
API_VERSION = '1.16.1'
CLIENT_NAME = 'NaviPy'
def __init__(self, serverUrl: str, username: str, password: str):
"""
Initialize the Subsonic client
Args:
serverUrl: Base URL of the Navidrome server (e.g., https://music.example.com)
username: Navidrome username
password: Navidrome password
"""
self.serverUrl = serverUrl.rstrip('/')
self.username = username
self.password = password
self.session = requests.Session()
self.session.headers.update({
'User-Agent': f'{self.CLIENT_NAME}/1.0'
})
def _makeRequest(self, endpoint: str, params: dict = None) -> dict:
"""
Make authenticated API request
Args:
endpoint: API endpoint (e.g., 'ping', 'getArtists')
params: Additional parameters for the request
Returns:
dict: Parsed API response
Raises:
SubsonicError: If the API returns an error
requests.RequestException: If the network request fails
"""
salt = SubsonicAuth.generateSalt()
token = SubsonicAuth.generateToken(self.password, salt)
baseParams = {
'u': self.username,
't': token,
's': salt,
'v': self.API_VERSION,
'c': self.CLIENT_NAME,
'f': 'json'
}
if params:
baseParams.update(params)
url = f"{self.serverUrl}/rest/{endpoint}"
response = self.session.get(url, params=baseParams, timeout=30)
response.raise_for_status()
data = response.json()
# Subsonic wraps responses in 'subsonic-response'
subsonicResponse = data.get('subsonic-response', {})
if subsonicResponse.get('status') == 'failed':
error = subsonicResponse.get('error', {})
raise SubsonicError(
code=error.get('code', 0),
message=error.get('message', 'Unknown error')
)
return subsonicResponse
def ping(self) -> bool:
"""
Test connection to the server
Returns:
bool: True if connection successful
"""
response = self._makeRequest('ping')
return response.get('status') == 'ok'
def getStreamUrl(self, songId: str, format: str = None, maxBitRate: int = None) -> str:
"""
Generate streaming URL for a song
Args:
songId: ID of the song to stream
format: Optional transcoding format (mp3, opus, etc.)
maxBitRate: Optional max bitrate for transcoding
Returns:
str: Full streaming URL with authentication
"""
salt = SubsonicAuth.generateSalt()
token = SubsonicAuth.generateToken(self.password, salt)
params = {
'u': self.username,
't': token,
's': salt,
'v': self.API_VERSION,
'c': self.CLIENT_NAME,
'id': songId
}
if format:
params['format'] = format
if maxBitRate:
params['maxBitRate'] = maxBitRate
return f"{self.serverUrl}/rest/stream?{urlencode(params)}"
def getCoverArtUrl(self, coverId: str, size: int = None) -> str:
"""
Generate cover art URL
Args:
coverId: Cover art ID
size: Optional size in pixels
Returns:
str: Full cover art URL with authentication
"""
salt = SubsonicAuth.generateSalt()
token = SubsonicAuth.generateToken(self.password, salt)
params = {
'u': self.username,
't': token,
's': salt,
'v': self.API_VERSION,
'c': self.CLIENT_NAME,
'id': coverId
}
if size:
params['size'] = size
return f"{self.serverUrl}/rest/getCoverArt?{urlencode(params)}"
# ==================== Browsing ====================
def getArtists(self, musicFolderId: str = None) -> List[Artist]:
"""
Get all artists
Args:
musicFolderId: Optional music folder to filter by
Returns:
List of Artist objects
"""
params = {}
if musicFolderId:
params['musicFolderId'] = musicFolderId
response = self._makeRequest('getArtists', params)
artists = []
artistsData = response.get('artists', {})
for index in artistsData.get('index', []):
for artistData in index.get('artist', []):
artists.append(Artist.fromDict(artistData))
return artists
def getArtist(self, artistId: str) -> Dict[str, Any]:
"""
Get artist details including albums
Args:
artistId: ID of the artist
Returns:
dict with 'artist' (Artist) and 'albums' (List[Album])
"""
response = self._makeRequest('getArtist', {'id': artistId})
artistData = response.get('artist', {})
artist = Artist.fromDict(artistData)
albums = [Album.fromDict(a) for a in artistData.get('album', [])]
return {'artist': artist, 'albums': albums}
def getAlbum(self, albumId: str) -> Dict[str, Any]:
"""
Get album details including songs
Args:
albumId: ID of the album
Returns:
dict with 'album' (Album) and 'songs' (List[Song])
"""
response = self._makeRequest('getAlbum', {'id': albumId})
albumData = response.get('album', {})
album = Album.fromDict(albumData)
songs = [Song.fromDict(s) for s in albumData.get('song', [])]
return {'album': album, 'songs': songs}
def getSong(self, songId: str) -> Song:
"""
Get song details
Args:
songId: ID of the song
Returns:
Song object
"""
response = self._makeRequest('getSong', {'id': songId})
return Song.fromDict(response.get('song', {}))
def getGenres(self) -> List[Genre]:
"""
Get all genres
Returns:
List of Genre objects
"""
response = self._makeRequest('getGenres')
genresData = response.get('genres', {})
return [Genre.fromDict(g) for g in genresData.get('genre', [])]
def getSongsByGenre(self, genre: str, count: int = 100, offset: int = 0) -> List[Song]:
"""
Get songs by genre
Args:
genre: Genre name
count: Maximum number of songs to return
offset: Pagination offset
Returns:
List of Song objects
"""
params = {
'genre': genre,
'count': count,
'offset': offset
}
response = self._makeRequest('getSongsByGenre', params)
songsData = response.get('songsByGenre', {})
return [Song.fromDict(s) for s in songsData.get('song', [])]
def getRandomSongs(self, size: int = 10, genre: str = None,
fromYear: int = None, toYear: int = None) -> List[Song]:
"""
Get random songs
Args:
size: Number of songs to return
genre: Optional genre filter
fromYear: Optional start year filter
toYear: Optional end year filter
Returns:
List of Song objects
"""
params = {'size': size}
if genre:
params['genre'] = genre
if fromYear:
params['fromYear'] = fromYear
if toYear:
params['toYear'] = toYear
response = self._makeRequest('getRandomSongs', params)
songsData = response.get('randomSongs', {})
return [Song.fromDict(s) for s in songsData.get('song', [])]
# ==================== Search ====================
def search(self, query: str, artistCount: int = 20,
albumCount: int = 20, songCount: int = 20) -> Dict[str, List]:
"""
Search for artists, albums, and songs
Args:
query: Search query
artistCount: Max artists to return
albumCount: Max albums to return
songCount: Max songs to return
Returns:
dict with 'artists', 'albums', 'songs' lists
"""
params = {
'query': query,
'artistCount': artistCount,
'albumCount': albumCount,
'songCount': songCount
}
response = self._makeRequest('search3', params)
searchResult = response.get('searchResult3', {})
return {
'artists': [Artist.fromDict(a) for a in searchResult.get('artist', [])],
'albums': [Album.fromDict(a) for a in searchResult.get('album', [])],
'songs': [Song.fromDict(s) for s in searchResult.get('song', [])]
}
# ==================== Playlists ====================
def getPlaylists(self) -> List[Playlist]:
"""
Get all playlists
Returns:
List of Playlist objects
"""
response = self._makeRequest('getPlaylists')
playlistsData = response.get('playlists', {})
return [Playlist.fromDict(p) for p in playlistsData.get('playlist', [])]
def getPlaylist(self, playlistId: str) -> Playlist:
"""
Get playlist details including songs
Args:
playlistId: ID of the playlist
Returns:
Playlist object with songs populated
"""
response = self._makeRequest('getPlaylist', {'id': playlistId})
return Playlist.fromDict(response.get('playlist', {}))
def createPlaylist(self, name: str, songIds: List[str] = None) -> Playlist:
"""
Create a new playlist
Args:
name: Playlist name
songIds: Optional list of song IDs to add
Returns:
Created Playlist object
"""
params = {'name': name}
if songIds:
params['songId'] = songIds
response = self._makeRequest('createPlaylist', params)
return Playlist.fromDict(response.get('playlist', {}))
def updatePlaylist(self, playlistId: str, name: str = None,
songIdsToAdd: List[str] = None,
songIndexesToRemove: List[int] = None) -> None:
"""
Update a playlist
Args:
playlistId: ID of the playlist
name: New name (optional)
songIdsToAdd: Song IDs to add (optional)
songIndexesToRemove: Song indexes to remove (optional)
"""
params = {'playlistId': playlistId}
if name:
params['name'] = name
if songIdsToAdd:
params['songIdToAdd'] = songIdsToAdd
if songIndexesToRemove:
params['songIndexToRemove'] = songIndexesToRemove
self._makeRequest('updatePlaylist', params)
def deletePlaylist(self, playlistId: str) -> None:
"""
Delete a playlist
Args:
playlistId: ID of the playlist to delete
"""
self._makeRequest('deletePlaylist', {'id': playlistId})
# ==================== User Actions ====================
def star(self, itemId: str, itemType: str = 'song') -> None:
"""
Star (favorite) an item
Args:
itemId: ID of the item
itemType: Type of item ('song', 'album', 'artist')
"""
paramName = {
'song': 'id',
'album': 'albumId',
'artist': 'artistId'
}.get(itemType, 'id')
self._makeRequest('star', {paramName: itemId})
def unstar(self, itemId: str, itemType: str = 'song') -> None:
"""
Unstar (unfavorite) an item
Args:
itemId: ID of the item
itemType: Type of item ('song', 'album', 'artist')
"""
paramName = {
'song': 'id',
'album': 'albumId',
'artist': 'artistId'
}.get(itemType, 'id')
self._makeRequest('unstar', {paramName: itemId})
def setRating(self, itemId: str, rating: int) -> None:
"""
Set rating for an item (1-5 stars, 0 to remove)
Args:
itemId: ID of the item
rating: Rating value (0-5)
"""
self._makeRequest('setRating', {'id': itemId, 'rating': rating})
def scrobble(self, songId: str, submission: bool = True) -> None:
"""
Scrobble a song (register as played)
Args:
songId: ID of the song
submission: True for full scrobble, False for "now playing"
"""
self._makeRequest('scrobble', {'id': songId, 'submission': submission})
# ==================== Play Queue ====================
def getPlayQueue(self) -> Dict[str, Any]:
"""
Get the saved play queue
Returns:
dict with 'songs' (List[Song]), 'current' (str), 'position' (int)
"""
response = self._makeRequest('getPlayQueue')
queueData = response.get('playQueue', {})
return {
'songs': [Song.fromDict(s) for s in queueData.get('entry', [])],
'current': queueData.get('current'),
'position': queueData.get('position', 0)
}
def savePlayQueue(self, songIds: List[str], current: str = None,
position: int = None) -> None:
"""
Save the current play queue
Args:
songIds: List of song IDs in the queue
current: ID of currently playing song
position: Position in current song (milliseconds)
"""
params = {'id': songIds}
if current:
params['current'] = current
if position is not None:
params['position'] = position
self._makeRequest('savePlayQueue', params)
# ==================== Starred/Favorites ====================
def getStarred(self) -> Dict[str, List]:
"""
Get all starred items
Returns:
dict with 'artists', 'albums', 'songs' lists
"""
response = self._makeRequest('getStarred2')
starredData = response.get('starred2', {})
return {
'artists': [Artist.fromDict(a) for a in starredData.get('artist', [])],
'albums': [Album.fromDict(a) for a in starredData.get('album', [])],
'songs': [Song.fromDict(s) for s in starredData.get('song', [])]
}
# ==================== Lyrics ====================
def getLyrics(self, artist: str = None, title: str = None) -> Optional[str]:
"""
Get lyrics for a song
Args:
artist: Artist name
title: Song title
Returns:
Lyrics text or None if not found
"""
params = {}
if artist:
params['artist'] = artist
if title:
params['title'] = title
response = self._makeRequest('getLyrics', params)
lyricsData = response.get('lyrics', {})
return lyricsData.get('value')
# ==================== System ====================
def startScan(self) -> None:
"""Start a library scan"""
self._makeRequest('startScan')
def getScanStatus(self) -> Dict[str, Any]:
"""
Get library scan status
Returns:
dict with 'scanning' (bool) and 'count' (int)
"""
response = self._makeRequest('getScanStatus')
scanData = response.get('scanStatus', {})
return {
'scanning': scanData.get('scanning', False),
'count': scanData.get('count', 0)
}