Files
navipy/src/main_window.py

2276 lines
100 KiB
Python

"""Main application window for NaviPy."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from PySide6.QtWidgets import (
QHBoxLayout,
QLabel,
QMenu,
QListWidget,
QListWidgetItem,
QMainWindow,
QMessageBox,
QPushButton,
QSlider,
QSplitter,
QStatusBar,
QTreeWidgetItem,
QVBoxLayout,
QWidget,
)
from PySide6.QtCore import QPoint, Qt, QTimer
from PySide6.QtGui import (
QAction,
QAccessible,
QAccessibleEvent,
QAccessibleAnnouncementEvent,
QGuiApplication,
QKeySequence,
QShortcut,
)
from src.accessibility.accessible_tree import AccessibleTreeWidget
from src.api.client import SubsonicClient, SubsonicError
from src.api.models import Album, Artist, Playlist, Song, Genre
from src.cache import CachedClient
from src.config.settings import Settings, getDataDir
from src.managers.playback_manager import PlaybackManager
from src.integrations.mpris import MprisService
from src.widgets.accessible_text_dialog import AccessibleTextDialog
from src.widgets.add_to_playlist_dialog import AddToPlaylistDialog
from src.widgets.search_dialog import SearchDialog
from src.widgets.server_dialog import ServerDialog
class MainWindow(QMainWindow):
"""Main NaviPy application window"""
def __init__(self):
super().__init__()
self.setWindowTitle("NaviPy - Navidrome Client")
self.setMinimumSize(980, 640)
self.settings = Settings()
self.client: Optional[CachedClient] = None
self._rawClient: Optional[SubsonicClient] = None
self.playback = PlaybackManager(self)
self.mpris: Optional[MprisService] = None
self._executor = ThreadPoolExecutor(max_workers=4)
self._shutting_down = False
self.logger = logging.getLogger("navipy.main_window")
self.maxBulkSongs: Optional[int] = None # Unlimited; use throttling instead
self.bulkRequestBatch = 1 # Throttle every N requests to be nice to servers
self.bulkThrottleSeconds = 0.05
self.bulkChunkSize = 500
self.bulkThrottleStartAfter = 50 # Don't throttle first N requests for quick playback start
self._startupSyncInterval = timedelta(minutes=60)
self.logger.info("MainWindow initialized")
self.pageStep = self.settings.get("interface", "pageStep", 5)
self.volume = self.settings.get("playback", "volume", 100)
self.shuffleEnabled = self.settings.get("playback", "shuffle", False)
self.repeatMode = self.settings.get("playback", "repeat", "none")
self.announceTrackChangesEnabled = self.settings.get("interface", "announceTrackChanges", True)
self._announceOnTrackChange = False
# Timer for repeating status announcements during long operations
self._statusAnnounceTimer = QTimer(self)
self._statusAnnounceTimer.timeout.connect(self._announceCurrentStatus)
self._currentStatusMessage = ""
self.setupMenus()
self.setupUi()
self.connectSignals()
self.applyPersistedPlaybackSettings()
self.enablePlaybackActions(False)
self.playback.setVolume(self.volume)
self.setupMpris()
# Connect to server if one is configured
if self.settings.hasServers():
self.logger.info("Servers present, attempting default connection")
self.connectToDefaultServer()
else:
self.logger.info("No servers configured; showing setup dialog")
self.showServerSetup()
# ============ UI setup ============
def setupMenus(self):
"""Setup the menu bar"""
menuBar = self.menuBar()
menuBar.setAccessibleName("Menu Bar")
# File menu
fileMenu = menuBar.addMenu("&File")
self.connectAction = QAction("&Connect to Server...", self)
self.connectAction.setShortcut(QKeySequence("Ctrl+O"))
self.connectAction.triggered.connect(self.showServerSetup)
fileMenu.addAction(self.connectAction)
fileMenu.addSeparator()
self.refreshAction = QAction("&Refresh Library", self)
self.refreshAction.setShortcut(QKeySequence("F5"))
self.refreshAction.triggered.connect(self.refreshLibrary)
self.refreshAction.setEnabled(False)
fileMenu.addAction(self.refreshAction)
self.fullSyncAction = QAction("Full Library &Sync", self)
self.fullSyncAction.setShortcut(QKeySequence("Shift+F5"))
self.fullSyncAction.triggered.connect(self.refreshLibraryFull)
self.fullSyncAction.setEnabled(False)
fileMenu.addAction(self.fullSyncAction)
fileMenu.addSeparator()
self.quitAction = QAction("&Quit", self)
self.quitAction.setShortcut(QKeySequence("Ctrl+Q"))
self.quitAction.triggered.connect(self.close)
fileMenu.addAction(self.quitAction)
# Playback menu
playbackMenu = menuBar.addMenu("&Playback")
self.playPauseAction = QAction("&Play/Pause", self)
self.playPauseAction.setShortcut(QKeySequence("Space"))
self.playPauseAction.setEnabled(False)
playbackMenu.addAction(self.playPauseAction)
self.stopAction = QAction("&Stop", self)
self.stopAction.setEnabled(False)
playbackMenu.addAction(self.stopAction)
playbackMenu.addSeparator()
self.previousAction = QAction("Pre&vious Track", self)
self.previousAction.setShortcut(QKeySequence("Ctrl+Left"))
self.previousAction.setEnabled(False)
playbackMenu.addAction(self.previousAction)
self.nextAction = QAction("&Next Track", self)
self.nextAction.setShortcut(QKeySequence("Ctrl+Right"))
self.nextAction.setEnabled(False)
playbackMenu.addAction(self.nextAction)
playbackMenu.addSeparator()
self.shuffleAction = QAction("&Shuffle", self)
self.shuffleAction.setShortcut(QKeySequence("Alt+S"))
self.shuffleAction.setCheckable(True)
playbackMenu.addAction(self.shuffleAction)
self.repeatAction = QAction("&Repeat Off", self)
self.repeatAction.setShortcut(QKeySequence("Alt+R"))
self.repeatAction.setCheckable(True)
playbackMenu.addAction(self.repeatAction)
playbackMenu.addSeparator()
self.favoriteAction = QAction("&Favorite", self)
self.favoriteAction.setShortcut(QKeySequence("+"))
self.favoriteAction.setEnabled(False)
playbackMenu.addAction(self.favoriteAction)
self.unfavoriteAction = QAction("Un&favorite", self)
self.unfavoriteAction.setShortcut(QKeySequence("_"))
self.unfavoriteAction.setEnabled(False)
playbackMenu.addAction(self.unfavoriteAction)
playbackMenu.addSeparator()
self.volumeUpAction = QAction("Volume &Up", self)
self.volumeUpAction.setShortcut(QKeySequence("Ctrl+Up"))
self.volumeUpAction.setEnabled(False)
playbackMenu.addAction(self.volumeUpAction)
self.volumeDownAction = QAction("Volume &Down", self)
self.volumeDownAction.setShortcut(QKeySequence("Ctrl+Down"))
self.volumeDownAction.setEnabled(False)
playbackMenu.addAction(self.volumeDownAction)
# View menu
viewMenu = menuBar.addMenu("&View")
self.searchAction = QAction("&Search...\tCtrl+F", self)
self.searchAction.setEnabled(False)
viewMenu.addAction(self.searchAction)
viewMenu.addSeparator()
self.announceTrackAction = QAction("Announce Current &Track", self)
self.announceTrackAction.setShortcut(QKeySequence("Ctrl+T"))
self.announceTrackAction.setEnabled(False)
viewMenu.addAction(self.announceTrackAction)
self.announcePositionAction = QAction("Announce &Position", self)
self.announcePositionAction.setShortcut(QKeySequence("Ctrl+P"))
self.announcePositionAction.setEnabled(False)
viewMenu.addAction(self.announcePositionAction)
self.announceTrackChangesAction = QAction("Announce Track &Changes", self)
self.announceTrackChangesAction.setCheckable(True)
self.announceTrackChangesAction.setChecked(self.announceTrackChangesEnabled)
viewMenu.addAction(self.announceTrackChangesAction)
# Help menu
helpMenu = menuBar.addMenu("&Help")
self.aboutAction = QAction("&About NaviPy", self)
self.aboutAction.triggered.connect(self.showAbout)
helpMenu.addAction(self.aboutAction)
def setupUi(self):
"""Setup the main UI"""
centralWidget = QWidget()
self.setCentralWidget(centralWidget)
mainLayout = QHBoxLayout(centralWidget)
# Create splitter for resizable panels
self.splitter = QSplitter(Qt.Horizontal)
self.splitter.setAccessibleName("Main Content")
# Left panel - Library browser
leftPanel = QWidget()
leftLayout = QVBoxLayout(leftPanel)
libraryLabel = QLabel("Library Browser")
libraryLabel.setAccessibleName("Library Panel")
leftLayout.addWidget(libraryLabel)
self.libraryStatus = QLabel("Connect to a server to browse your library")
self.libraryStatus.setAccessibleName("Library Status")
leftLayout.addWidget(self.libraryStatus)
self.libraryTree = AccessibleTreeWidget()
self.libraryTree.setHeaderHidden(True)
self.libraryTree.setAccessibleName("Library Tree")
self.libraryTree.setRootIsDecorated(True)
self.libraryTree.setItemsExpandable(True)
self.libraryTree.setContextMenuPolicy(Qt.CustomContextMenu)
self.libraryTree.pageStep = self.pageStep
self.libraryTree.setEnabled(False)
leftLayout.addWidget(self.libraryTree)
self.splitter.addWidget(leftPanel)
# Right panel - Now playing and queue
rightPanel = QWidget()
rightLayout = QVBoxLayout(rightPanel)
nowPlayingLabel = QLabel("Now Playing")
nowPlayingLabel.setAccessibleName("Now Playing Section")
rightLayout.addWidget(nowPlayingLabel)
self.nowPlayingInfo = QLabel("No track playing")
self.nowPlayingInfo.setAccessibleName("Current Track")
rightLayout.addWidget(self.nowPlayingInfo)
self.positionLabel = QLabel("00:00 / 00:00")
self.positionLabel.setAccessibleName("Playback Position")
rightLayout.addWidget(self.positionLabel)
self.positionSlider = QSlider(Qt.Horizontal)
self.positionSlider.setAccessibleName("Seek Slider")
self.positionSlider.setRange(0, 1000)
self.positionSlider.setEnabled(False)
rightLayout.addWidget(self.positionSlider)
controlsLayout = QHBoxLayout()
self.prevButton = QPushButton("Previous")
self.prevButton.setAccessibleName("Previous Track")
self.prevButton.setEnabled(False)
controlsLayout.addWidget(self.prevButton)
self.playButton = QPushButton("Play/Pause")
self.playButton.setAccessibleName("Play or Pause")
self.playButton.setEnabled(False)
controlsLayout.addWidget(self.playButton)
self.stopButton = QPushButton("Stop")
self.stopButton.setAccessibleName("Stop Playback")
self.stopButton.setEnabled(False)
controlsLayout.addWidget(self.stopButton)
self.nextButton = QPushButton("Next")
self.nextButton.setAccessibleName("Next Track")
self.nextButton.setEnabled(False)
controlsLayout.addWidget(self.nextButton)
rightLayout.addLayout(controlsLayout)
queueLabel = QLabel("Play Queue")
queueLabel.setAccessibleName("Queue Section")
rightLayout.addWidget(queueLabel)
self.queueList = QListWidget()
self.queueList.setAccessibleName("Queue List")
self.queueList.setSelectionMode(QListWidget.SingleSelection)
self.queueList.setEnabled(False)
rightLayout.addWidget(self.queueList)
queueButtons = QHBoxLayout()
self.removeQueueButton = QPushButton("Remove Selected")
self.removeQueueButton.setAccessibleName("Remove Selected Track from Queue")
self.removeQueueButton.setEnabled(False)
queueButtons.addWidget(self.removeQueueButton)
self.clearQueueButton = QPushButton("Clear Queue")
self.clearQueueButton.setAccessibleName("Clear Queue")
self.clearQueueButton.setEnabled(False)
queueButtons.addWidget(self.clearQueueButton)
rightLayout.addLayout(queueButtons)
rightLayout.addStretch()
self.splitter.addWidget(rightPanel)
self.splitter.setSizes([560, 420])
mainLayout.addWidget(self.splitter)
# Status bar
self.statusBar = QStatusBar()
self.setStatusBar(self.statusBar)
self.connectionStatus = QLabel("Not connected")
self.connectionStatus.setAccessibleName("Connection Status")
self.statusBar.addPermanentWidget(self.connectionStatus)
self.playbackStatus = QLabel("Stopped")
self.playbackStatus.setAccessibleName("Playback Status")
self.statusBar.addPermanentWidget(self.playbackStatus)
self.volumeStatus = QLabel(f"Volume: {self.volume}%")
self.volumeStatus.setAccessibleName("Volume Status")
self.statusBar.addPermanentWidget(self.volumeStatus)
self.liveRegion = QLabel("")
self.liveRegion.setAccessibleName("Live Region")
self.liveRegion.setAccessibleDescription("Announcements")
self.liveRegion.setStyleSheet("color: transparent;")
self.statusBar.addPermanentWidget(self.liveRegion, 1)
def connectSignals(self):
"""Connect signals after widgets exist."""
self.libraryTree.itemExpanded.connect(self.onLibraryExpanded)
self.libraryTree.itemActivated.connect(self.onLibraryActivated)
self.libraryTree.customContextMenuRequested.connect(self.onLibraryContextMenu)
self.playButton.clicked.connect(self.playPauseAction.trigger)
self.prevButton.clicked.connect(self.previousAction.trigger)
self.nextButton.clicked.connect(self.nextAction.trigger)
self.stopButton.clicked.connect(self.stopAction.trigger)
self.playPauseAction.triggered.connect(self.playback.togglePlayPause)
self.stopAction.triggered.connect(self.playback.stop)
self.previousAction.triggered.connect(self.handlePreviousShortcut)
self.nextAction.triggered.connect(self.handleNextShortcut)
self.shuffleAction.toggled.connect(self.onShuffleToggled)
self.repeatAction.toggled.connect(self.onRepeatToggled)
self.announceTrackChangesAction.toggled.connect(self.onAnnounceTrackChangesToggled)
self.volumeUpAction.triggered.connect(lambda: self.adjustVolume(5))
self.volumeDownAction.triggered.connect(lambda: self.adjustVolume(-5))
self.favoriteAction.triggered.connect(self.favoriteSelection)
self.unfavoriteAction.triggered.connect(self.unfavoriteSelection)
self.searchAction.triggered.connect(self.openSearchDialog)
self.positionSlider.sliderMoved.connect(self.onSeekRequested)
self.queueList.itemActivated.connect(self.onQueueItemActivated)
self.removeQueueButton.clicked.connect(self.removeSelectedFromQueue)
self.clearQueueButton.clicked.connect(self.clearQueue)
self.announceTrackAction.triggered.connect(self.announceTrack)
self.announcePositionAction.triggered.connect(self.announcePosition)
self.announceTrackLiveShortcut = QShortcut(QKeySequence("Alt+C"), self)
self.announceTrackLiveShortcut.activated.connect(self.announceCurrentTrackLive)
# Explicit shortcut to open search dialog regardless of focus
self.searchShortcut = QShortcut(QKeySequence("Ctrl+F"), self)
self.searchShortcut.setContext(Qt.ApplicationShortcut)
self.searchShortcut.activated.connect(self.openSearchDialog)
self.interactCurrentShortcut = QShortcut(QKeySequence("Alt+I"), self)
self.interactCurrentShortcut.setContext(Qt.ApplicationShortcut)
self.interactCurrentShortcut.activated.connect(self.showCurrentMediaContextMenu)
self.previousShortcut = QShortcut(QKeySequence("Z"), self)
self.previousShortcut.activated.connect(self.handlePreviousShortcut)
self.playShortcut = QShortcut(QKeySequence("X"), self)
self.playShortcut.activated.connect(self.handlePlayShortcut)
self.pauseToggleShortcut = QShortcut(QKeySequence("C"), self)
self.pauseToggleShortcut.activated.connect(self.handlePauseToggleShortcut)
self.stopShortcut = QShortcut(QKeySequence("V"), self)
self.stopShortcut.activated.connect(self.handleStopShortcut)
self.nextShortcut = QShortcut(QKeySequence("B"), self)
self.nextShortcut.activated.connect(self.handleNextShortcut)
self.playback.trackChanged.connect(self.onTrackChanged)
self.playback.queueChanged.connect(self.onQueueChanged)
self.playback.playbackStateChanged.connect(self.onPlaybackStateChanged)
self.playback.positionChanged.connect(self.onPositionChanged)
self.playback.errorOccurred.connect(self.onPlaybackError)
def setupMpris(self):
"""Initialize the MPRIS service and wire signal updates."""
try:
self.mpris = MprisService(
playback=self.playback,
art_url_resolver=self._resolveCoverArtUrl,
raise_callback=self.bringToFront,
quit_callback=self.close,
on_volume_changed=self.setVolumeFromExternal,
on_shuffle_changed=lambda enabled: self.setShuffleFromExternal(enabled, announce=False),
on_loop_changed=lambda loop: self.setRepeatFromExternal(loop, announce=False),
)
except Exception:
self.mpris = None
return
if not self.mpris or not self.mpris.available:
self.mpris = None
return
self.playback.trackChanged.connect(self.mpris.updateMetadata)
self.playback.playbackStateChanged.connect(self.mpris.updatePlaybackStatus)
self.playback.queueChanged.connect(self._handleQueueChangedForMpris)
self.playback.errorOccurred.connect(lambda _message: self.mpris.updatePlaybackStatus("stopped"))
self.mpris.updateMetadata(self.playback.currentSong())
self.mpris.updatePlaybackStatus("stopped")
self.mpris.notifyCapabilities()
self.mpris.notifyVolumeChanged()
self.mpris.notifyShuffleChanged()
self.mpris.notifyLoopStatus()
# Additional keyboard shortcuts for quick volume changes
self.volumeUpShortcut = QShortcut(QKeySequence("0"), self)
self.volumeUpShortcut.activated.connect(lambda: self.adjustVolume(5))
self.volumeDownShortcut = QShortcut(QKeySequence("9"), self)
self.volumeDownShortcut.activated.connect(lambda: self.adjustVolume(-5))
# ============ Connection handling ============
def showServerSetup(self):
"""Show the server setup dialog"""
dialog = ServerDialog(self.settings, parent=self)
if dialog.exec():
serverName = dialog.getServerName()
self.logger.info("Server setup completed, connecting to '%s'", serverName)
self.connectToServer(serverName)
else:
self.logger.info("Server setup dialog canceled")
def connectToDefaultServer(self):
"""Connect to the default server"""
defaultServer = self.settings.getDefaultServer()
if defaultServer:
self.logger.info("Connecting to default server '%s'", defaultServer)
self.connectToServer(defaultServer)
else:
servers = self.settings.getServers()
if servers:
self.logger.info("No default server set; connecting to first configured")
self.connectToServer(list(servers.keys())[0])
else:
self.logger.warning("connectToDefaultServer called with no servers configured")
def connectToServer(self, serverName: str):
"""Connect to a specific server"""
server = self.settings.getServer(serverName)
if not server:
AccessibleTextDialog.showError(
"Connection Error",
f"Server '{serverName}' not found in configuration",
parent=self
)
return
self.connectionStatus.setText(f"Connecting to {serverName}...")
self._run_background(
"connect",
lambda: self._ping_server(server),
on_success=lambda client: self._on_connection_success(serverName, client),
on_error=lambda err: self.handleConnectionFailure(f"Failed to connect: {err}")
)
def _ping_server(self, server: Dict[str, str]) -> SubsonicClient:
"""Create client and ping server off the UI thread."""
client = SubsonicClient(
server['url'],
server['username'],
server['password']
)
self.logger.info("Pinging %s", server.get("url"))
if not client.ping():
self.logger.error("Ping failed for %s", server.get("url"))
raise SubsonicError(70, "Failed to connect to server. Please check your settings.")
self.logger.info("Ping succeeded for %s", server.get("url"))
return client
def _on_connection_success(self, serverName: str, rawClient: SubsonicClient):
"""Handle successful connection on the UI thread."""
self.logger.info("Connected to %s, starting connected flow", serverName)
self._rawClient = rawClient
# Wrap with caching layer
self.client = CachedClient(
rawClient,
getDataDir(),
onSyncProgress=self._onSyncProgress
)
self.connectionStatus.setText(f"Connected to {serverName}")
self.playback.setStreamResolver(lambda song: self.client.getStreamUrl(song.id))
self.onConnected()
self.logger.info("Connected to %s", serverName)
def handleConnectionFailure(self, message: str):
"""Handle errors when connecting to the server."""
self.connectionStatus.setText("Connection failed")
AccessibleTextDialog.showError("Connection Error", message, parent=self)
self.logger.error("Connection failed: %s", message)
def onConnected(self):
"""Called when successfully connected to a server"""
self.logger.info("onConnected: enabling actions and loading library")
self.refreshAction.setEnabled(True)
self.fullSyncAction.setEnabled(True)
self.searchAction.setEnabled(True)
self.libraryTree.setEnabled(True)
# Load from cache first if available, then sync in background
if self.client and self.client.hasCache():
self.logger.info("Cache found, loading from cache and syncing in background")
self.libraryStatus.setText("Loading from cache...")
self._startStatusAnnouncements("Loading library from cache")
self._run_background(
"load from cache",
self._fetchLibraryData,
on_success=self._applyLibraryDataAndSync,
on_error=lambda err: self._handleLibraryLoadError(str(err))
)
else:
self.logger.info("No cache found, performing full sync")
self.libraryStatus.setText("Building library cache...")
self._startStatusAnnouncements("Building library cache, please wait")
self.refreshLibraryFull()
# ============ Library browsing ============
def refreshLibrary(self):
"""Refresh the library browser with incremental sync (F5)."""
if not self.client:
return
self.refreshAction.setEnabled(False)
self.fullSyncAction.setEnabled(False)
self.searchAction.setEnabled(False)
self.libraryStatus.setText("Syncing library changes...")
self._startStatusAnnouncements("Updating library cache")
self.logger.info("Refreshing library (incremental sync)...")
self._run_background(
"incremental sync",
self._incrementalSync,
on_success=self._onSyncComplete,
on_error=lambda err: self._handleLibraryLoadError(str(err))
)
def refreshLibraryFull(self):
"""Perform a full library sync (Shift+F5)."""
if not self.client:
return
self.refreshAction.setEnabled(False)
self.fullSyncAction.setEnabled(False)
self.searchAction.setEnabled(False)
self.libraryTree.clear()
self.libraryTree.setEnabled(False)
self.libraryStatus.setText("Full sync in progress...")
self._startStatusAnnouncements("Full library sync in progress")
self.logger.info("Refreshing library (full sync)...")
self._run_background(
"full sync",
self._fullSync,
on_success=self._onSyncComplete,
on_error=lambda err: self._handleLibraryLoadError(str(err))
)
def _incrementalSync(self) -> Dict[str, int]:
"""Perform incremental sync in background."""
if not self.client:
raise SubsonicError(70, "Not connected")
return self.client.syncIncremental()
def _fullSync(self) -> Dict[str, int]:
"""Perform full sync in background."""
if not self.client:
raise SubsonicError(70, "Not connected")
return self.client.syncFull()
def _onSyncComplete(self, stats: Dict[str, int]):
"""Handle sync completion - reload library from cache."""
self.logger.info("Sync complete: %s", stats)
# Now load the UI from the updated cache
self._run_background(
"load library",
self._fetchLibraryData,
on_success=self._applyLibraryData,
on_error=lambda err: self._handleLibraryLoadError(str(err))
)
def _applyLibraryDataAndSync(self, data: Dict[str, Any]):
"""Apply library data from cache, then start background sync."""
self.logger.info("Applying library data to UI...")
self._applyLibraryData(data)
if self._shouldRunStartupIncremental():
self.logger.info("Library UI populated, starting background sync")
self.libraryStatus.setText("Syncing...")
self._run_background(
"background sync",
self._incrementalSync,
on_success=self._onBackgroundSyncComplete,
on_error=lambda err: self.logger.warning("Background sync failed: %s", err)
)
else:
self.logger.info("Skipping startup incremental sync (recently synced)")
self.libraryStatus.setText("Library ready (recently synced)")
self.announceLive("Library ready")
def _onBackgroundSyncComplete(self, stats: Dict[str, int]):
"""Handle background sync completion - refresh UI if changes detected."""
self.logger.info("Background sync complete: %s", stats)
totalChanges = sum(stats.values())
if totalChanges > 0:
self.announceLive(f"Library synced: {totalChanges} items updated")
self.libraryStatus.setText("Library ready")
def _shouldRunStartupIncremental(self) -> bool:
"""Decide whether to run incremental sync at startup based on recent checks."""
if not self.client:
return False
last = self.client.getLastSyncTime("incremental")
if not last:
return True
try:
return datetime.now() - last > self._startupSyncInterval
except Exception:
return True
def _onSyncProgress(self, stage: str, current: int, total: int):
"""Handle sync progress updates on UI thread."""
if total > 0:
message = f"Syncing {stage}: {current}/{total}"
else:
message = f"Syncing {stage}..."
self.logger.info(message)
# Update the repeating announcement message
self._currentStatusMessage = message
# Capture message by value to avoid closure issues
self._on_ui(lambda msg=message: self.libraryStatus.setText(msg))
def _startStatusAnnouncements(self, message: str):
"""Start repeating status announcements every 10 seconds."""
self._currentStatusMessage = message
self.announceLive(message)
self._statusAnnounceTimer.start(10000)
def _stopStatusAnnouncements(self):
"""Stop repeating status announcements."""
self._statusAnnounceTimer.stop()
self._currentStatusMessage = ""
def _announceCurrentStatus(self):
"""Called by timer to repeat the current status message."""
if self._currentStatusMessage:
# Clear the live region first so screen readers re-announce identical content
self.announceLive("")
self.announceLive(self._currentStatusMessage)
def _fetchLibraryData(self) -> Dict[str, Any]:
"""Fetch core library sections in the background."""
if not self.client:
raise SubsonicError(70, "Not connected")
artists = self.client.getArtists()
playlists = self.client.getPlaylists()
genres = self.client.getGenres()
favorites = self.client.getStarred()
self.logger.info(
"Fetched library data (artists=%d, playlists=%d, genres=%d, favorites songs=%d)",
len(artists),
len(playlists),
len(genres),
len(favorites.get("songs", [])) if isinstance(favorites, dict) else 0,
)
return {
"artists": artists,
"playlists": playlists,
"genres": genres,
"favorites": favorites,
}
def _applyLibraryData(self, data: Dict[str, Any]):
"""Populate the library tree with fetched data on the UI thread."""
if not self.client:
return
self.logger.info("Library data fetched; building tree")
self.libraryTree.clear()
# Root sections
self.artistsRoot = QTreeWidgetItem(["Artists"])
self.artistsRoot.setData(0, Qt.UserRole, {"type": "artists_root", "loaded": True})
self.artistsRoot.setExpanded(True)
self.libraryTree.addTopLevelItem(self.artistsRoot)
self.favoritesRoot = QTreeWidgetItem(["Favorites"])
self.favoritesRoot.setData(0, Qt.UserRole, {"type": "favorites_root", "loaded": True})
self.favoritesRoot.setExpanded(True)
self.libraryTree.addTopLevelItem(self.favoritesRoot)
self._populateFavorites(data.get("favorites", {}))
self.discoverRoot = QTreeWidgetItem(["Discover"])
self.discoverRoot.setData(0, Qt.UserRole, {"type": "discover_root", "loaded": True})
self.discoverRoot.setExpanded(True)
self.libraryTree.addTopLevelItem(self.discoverRoot)
self.loadDiscoverSections()
self.playlistsRoot = QTreeWidgetItem(["Playlists"])
self.playlistsRoot.setData(0, Qt.UserRole, {"type": "playlists_root", "loaded": True})
self.playlistsRoot.setExpanded(True)
self.libraryTree.addTopLevelItem(self.playlistsRoot)
self.genresRoot = QTreeWidgetItem(["Genres"])
self.genresRoot.setData(0, Qt.UserRole, {"type": "genres_root", "loaded": True})
self.genresRoot.setExpanded(True)
self.libraryTree.addTopLevelItem(self.genresRoot)
self._populateArtists(data.get("artists", []))
self._populatePlaylists(data.get("playlists", []))
self._populateGenres(data.get("genres", []))
self.libraryStatus.setText("Library ready. Use the tree to browse and press Enter to play.")
self.libraryTree.setEnabled(True)
if self.artistsRoot and self.artistsRoot.childCount() > 0:
self.libraryTree.setCurrentItem(self.artistsRoot)
self.libraryTree.setFocus(Qt.OtherFocusReason)
self.refreshAction.setEnabled(True)
self.fullSyncAction.setEnabled(True)
self.searchAction.setEnabled(True)
self._stopStatusAnnouncements()
self.announceLive("Library ready")
self.logger.info("Library ready with sections: artists=%d playlists=%d genres=%d",
self.artistsRoot.childCount(),
self.playlistsRoot.childCount(),
self.genresRoot.childCount())
if all(root.childCount() == 0 for root in (self.artistsRoot, self.playlistsRoot, self.genresRoot)):
self.libraryStatus.setText("No library data returned from server.")
def _handleLibraryLoadError(self, message: str):
"""Show error when library fails to load."""
self._stopStatusAnnouncements()
AccessibleTextDialog.showError("Error", f"Failed to load library: {message}", parent=self)
self.libraryStatus.setText("Library failed to load.")
self.refreshAction.setEnabled(True)
self.fullSyncAction.setEnabled(True)
self.searchAction.setEnabled(bool(self.client))
self.libraryTree.setEnabled(True)
self.logger.error("Library load failed: %s", message)
def _populateArtists(self, artists: List[Artist]):
"""Populate top-level artists from fetched data."""
for artist in artists:
item = QTreeWidgetItem([artist.name])
description = f"{artist.albumCount} albums" if artist.albumCount else "Artist"
item.setData(0, Qt.UserRole, {"type": "artist", "id": artist.id, "artist": artist, "loaded": False})
item.setData(0, Qt.AccessibleTextRole, f"{artist.name}, {description}")
item.addChild(QTreeWidgetItem(["Loading albums..."]))
self.artistsRoot.addChild(item)
def _populatePlaylists(self, playlists: List[Playlist]):
"""Populate top-level playlists from fetched data."""
for playlist in playlists:
label = f"{playlist.name} ({playlist.songCount} tracks)"
item = QTreeWidgetItem([label])
item.setData(0, Qt.UserRole, {"type": "playlist", "id": playlist.id, "playlist": playlist, "loaded": False})
item.setData(0, Qt.AccessibleTextRole, label)
item.addChild(QTreeWidgetItem(["Loading tracks..."]))
self.playlistsRoot.addChild(item)
def _populateGenres(self, genres: List[Genre]):
"""Populate top-level genres from fetched data."""
for genre in genres:
label = f"{genre.name} ({genre.songCount} songs)"
item = QTreeWidgetItem([label])
item.setData(0, Qt.UserRole, {"type": "genre", "name": genre.name, "loaded": False})
item.setData(0, Qt.AccessibleTextRole, label)
item.addChild(QTreeWidgetItem(["Loading songs..."]))
self.genresRoot.addChild(item)
def _populateFavorites(self, starred: Dict[str, List[Song]]):
"""Populate favorites with pre-fetched starred songs."""
songs: List[Song] = starred.get("songs", []) if starred else []
self.favoritesRoot.takeChildren()
if not songs:
self.favoritesRoot.addChild(QTreeWidgetItem(["No favorites yet."]))
return
for song in songs:
label = self.formatSongLabel(song, includeAlbum=True)
item = QTreeWidgetItem([label])
item.setData(0, Qt.UserRole, {"type": "song", "song": song})
item.setData(0, Qt.AccessibleTextRole, label)
self.favoritesRoot.addChild(item)
def loadDiscoverSections(self):
"""Create discover sections for album lists and recommendations."""
sections = [
("Recently Added", "newest"),
("Frequently Played", "frequent"),
("Recently Played", "recent"),
("Random Albums", "random"),
]
for label, listType in sections:
item = QTreeWidgetItem([label])
item.setData(0, Qt.UserRole, {"type": "album_list", "listType": listType, "loaded": False})
item.setData(0, Qt.AccessibleTextRole, label)
item.addChild(QTreeWidgetItem(["Loading albums..."]))
self.discoverRoot.addChild(item)
nowPlaying = QTreeWidgetItem(["Now Playing (server)"])
nowPlaying.setData(0, Qt.UserRole, {"type": "now_playing", "loaded": False})
nowPlaying.setData(0, Qt.AccessibleTextRole, "Now Playing across users")
nowPlaying.addChild(QTreeWidgetItem(["Loading songs..."]))
self.discoverRoot.addChild(nowPlaying)
similarCurrent = QTreeWidgetItem(["Similar to Current Track"])
similarCurrent.setData(0, Qt.UserRole, {"type": "similar_current", "loaded": False})
similarCurrent.setData(0, Qt.AccessibleTextRole, "Songs similar to the current track")
similarCurrent.addChild(QTreeWidgetItem(["Load to view similar tracks..."]))
self.discoverRoot.addChild(similarCurrent)
def onLibraryExpanded(self, item: QTreeWidgetItem):
"""Lazy-load children when an item is expanded."""
data: Dict = item.data(0, Qt.UserRole) or {}
if data.get("loaded"):
return
itemType = data.get("type")
self._setLoadingPlaceholder(item, "Loading...")
if itemType == "artist":
self._run_background(
"load artist albums",
lambda: self._fetchArtistAlbums(data.get("id")),
on_success=lambda albums: self._applyArtistAlbums(item, albums),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
elif itemType == "album":
self._run_background(
"load album songs",
lambda: self._fetchAlbumSongs(data.get("id")),
on_success=lambda songs: self._applyAlbumSongs(item, songs),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
elif itemType == "playlist":
self._run_background(
"load playlist songs",
lambda: self._fetchPlaylistSongs(data.get("id")),
on_success=lambda playlist: self._applyPlaylistSongs(item, playlist),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
elif itemType == "genre":
self._run_background(
"load genre songs",
lambda: self._fetchGenreSongs(data.get("name")),
on_success=lambda songs: self._applyGenreSongs(item, songs),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
elif itemType == "album_list":
self._run_background(
"load discover section",
lambda: self._fetchAlbumList(data.get("listType", "newest")),
on_success=lambda albums: self._applyAlbumListSection(item, albums),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
elif itemType == "now_playing":
self._run_background(
"load now playing",
self._fetchNowPlaying,
on_success=lambda songs: self._applyNowPlaying(item, songs),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
elif itemType == "similar_current":
self._run_background(
"load similar songs",
self._fetchSimilarToCurrent,
on_success=lambda songs: self._applySimilarToCurrent(item, songs),
on_error=lambda err: self._handleItemLoadError(item, str(err))
)
def onLibraryActivated(self, item: QTreeWidgetItem):
"""Handle activation (Enter/Return) in the library tree."""
data: Dict = item.data(0, Qt.UserRole) or {}
itemType = data.get("type")
if itemType == "song":
song: Song = data["song"]
self.playback.enqueue(song, playNow=True)
self.statusBar.showMessage(f"Playing {song.title} by {song.artist}", 4000)
self.enablePlaybackActions(True)
elif itemType == "playlist":
self.playPlaylist(data.get("id"), item)
elif itemType == "album":
self.playAlbum(data.get("id"), item)
elif itemType == "artist":
self.playArtist(data.get("id"), item)
elif itemType == "artists_root":
self.playAllArtists()
elif itemType == "favorites_root":
self.playFavorites()
elif itemType == "genre":
self.playGenre(data.get("name"), item)
elif itemType in ("album_list", "now_playing", "similar_current"):
item.setExpanded(not item.isExpanded())
else:
# Toggle expansion for other nodes
item.setExpanded(not item.isExpanded())
def onLibraryContextMenu(self, position: QPoint):
"""Show a context menu for the library tree (mouse or keyboard)."""
item = self.libraryTree.itemAt(position)
if not item:
return
# Ensure actions target the item the user invoked the menu on
self.libraryTree.setCurrentItem(item)
data: Dict = item.data(0, Qt.UserRole) or {}
itemType = data.get("type")
hasClient = self.client is not None
menu = QMenu(self)
menu.setAccessibleName("Library Context Menu")
if itemType == "song":
song: Optional[Song] = data.get("song")
menu.addAction("Play", lambda _=False, song=song: self.playback.enqueue(song, playNow=True) if song else None).setEnabled(hasClient and song is not None)
menu.addAction("Add to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient and song is not None)
menu.addAction("Add to Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient and song is not None)
menu.addSeparator()
menu.addAction("Favorite", self.favoriteSelection).setEnabled(hasClient)
menu.addAction("Unfavorite", self.unfavoriteSelection).setEnabled(hasClient)
elif itemType == "album":
menu.addAction("Play Album", lambda _=False, item=item, data=data: self.playAlbum(data.get("id"), item)).setEnabled(hasClient)
menu.addAction("Add Album to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient)
menu.addAction("Add Album to Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient)
menu.addSeparator()
menu.addAction("Favorite Album", self.favoriteSelection).setEnabled(hasClient)
menu.addAction("Unfavorite Album", self.unfavoriteSelection).setEnabled(hasClient)
elif itemType == "artist":
menu.addAction("Play Artist", lambda _=False, item=item, data=data: self.playArtist(data.get("id"), item)).setEnabled(hasClient)
menu.addAction("Add Artist to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient)
menu.addAction("Add Artist to Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient)
menu.addSeparator()
menu.addAction("Favorite Artist", self.favoriteSelection).setEnabled(hasClient)
menu.addAction("Unfavorite Artist", self.unfavoriteSelection).setEnabled(hasClient)
elif itemType == "playlist":
menu.addAction("Play Playlist", lambda _=False, item=item, data=data: self.playPlaylist(data.get("id"), item)).setEnabled(hasClient)
menu.addAction("Add Playlist to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient)
menu.addAction("Add Playlist to Another Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient)
elif itemType == "genre":
menu.addAction("Play Genre", lambda _=False, item=item, data=data: self.playGenre(data.get("name"), item)).setEnabled(hasClient)
menu.addAction("Add Genre to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient)
menu.addAction("Add Genre to Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient)
elif itemType == "artists_root":
menu.addAction("Play All Artists", self.playAllArtists).setEnabled(hasClient)
menu.addAction("Add All Artists to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient)
menu.addAction("Add All Artists to Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient)
elif itemType == "favorites_root":
menu.addAction("Play Favorites", self.playFavorites).setEnabled(hasClient)
menu.addAction("Add Favorites to Queue", lambda _=False, item=item: self.addItemToQueue(item)).setEnabled(hasClient)
menu.addAction("Add Favorites to Playlist", lambda _=False, item=item: self.addItemToPlaylist(item)).setEnabled(hasClient)
if not menu.actions():
return
label = (item.text(0) or "").strip()
if label:
self.announceLive(f"Context menu for {label} opened.")
else:
self.announceLive("Context menu opened.")
globalPos = self.libraryTree.viewport().mapToGlobal(position)
menu.exec(globalPos)
def showCurrentMediaContextMenu(self):
"""Open a context menu for the currently playing song (Alt+I)."""
currentSong = self.playback.currentSong()
if currentSong:
row = self.playback.currentIndex
if row is not None and 0 <= row < len(self.playback.queue):
self.queueList.setCurrentRow(row)
self._showQueueSongContextMenu(row, announce=True)
return
self._showSongContextMenu(currentSong, self.queueList.viewport(), announce=True)
return
item = self.libraryTree.currentItem()
if item:
rect = self.libraryTree.visualItemRect(item)
if rect.isValid():
self.announceLive(f"Context menu for {item.text(0)} opened.")
self.onLibraryContextMenu(rect.center())
return
row = self.queueList.currentRow()
if 0 <= row < len(self.playback.queue):
self._showQueueSongContextMenu(row, announce=True)
return
self.announceLive("No track to interact with.")
def _showQueueSongContextMenu(self, row: int, announce: bool = False):
"""Show a song context menu for a queue row."""
if not (0 <= row < len(self.playback.queue)):
return
song = self.playback.queue[row]
item = self.queueList.item(row)
rect = self.queueList.visualItemRect(item) if item else None
anchor = rect.center() if rect and rect.isValid() else self.queueList.viewport().rect().center()
self._showSongContextMenu(song, self.queueList.viewport(), anchor, queueRow=row, announce=announce)
def _showSongContextMenu(self, song: Song, anchorWidget: QWidget, anchor: Optional[QPoint] = None, queueRow: Optional[int] = None, announce: bool = False):
"""Show a context menu with song actions at the provided anchor."""
menu = QMenu(self)
menu.setAccessibleName("Song Context Menu")
hasClient = self.client is not None
menu.addAction("Play", lambda _=False, row=queueRow, s=song: self._playSongFromMenu(s, row)).setEnabled(hasClient)
menu.addAction("Add to Queue", lambda _=False, s=song: self.playback.enqueue(s, playNow=False)).setEnabled(hasClient)
menu.addAction("Add to Playlist", lambda _=False, s=song: self._addSongToPlaylistDirect(s)).setEnabled(hasClient)
menu.addSeparator()
menu.addAction("Favorite", lambda _=False, s=song: self._favoriteSongDirect(s)).setEnabled(hasClient)
menu.addAction("Unfavorite", lambda _=False, s=song: self._unfavoriteSongDirect(s)).setEnabled(hasClient)
if not menu.actions():
return
anchor = anchor or anchorWidget.rect().center()
globalPos = anchorWidget.mapToGlobal(anchor)
if announce:
label = self.buildTrackAnnouncement(song)
self.announceLive(f"Context menu for {label} opened.")
menu.exec(globalPos)
def _playSongFromMenu(self, song: Song, queueRow: Optional[int]):
"""Play a song, reusing queue position when available."""
if queueRow is not None and 0 <= queueRow < len(self.playback.queue):
self.playback.playIndex(queueRow)
return
self.playback.enqueue(song, playNow=True)
def _addSongToPlaylistDirect(self, song: Song):
"""Open the playlist dialog for a single song."""
if not self.client:
self.announceLive("Not connected to a server.")
return
label = self.formatSongLabel(song, includeAlbum=True)
self._collectPlaylistsAndShowDialog([song], label, truncated=False)
def _favoriteSongDirect(self, song: Song):
"""Star a song without relying on tree selection."""
if not self.client:
self.announceLive("Not connected to a server.")
return
label = self.buildTrackAnnouncement(song)
try:
self.client.star(song.id, "song")
message = self.buildFavoriteAnnouncement("favorite", song, label)
self.statusBar.showMessage(message, 3000)
self.announceLive(message)
except Exception as exc: # noqa: BLE001
self.announceLive(self.buildFavoriteFailureAnnouncement("favorite", song, label))
AccessibleTextDialog.showError("Favorite Failed", str(exc), parent=self)
def _unfavoriteSongDirect(self, song: Song):
"""Unstar a song without relying on tree selection."""
if not self.client:
self.announceLive("Not connected to a server.")
return
label = self.buildTrackAnnouncement(song)
try:
self.client.unstar(song.id, "song")
message = self.buildFavoriteAnnouncement("unfavorite", song, label)
self.statusBar.showMessage(message, 3000)
self.announceLive(message)
except Exception as exc: # noqa: BLE001
self.announceLive(self.buildFavoriteFailureAnnouncement("unfavorite", song, label))
AccessibleTextDialog.showError("Unfavorite Failed", str(exc), parent=self)
def _confirmBulkAction(self, action: str, label: str, count: Optional[int] = None, truncated: bool = False) -> bool:
"""Confirm bulk-affecting actions like favoriting many tracks."""
target = label or "selection"
countText = f"{count} " if count else ""
plural = "s" if count and count != 1 else ""
truncatedText = " (first batch shown)" if truncated else ""
message = f"This will {action} {countText}track{plural} from {target}{truncatedText}. Continue?"
response = QMessageBox.question(
self,
"Confirm action",
message,
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
return response == QMessageBox.Yes
def _fetchArtistAlbums(self, artistId: str) -> List[Album]:
"""Fetch albums for a given artist off the UI thread."""
result = self.client.getArtist(artistId)
return result.get("albums", [])
def _applyArtistAlbums(self, parentItem: QTreeWidgetItem, albums: List[Album]):
"""Add albums for a given artist on the UI thread."""
parentItem.takeChildren()
for album in albums:
label = f"{album.name} ({album.songCount} tracks)"
albumItem = QTreeWidgetItem([label])
albumItem.setData(0, Qt.UserRole, {"type": "album", "id": album.id, "album": album, "loaded": False})
albumItem.setData(0, Qt.AccessibleTextRole, label)
albumItem.addChild(QTreeWidgetItem(["Loading tracks..."]))
parentItem.addChild(albumItem)
self._markItemLoaded(parentItem)
def _fetchAlbumSongs(self, albumId: str) -> List[Song]:
"""Fetch songs for a given album off the UI thread."""
result = self.client.getAlbum(albumId)
return result.get("songs", [])
def _applyAlbumSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
"""Add songs for a given album on the UI thread."""
parentItem.takeChildren()
for song in songs:
label = self.formatSongLabel(song)
songItem = QTreeWidgetItem([label])
songItem.setData(0, Qt.UserRole, {"type": "song", "song": song})
songItem.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(songItem)
self._markItemLoaded(parentItem)
def _fetchPlaylistSongs(self, playlistId: str) -> Playlist:
"""Fetch playlist with songs off the UI thread."""
return self.client.getPlaylist(playlistId)
def _applyPlaylistSongs(self, parentItem: QTreeWidgetItem, playlist: Playlist):
"""Add songs for a playlist on the UI thread."""
parentItem.takeChildren()
for song in playlist.songs:
label = self.formatSongLabel(song, includeAlbum=True)
songItem = QTreeWidgetItem([label])
songItem.setData(0, Qt.UserRole, {"type": "song", "song": song})
songItem.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(songItem)
parentItem.setData(0, Qt.UserRole, {"type": "playlist", "id": playlist.id, "playlist": playlist, "loaded": True})
def _fetchGenreSongs(self, genreName: str) -> List[Song]:
"""Fetch songs for a genre off the UI thread."""
return self.client.getSongsByGenre(genreName, count=200)
def _applyGenreSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
"""Add songs for a genre on the UI thread."""
parentItem.takeChildren()
for song in songs:
label = self.formatSongLabel(song, includeAlbum=True)
songItem = QTreeWidgetItem([label])
songItem.setData(0, Qt.UserRole, {"type": "song", "song": song})
songItem.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(songItem)
self._markItemLoaded(parentItem)
def _fetchAlbumList(self, listType: str) -> List[Album]:
"""Fetch album list for discover section off the UI thread."""
return self.client.getAlbumList(listType, size=40)
def _applyAlbumListSection(self, parentItem: QTreeWidgetItem, albums: List[Album]):
"""Populate a discover album list section on the UI thread."""
parentItem.takeChildren()
for album in albums:
label = f"{album.name}{album.artist} ({album.songCount} tracks)"
albumItem = QTreeWidgetItem([label])
albumItem.setData(0, Qt.UserRole, {"type": "album", "id": album.id, "album": album, "loaded": False})
albumItem.setData(0, Qt.AccessibleTextRole, label)
albumItem.addChild(QTreeWidgetItem(["Loading tracks..."]))
parentItem.addChild(albumItem)
self._markItemLoaded(parentItem)
def _fetchNowPlaying(self) -> List[Song]:
"""Fetch songs currently playing across users off the UI thread."""
return self.client.getNowPlaying()
def _applyNowPlaying(self, parentItem: QTreeWidgetItem, songs: List[Song]):
"""Populate songs currently playing across users on the UI thread."""
parentItem.takeChildren()
if not songs:
parentItem.addChild(QTreeWidgetItem(["Nothing is currently playing."]))
return
for song in songs:
label = self.formatSongLabel(song, includeAlbum=True)
songItem = QTreeWidgetItem([label])
songItem.setData(0, Qt.UserRole, {"type": "song", "song": song})
songItem.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(songItem)
self._markItemLoaded(parentItem)
def _fetchSimilarToCurrent(self) -> List[Song]:
"""Fetch songs similar to the current track off the UI thread."""
current = self.playback.currentSong()
if not current:
return []
return self.client.getSimilarSongs(current.id, count=40)
def _applySimilarToCurrent(self, parentItem: QTreeWidgetItem, songs: List[Song]):
"""Populate songs similar to the current track on the UI thread."""
parentItem.takeChildren()
current = self.playback.currentSong()
if not current:
parentItem.addChild(QTreeWidgetItem(["Play a track to see similar songs."]))
return
if not songs:
parentItem.addChild(QTreeWidgetItem([f"No similar tracks found for {current.title}."]))
return
for song in songs:
label = self.formatSongLabel(song, includeAlbum=True)
songItem = QTreeWidgetItem([label])
songItem.setData(0, Qt.UserRole, {"type": "song", "song": song})
songItem.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(songItem)
self._markItemLoaded(parentItem)
@staticmethod
def formatSongLabel(song: Song, includeAlbum: bool = False) -> str:
"""Create a descriptive label for a song."""
prefix = f"{song.track}. " if song.track else ""
core = f"{prefix}{song.title}"
details = f"{song.artist}"
if includeAlbum:
details = f"{details}{song.album}"
return f"{core} ({song.durationFormatted}) — {details}"
def _populateAlbumItemWithSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
"""Populate an album tree item with song children."""
parentItem.takeChildren()
for song in songs:
label = self.formatSongLabel(song)
child = QTreeWidgetItem([label])
child.setData(0, Qt.UserRole, {"type": "song", "song": song})
child.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(child)
data: Dict = parentItem.data(0, Qt.UserRole) or {}
data["loaded"] = True
parentItem.setData(0, Qt.UserRole, data)
def _populateGenreItemWithSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
"""Populate a genre item with the provided songs."""
parentItem.takeChildren()
for song in songs:
label = self.formatSongLabel(song, includeAlbum=True)
child = QTreeWidgetItem([label])
child.setData(0, Qt.UserRole, {"type": "song", "song": song})
child.setData(0, Qt.AccessibleTextRole, label)
parentItem.addChild(child)
data: Dict = parentItem.data(0, Qt.UserRole) or {}
data["loaded"] = True
parentItem.setData(0, Qt.UserRole, data)
def addItemToQueue(self, item: QTreeWidgetItem):
"""Append the item's songs to the queue without interrupting playback."""
if not self.client:
self.announceLive("Not connected to a server.")
return
data: Dict = item.data(0, Qt.UserRole) or {}
itemType = data.get("type")
if itemType == "song":
song: Optional[Song] = data.get("song")
if not song:
self.announceLive("No song selected.")
return
self.playback.enqueue(song, playNow=False)
self.statusBar.showMessage(f"Added {song.title} to the queue", 3000)
self.enablePlaybackActions(True)
return
self.statusBar.showMessage("Collecting songs...", 2000)
self._run_background(
"collect songs for queue",
lambda: self.collectSongsForItem(item),
on_success=self._onSongsCollectedForQueue,
on_error=lambda err: self._handleCollectionError(str(err))
)
def addItemToPlaylist(self, item: QTreeWidgetItem):
"""Append the item's songs to an existing or new playlist."""
if not self.client:
self.announceLive("Not connected to a server.")
return
data: Dict = item.data(0, Qt.UserRole) or {}
itemType = data.get("type")
if itemType == "song":
song: Optional[Song] = data.get("song")
songs = [song] if song else []
label = self.formatSongLabel(song, includeAlbum=True) if song else item.text(0)
self._collectPlaylistsAndShowDialog(songs, label, truncated=False)
return
self.statusBar.showMessage("Collecting songs...", 2000)
self._run_background(
"collect songs for playlist",
lambda: self.collectSongsForItem(item),
on_success=self._onSongsCollectedForPlaylist,
on_error=lambda err: self._handleCollectionError(str(err))
)
def collectSongsForItem(self, item: QTreeWidgetItem) -> Tuple[List[Song], str, bool]:
"""Resolve a tree item into a list of songs for queueing or playlist actions."""
if self._shutting_down:
return [], "", False
data: Dict = item.data(0, Qt.UserRole) or {}
itemType = data.get("type")
label = item.text(0)
truncated = False
request_count = 0
if not self.client:
return [], label, truncated
try:
if itemType == "song":
song: Optional[Song] = data.get("song")
return ([song] if song else []), (self.formatSongLabel(song, includeAlbum=True) if song else label), truncated
if itemType == "album":
albumData = self.client.getAlbum(data.get("id"))
album: Optional[Album] = albumData.get("album")
songs: List[Song] = albumData.get("songs", [])
return songs, (album.name if album else label), truncated
if itemType == "artist":
if self._shutting_down:
return [], label, truncated
artistData = self.client.getArtist(data.get("id"))
artist: Optional[Artist] = artistData.get("artist")
albums: List[Album] = artistData.get("albums", [])
songs: List[Song] = []
for album in albums:
if self._shutting_down:
break
albumData = self.client.getAlbum(album.id)
songs.extend(albumData.get("songs", []))
request_count += 1
self._throttle_requests(request_count)
return songs, (artist.name if artist else label), truncated
if itemType == "playlist":
playlist: Playlist = self.client.getPlaylist(data.get("id"))
return playlist.songs, playlist.name, truncated
if itemType == "genre":
genreName = data.get("name")
songs = self.client.getSongsByGenre(genreName)
return songs, (genreName or label), truncated
if itemType == "artists_root":
if self._shutting_down:
return [], "all artists", truncated
artists: List[Artist] = self.client.getArtists()
songs: List[Song] = []
for artist in artists:
if self._shutting_down:
break
artistData = self.client.getArtist(artist.id)
for album in artistData.get("albums", []):
if self._shutting_down:
break
albumData = self.client.getAlbum(album.id)
songs.extend(albumData.get("songs", []))
request_count += 1
self._throttle_requests(request_count)
return songs, "all artists", truncated
if itemType == "favorites_root":
starred = self.client.getStarred()
songs = starred.get("songs", [])
return songs, "favorites", truncated
if itemType == "album_list":
# These nodes contain album children; we don't aggregate here
return [], label, truncated
if itemType == "now_playing":
songs = self.client.getNowPlaying()
return songs, "now playing", truncated
if itemType == "similar_current":
current = self.playback.currentSong()
songs = self.client.getSimilarSongs(current.id, count=40) if current else []
return songs, "similar tracks", truncated
except Exception as e:
AccessibleTextDialog.showError("Queue Error", str(e), parent=self)
return [], label, truncated
def _onSongsCollectedForQueue(self, result: Tuple[List[Song], str, bool]):
"""Handle async collection results for queueing."""
songs, label, truncated = result
if not songs:
self.announceLive(f"No songs found for {label}")
return
self.playback.enqueueMany(songs, playFirst=False)
message = f"Added {len(songs)} tracks from {label} to the queue"
self.statusBar.showMessage(message, 4000)
self.enablePlaybackActions(True)
self.announceLive(message)
def _handleCollectionError(self, message: str):
"""Surface collection errors consistently."""
AccessibleTextDialog.showError("Queue Error", message, parent=self)
def _onSongsCollectedForPlaylist(self, result: Tuple[List[Song], str, bool]):
"""Handle async collection results before showing playlist dialog."""
songs, label, truncated = result
self._collectPlaylistsAndShowDialog(songs, label, truncated)
def _collectPlaylistsAndShowDialog(self, songs: List[Song], label: str, truncated: bool):
"""Fetch playlists in the background, then show the dialog on the UI thread."""
if not songs:
self.announceLive(f"No songs found for {label}")
return
if len(songs) > 1 and not self._confirmBulkAction("add to playlist", label, count=len(songs), truncated=truncated):
self.statusBar.showMessage("Add to playlist canceled", 2000)
return
self._run_background(
"load playlists",
self.client.getPlaylists,
on_success=lambda playlists: self._showAddToPlaylistDialog(playlists, songs, label, truncated),
on_error=lambda err: AccessibleTextDialog.showError("Playlist Error", str(err), parent=self)
)
def _showAddToPlaylistDialog(self, playlists: List[Playlist], songs: List[Song], label: str, truncated: bool):
"""Open playlist dialog and dispatch playlist updates in the background."""
dialog = AddToPlaylistDialog(playlists, parent=self)
if not dialog.exec():
return
mode, target = dialog.getResult()
songIds = [song.id for song in songs]
self._run_background(
"update playlist",
lambda: self._updateOrCreatePlaylist(mode, target, playlists, songIds),
on_success=lambda message: self._afterPlaylistUpdated(message, len(songs), truncated),
on_error=lambda err: AccessibleTextDialog.showError("Playlist Error", str(err), parent=self)
)
def _updateOrCreatePlaylist(self, mode: str, target: str, playlists: List[Playlist], songIds: List[str]) -> str:
"""Perform the playlist mutation off the UI thread and return a status message."""
if mode == "existing":
self.client.updatePlaylist(target, songIdsToAdd=songIds)
targetName = next((p.name for p in playlists if p.id == target), "playlist")
return f"Added tracks to {targetName}"
playlist = self.client.createPlaylist(target, songIds=songIds)
return f"Created playlist {playlist.name}"
def _afterPlaylistUpdated(self, baseMessage: str, count: int, truncated: bool):
"""Announce playlist mutation results on the UI thread."""
suffix = f" ({count} tracks)"
message = f"{baseMessage} {suffix}"
self.statusBar.showMessage(message, 4000)
self.announceLive(message)
# ============ Search ============
def openSearchDialog(self):
"""Open the search dialog."""
if not self.client:
self.logger.warning("Search requested but no client is connected")
self.announceLive("Connect to a server first to search.")
return
self.logger.info("Opening search dialog")
try:
dialog = SearchDialog(self.client, parent=self)
dialog.songActivated.connect(self.onSearchSongChosen)
dialog.exec()
self.logger.info("Search dialog closed")
except Exception as exc: # noqa: BLE001
self.logger.exception("Failed to open search dialog: %s", exc)
def onSearchSongChosen(self, song: Song):
"""Handle song selection from search."""
self.playback.enqueue(song, playNow=True)
self.enablePlaybackActions(True)
self.statusBar.showMessage(f"Playing {song.title} by {song.artist}", 4000)
# ============ Queue management ============
def onQueueChanged(self, queue: List[Song]):
"""Refresh the queue list widget."""
self.queueList.blockSignals(True)
self.queueList.clear()
for index, song in enumerate(queue):
text = f"{index + 1}. {song.artist}{song.title} ({song.durationFormatted})"
item = QListWidgetItem(text)
item.setData(Qt.UserRole, index)
item.setData(Qt.AccessibleTextRole, text)
self.queueList.addItem(item)
self.queueList.blockSignals(False)
hasItems = len(queue) > 0
self.queueList.setEnabled(hasItems)
self.removeQueueButton.setEnabled(hasItems)
self.clearQueueButton.setEnabled(hasItems)
self.enablePlaybackActions(hasItems)
self.announceTrackAction.setEnabled(hasItems)
self.announcePositionAction.setEnabled(hasItems)
def onQueueItemActivated(self, item: QListWidgetItem):
"""Play the selected queue item."""
index = item.data(Qt.UserRole)
if index is not None:
self.playback.playIndex(int(index))
def removeSelectedFromQueue(self):
"""Remove the selected item from the queue."""
row = self.queueList.currentRow()
if row < 0:
return
if 0 <= row < len(self.playback.queue):
del self.playback.queue[row]
if row <= self.playback.currentIndex:
self.playback.currentIndex = max(-1, self.playback.currentIndex - 1)
if not self.playback.queue:
self.playback.stop()
self.playback.queueChanged.emit(self.playback.queue.copy())
def clearQueue(self):
"""Clear the entire queue."""
self.playback.clearQueue()
# ============ Playback updates ============
def startPlaybackWithSongs(self, songs: List[Song], label: str):
"""Replace queue with provided songs and start playback."""
if not songs:
AccessibleTextDialog.showWarning("Nothing to Play", f"No songs found for {label}.", parent=self)
return
self.playback.clearQueue()
self.playback.enqueueMany(songs, playFirst=True)
self.statusBar.showMessage(f"Playing {label}", 4000)
self.enablePlaybackActions(True)
def playAlbum(self, albumId: str, item: Optional[QTreeWidgetItem] = None):
"""Play all songs in an album."""
if not self.client or not albumId:
self.announceLive("No album selected.")
return
self.statusBar.showMessage("Loading album...", 2000)
self._run_background(
"play album",
lambda: self._fetchAlbumWithSongs(albumId),
on_success=lambda data: self._onAlbumLoadedForPlay(albumId, item, data),
on_error=lambda err: AccessibleTextDialog.showError("Album Error", str(err), parent=self)
)
def playArtist(self, artistId: str, item: Optional[QTreeWidgetItem] = None):
"""Play all songs by an artist across their albums."""
if not self.client or not artistId:
self.announceLive("No artist selected.")
return
self.statusBar.showMessage("Loading artist...", 2000)
self._run_background(
"play artist",
lambda: self._fetchArtistSongs(artistId),
on_success=lambda payload: self._onArtistLoadedForPlay(item, payload),
on_error=lambda err: AccessibleTextDialog.showError("Artist Error", str(err), parent=self)
)
def playAllArtists(self):
"""Play all songs across all artists."""
if not self.client:
self.announceLive("Not connected to a server.")
return
self.announceLive("Loading all artists...")
self._run_background(
"play all artists",
self._progressiveLoadAllArtists,
on_success=lambda count: self._announceLiveIfAny(count, "all artists"),
on_error=lambda err: AccessibleTextDialog.showError("Library Error", str(err), parent=self)
)
def playGenre(self, genreName: str, item: Optional[QTreeWidgetItem] = None):
"""Play all songs within a genre."""
if not self.client or not genreName:
self.announceLive("No genre selected.")
return
self.statusBar.showMessage("Loading genre...", 2000)
self._run_background(
"play genre",
lambda: self._fetchGenreSongsForPlay(genreName),
on_success=lambda result: self._onGenreLoadedForPlay(genreName, item, result),
on_error=lambda err: AccessibleTextDialog.showError("Genre Error", str(err), parent=self)
)
def playPlaylist(self, playlistId: str, item: Optional[QTreeWidgetItem] = None):
"""Load a playlist and start playback."""
self.statusBar.showMessage("Loading playlist...", 2000)
self._run_background(
"play playlist",
lambda: self.client.getPlaylist(playlistId),
on_success=lambda playlist: self._onPlaylistLoadedForPlay(item, playlist),
on_error=lambda err: AccessibleTextDialog.showError("Playlist Error", str(err), parent=self)
)
def playFavorites(self):
"""Play all starred songs."""
if not self.client:
self.announceLive("Not connected to a server.")
return
self.statusBar.showMessage("Loading favorites...", 2000)
self._run_background(
"play favorites",
lambda: self.client.getStarred(),
on_success=lambda starred: self.startPlaybackWithSongs(starred.get("songs", []), "favorites"),
on_error=lambda err: AccessibleTextDialog.showError("Favorites Error", str(err), parent=self)
)
# ============ Background fetch helpers for playback ============
def _fetchAlbumWithSongs(self, albumId: str) -> Tuple[Optional[Album], List[Song]]:
"""Fetch album metadata and songs."""
albumData = self.client.getAlbum(albumId)
return albumData.get("album"), albumData.get("songs", [])
def _onAlbumLoadedForPlay(self, albumId: str, item: Optional[QTreeWidgetItem], data: Tuple[Optional[Album], List[Song]]):
"""Start playback for a loaded album and populate UI if needed."""
album, songs = data
self.startPlaybackWithSongs(songs, f"album {album.name if album else albumId}")
if item and songs:
self._populateAlbumItemWithSongs(item, songs)
item.setExpanded(True)
def _fetchArtistSongs(self, artistId: str) -> Tuple[Optional[Artist], List[Song]]:
"""Fetch all songs for an artist across albums with throttling."""
if self._shutting_down:
return None, []
artistData = self.client.getArtist(artistId)
artist: Optional[Artist] = artistData.get("artist")
albums: List[Album] = artistData.get("albums", [])
songs: List[Song] = []
request_count = 0
for album in albums:
if self._shutting_down:
break
albumData = self.client.getAlbum(album.id)
songs.extend(albumData.get("songs", []))
request_count += 1
self._throttle_requests(request_count)
return artist, songs
def _onArtistLoadedForPlay(self, item: Optional[QTreeWidgetItem], payload: Tuple[Optional[Artist], List[Song]]):
"""Start playback for a loaded artist and expand UI if present."""
artist, songs = payload
label = artist.name if artist else "artist"
self.startPlaybackWithSongs(songs, label)
if item:
item.setExpanded(True)
def _progressiveLoadAllArtists(self) -> int:
"""Fetch songs across all artists in chunks and enqueue progressively."""
if self._shutting_down:
return 0
artists: List[Artist] = self.client.getArtists()
batch: List[Song] = []
request_count = 0
total = 0
first_batch = True
for artist in artists:
if self._shutting_down:
break
artistData = self.client.getArtist(artist.id)
for album in artistData.get("albums", []):
if self._shutting_down:
break
albumData = self.client.getAlbum(album.id)
songs = albumData.get("songs", [])
batch.extend(songs)
total += len(songs)
request_count += 1
if len(batch) >= self.bulkChunkSize:
self._on_ui(lambda b=batch.copy(), first=first_batch: self._enqueueBatchForPlayback(b, "all artists", first))
batch.clear()
first_batch = False
self._throttle_requests(request_count)
if batch and not self._shutting_down:
self._on_ui(lambda b=batch.copy(), first=first_batch: self._enqueueBatchForPlayback(b, "all artists", first))
return total
def _enqueueBatchForPlayback(self, batch: List[Song], label: str, first: bool):
"""Enqueue a batch of songs, starting playback on the first batch."""
if not batch:
return
if first:
self.playback.clearQueue()
self.playback.enqueueMany(batch, playFirst=True)
self.statusBar.showMessage(f"Playing {label}", 4000)
else:
self.playback.enqueueMany(batch, playFirst=False)
self.statusBar.showMessage(f"Queued {len(batch)} more tracks from {label}", 2000)
self.enablePlaybackActions(True)
def _announceLiveIfAny(self, count: int, label: str):
"""Announce completion if any tracks were queued."""
if count > 0:
self.announceLive(f"Queued {count} tracks from {label}")
def _fetchGenreSongsForPlay(self, genreName: str) -> Tuple[List[Song], bool]:
"""Fetch songs for a genre with throttling for playback."""
songs = self.client.getSongsByGenre(genreName)
return songs, False
def _onGenreLoadedForPlay(self, genreName: str, item: Optional[QTreeWidgetItem], result: Tuple[List[Song], bool]):
"""Start playback for a genre and populate UI if present."""
songs, truncated = result
self.startPlaybackWithSongs(songs, f"genre {genreName}")
if item and songs:
self._populateGenreItemWithSongs(item, songs)
item.setExpanded(True)
def _onPlaylistLoadedForPlay(self, item: Optional[QTreeWidgetItem], playlist: Playlist):
"""Start playback for a playlist and populate UI if present."""
songs = playlist.songs
if not songs:
AccessibleTextDialog.showWarning("Playlist Empty", "This playlist has no tracks.", parent=self)
return
self.playback.clearQueue()
self.playback.enqueueMany(songs, playFirst=True)
self.statusBar.showMessage(f"Playing playlist {playlist.name}", 4000)
if item:
item.takeChildren()
for song in songs:
label = self.formatSongLabel(song, includeAlbum=True)
child = QTreeWidgetItem([label])
child.setData(0, Qt.UserRole, {"type": "song", "song": song})
child.setData(0, Qt.AccessibleTextRole, label)
item.addChild(child)
item.setData(0, Qt.UserRole, {"type": "playlist", "id": playlist.id, "playlist": playlist, "loaded": True})
self.enablePlaybackActions(True)
def onTrackChanged(self, song: Song):
"""Update UI when track changes."""
self.nowPlayingInfo.setText(f"{song.title}{song.artist} ({song.album})")
self.announceTrackAction.setEnabled(True)
self.announcePositionAction.setEnabled(True)
self.positionSlider.setEnabled(True)
self.favoriteAction.setEnabled(True)
self.unfavoriteAction.setEnabled(True)
if self.announceTrackChangesEnabled and self._announceOnTrackChange:
self.announceCurrentTrackLive()
self._announceOnTrackChange = False
def onPlaybackStateChanged(self, state: str):
"""Update playback state labels and actions."""
self.playbackStatus.setText(state.capitalize())
if state in ("playing", "paused"):
self.playButton.setText("Pause" if state == "playing" else "Play")
self.playPauseAction.setText("Pause" if state == "playing" else "Play")
else:
self.playButton.setText("Play/Pause")
self.playPauseAction.setText("Play/Pause")
def onPositionChanged(self, position: int, duration: int):
"""Update seek slider and position label."""
if duration <= 0:
self.positionLabel.setText("00:00 / 00:00")
return
self.positionLabel.setText(f"{self.formatTime(position)} / {self.formatTime(duration)}")
sliderValue = int((position / duration) * 1000)
self.positionSlider.blockSignals(True)
self.positionSlider.setValue(sliderValue)
self.positionSlider.blockSignals(False)
def onPlaybackError(self, message: str):
"""Show playback errors accessibly."""
AccessibleTextDialog.showError("Playback Error", message, parent=self)
self.playbackStatus.setText("Error")
def onRepeatToggled(self, checked: bool):
"""Cycle repeat mode."""
mode = "all" if checked else "none"
self.applyRepeatMode(mode)
def onShuffleToggled(self, checked: bool):
"""Handle shuffle toggle with announcement."""
self.setShuffleState(checked)
def onAnnounceTrackChangesToggled(self, checked: bool):
"""Persist preference for automatic track change announcements."""
self.announceTrackChangesEnabled = checked
self.settings.set("interface", "announceTrackChanges", checked)
self.announceLive(f"Announce track changes {'enabled' if checked else 'disabled'}")
def onSeekRequested(self, value: int):
"""Seek within the current track using slider position."""
duration = self.playback.player.duration()
if duration > 0:
target = int((value / 1000) * duration)
self.playback.seek(target)
def bringToFront(self):
"""Raise the window in response to MPRIS Raise."""
self.showNormal()
self.raise_()
self.activateWindow()
def setVolumePercent(self, volumePercent: int, announce: bool = False):
"""Set volume from 0-100 and sync UI/settings."""
volumePercent = max(0, min(100, volumePercent))
self.volume = volumePercent
self.playback.setVolume(volumePercent)
self.volumeStatus.setText(f"Volume: {self.volume}%")
self.settings.set("playback", "volume", self.volume)
if announce:
self.announceLive(f"Volume {self.volume} percent")
if self.mpris and self.mpris.available:
self.mpris.notifyVolumeChanged()
def setVolumeFromExternal(self, volumePercent: int):
"""Apply volume set via MPRIS without re-announcing."""
self.setVolumePercent(volumePercent, announce=False)
def adjustVolume(self, delta: int):
"""Adjust volume by a delta and update UI/settings."""
self.setVolumePercent(self.volume + delta)
def setShuffleState(self, enabled: bool, announce: bool = True, persist: bool = True):
"""Apply shuffle setting consistently for UI, playback, and persistence."""
self.shuffleAction.blockSignals(True)
self.shuffleAction.setChecked(enabled)
self.shuffleAction.blockSignals(False)
self.playback.setShuffle(enabled)
self.shuffleEnabled = enabled
if persist:
self.settings.set("playback", "shuffle", enabled)
if announce:
self.announceLive(f"Shuffle {'enabled' if enabled else 'disabled'}")
if self.mpris and self.mpris.available:
self.mpris.notifyShuffleChanged()
def setShuffleFromExternal(self, enabled: bool, announce: bool = False):
"""Handle shuffle updates coming from MPRIS."""
self.setShuffleState(enabled, announce=announce, persist=True)
def applyRepeatMode(self, mode: str, announce: bool = True, persist: bool = True):
"""Apply repeat mode ('none', 'one', 'all') across UI and playback."""
if mode not in ("none", "one", "all"):
mode = "none"
self.repeatAction.blockSignals(True)
self.repeatAction.setChecked(mode != "none")
if mode == "one":
self.repeatAction.setText("Repeat One")
elif mode == "all":
self.repeatAction.setText("Repeat All")
else:
self.repeatAction.setText("Repeat Off")
self.repeatAction.blockSignals(False)
self.repeatMode = mode
self.playback.setRepeatMode(mode)
if persist:
self.settings.set("playback", "repeat", mode)
if announce:
self.announceLive(f"Repeat {'enabled' if mode != 'none' else 'disabled'}")
if self.mpris and self.mpris.available:
self.mpris.notifyLoopStatus()
def setRepeatFromExternal(self, loopStatus: str, announce: bool = False):
"""Map MPRIS loop status to internal repeat modes."""
mode = {"Track": "one", "Playlist": "all", "None": "none"}.get(loopStatus, "none")
self.applyRepeatMode(mode, announce=announce, persist=True)
def announceTrack(self):
"""Speak current track details via an accessible dialog."""
text = self.nowPlayingInfo.text()
AccessibleTextDialog.showInfo("Current Track", text, parent=self)
def announcePosition(self):
"""Speak current playback position."""
AccessibleTextDialog.showInfo("Playback Position", self.positionLabel.text(), parent=self)
def announceCurrentTrackLive(self):
"""Announce the currently playing track via the live region."""
song = self.playback.currentSong()
message = self.buildTrackAnnouncement(song)
self.announceLive(message)
def enablePlaybackActions(self, enabled: bool):
"""Enable or disable playback-related actions."""
self.playPauseAction.setEnabled(enabled)
self.stopAction.setEnabled(enabled)
self.previousAction.setEnabled(enabled)
self.nextAction.setEnabled(enabled)
self.shuffleAction.setEnabled(True)
self.repeatAction.setEnabled(True)
self.volumeUpAction.setEnabled(True)
self.volumeDownAction.setEnabled(True)
self.favoriteAction.setEnabled(enabled)
self.unfavoriteAction.setEnabled(enabled)
self.prevButton.setEnabled(enabled)
self.playButton.setEnabled(enabled)
self.stopButton.setEnabled(enabled)
self.nextButton.setEnabled(enabled)
self.positionSlider.setEnabled(enabled)
# ============ Helpers ============
def _setLoadingPlaceholder(self, item: QTreeWidgetItem, text: str):
"""Replace children with a single loading placeholder."""
item.takeChildren()
item.addChild(QTreeWidgetItem([text]))
def _markItemLoaded(self, item: QTreeWidgetItem):
"""Mark a tree item as loaded in its user data."""
data: Dict = item.data(0, Qt.UserRole) or {}
data["loaded"] = True
item.setData(0, Qt.UserRole, data)
def _handleItemLoadError(self, item: QTreeWidgetItem, message: str):
"""Show a load error and restore a placeholder child."""
AccessibleTextDialog.showError("Library Error", f"Could not load content: {message}", parent=self)
self._setLoadingPlaceholder(item, "Failed to load.")
def _run_background(
self,
label: str,
func: Callable[[], Any],
on_success: Optional[Callable[[Any], None]] = None,
on_error: Optional[Callable[[Exception], None]] = None
):
"""Execute blocking work off the UI thread and marshal results back."""
self.logger.info("Starting background task: %s", label)
future = self._executor.submit(func)
def _done(fut):
try:
result = fut.result()
except Exception as exc: # noqa: BLE001
self.logger.error("Background task '%s' failed: %s", label, exc)
if on_error:
self._on_ui(lambda: on_error(exc))
return
if on_success:
self._on_ui(lambda: on_success(result))
self.logger.info("Background task '%s' completed", label)
future.add_done_callback(_done)
def _on_ui(self, callback: Callable[[], None]):
"""Post a callable to the Qt event loop with error logging."""
def wrapped():
try:
callback()
except Exception as exc: # noqa: BLE001
self.logger.exception("UI callback failed: %s", exc)
# Ensure it runs on the main thread even when scheduled from workers
QTimer.singleShot(0, self, wrapped)
def _throttle_requests(self, count: int):
"""Sleep briefly every N requests to be gentle on the server.
Skips throttling for the first bulkThrottleStartAfter requests so
playback can start quickly, then throttles subsequent requests.
"""
if self.bulkRequestBatch <= 0 or self.bulkThrottleSeconds <= 0:
return
if count <= self.bulkThrottleStartAfter:
return
if count % self.bulkRequestBatch == 0:
time.sleep(self.bulkThrottleSeconds)
def keyPressEvent(self, event):
"""Handle high-priority shortcuts that might be consumed by child widgets."""
if event.matches(QKeySequence.Find):
self.openSearchDialog()
event.accept()
return
super().keyPressEvent(event)
@staticmethod
def formatTime(ms: int) -> str:
"""Convert milliseconds to mm:ss or hh:mm:ss."""
totalSeconds = ms // 1000
hours = totalSeconds // 3600
minutes = (totalSeconds % 3600) // 60
seconds = totalSeconds % 60
if hours:
return f"{hours}:{minutes:02d}:{seconds:02d}"
return f"{minutes:02d}:{seconds:02d}"
def copyToClipboard(self):
"""Allow AccessibleTreeWidget to copy the current item's text."""
item = self.libraryTree.currentItem()
if item:
QGuiApplication.clipboard().setText(item.text(0))
def buildTrackAnnouncement(self, song: Optional[Song]) -> str:
"""Construct a live announcement for the current track with fallbacks."""
if not song:
return "No track playing."
title = (song.title or "").strip()
artist = (song.artist or "").strip()
album = (song.album or "").strip()
filename = Path(song.path).name if song.path else ""
if title and artist and album:
return f"{title} by {artist} from {album}"
if title and artist:
return f"{title} by {artist}"
if filename:
return filename
if title:
return title
return "Unknown track"
def buildFavoriteAnnouncement(self, action: str, song: Optional[Song], label: str) -> str:
"""Create an announcement for favorite/unfavorite actions with track fallbacks."""
base = self.buildTrackAnnouncement(song) if song else (label or "item")
verb = "added to favorites" if action == "favorite" else "removed from favorites"
return f"{base} {verb}"
def buildFavoriteFailureAnnouncement(self, action: str, song: Optional[Song], label: str) -> str:
"""Create an error announcement for favorite/unfavorite actions."""
base = self.buildTrackAnnouncement(song) if song else (label or "item")
verb = "favorite" if action == "favorite" else "unfavorite"
return f"Could not {verb} {base}"
def _resolveCoverArtUrl(self, coverId: Optional[str]) -> Optional[str]:
"""Provide a cover art URL for MPRIS metadata when possible."""
if not coverId or not self.client:
return None
try:
return self.client.getCoverArtUrl(coverId)
except Exception:
return None
def announceLive(self, text: str):
"""Update a live region and status bar for screen readers."""
self.liveRegion.setText(text)
self.statusBar.showMessage(text, 2000)
try:
# Prefer explicit announcement event where supported
ann = QAccessibleAnnouncementEvent(self.liveRegion, text)
QAccessible.updateAccessibility(ann)
except Exception:
try:
event = QAccessibleEvent(self.liveRegion, QAccessible.Event.Alert)
QAccessible.updateAccessibility(event)
except Exception:
# As a last resort, change accessible name to trigger a change event
self.liveRegion.setAccessibleName(text)
# Shortcut handlers with feedback when no playback is active
def handlePreviousShortcut(self, checked: bool = False):
"""Go to previous track or announce why it cannot."""
if not self.playback.queue:
self.announceLive("No tracks in the queue.")
return
if not self.playback.currentSong():
self.announceLive("No track playing.")
return
self._announceOnTrackChange = True
self.playback.previous()
def handlePlayShortcut(self, checked: bool = False):
"""Start or resume playback, announcing if nothing is queued."""
if not self.playback.queue:
self.announceLive("No tracks in the queue.")
return
self.playback.play()
def handlePauseToggleShortcut(self, checked: bool = False):
"""Toggle pause or announce when nothing is playing."""
if not self.playback.currentSong():
message = "No tracks in the queue." if not self.playback.queue else "No track playing."
self.announceLive(message)
return
self.playPauseAction.trigger()
def handleStopShortcut(self, checked: bool = False):
"""Stop playback or announce when nothing is playing."""
if not self.playback.currentSong():
self.announceLive("Nothing is playing.")
return
self.stopAction.trigger()
def handleNextShortcut(self, checked: bool = False):
"""Advance to next track or announce when it is unavailable."""
if not self.playback.queue:
self.announceLive("No tracks in the queue.")
return
if not self.playback.currentSong():
self.announceLive("No track playing.")
return
self._announceOnTrackChange = True
self.playback.next()
def applyPersistedPlaybackSettings(self):
"""Restore saved shuffle and repeat preferences without announcing."""
shuffleEnabled = bool(self.shuffleEnabled)
repeatMode = self.repeatMode if self.repeatMode in ("none", "one", "all") else "none"
self.setShuffleState(shuffleEnabled, announce=False, persist=False)
self.applyRepeatMode(repeatMode, announce=False, persist=False)
self.announceTrackChangesAction.blockSignals(True)
self.announceTrackChangesAction.setChecked(self.announceTrackChangesEnabled)
self.announceTrackChangesAction.blockSignals(False)
def _handleQueueChangedForMpris(self, queue: List[Song]):
"""Update MPRIS capabilities and metadata when the queue changes."""
if not self.mpris or not self.mpris.available:
return
self.mpris.notifyCapabilities()
if not queue:
self.mpris.updateMetadata(None)
def getCurrentMediaTarget(self):
"""Return (type, id, label, song) for the most relevant selection."""
# Library tree selection
item = self.libraryTree.currentItem()
if item:
data: Dict = item.data(0, Qt.UserRole) or {}
itemType = data.get("type")
if itemType == "song":
song: Song = data["song"]
label = self.buildTrackAnnouncement(song)
return ("song", song.id, label, song)
if itemType in ("album", "artist"):
label = item.text(0)
return (itemType, data.get("id"), label, None)
# Queue selection
row = self.queueList.currentRow()
if 0 <= row < len(self.playback.queue):
song = self.playback.queue[row]
label = self.buildTrackAnnouncement(song)
return ("song", song.id, label, song)
# Now playing
current = self.playback.currentSong()
if current:
label = self.buildTrackAnnouncement(current)
return ("song", current.id, label, current)
return None
def favoriteSelection(self):
"""Favorite the selected or currently playing item."""
target = self.getCurrentMediaTarget()
if not target or not self.client:
AccessibleTextDialog.showWarning("Favorite", "No item selected to favorite.", parent=self)
return
itemType, itemId, label, song = target
if itemType != "song":
if not self._confirmBulkAction("favorite", label):
self.statusBar.showMessage("Favorite canceled", 2000)
return
try:
self.client.star(itemId, itemType)
message = self.buildFavoriteAnnouncement("favorite", song, label)
self.statusBar.showMessage(message, 3000)
self.announceLive(message)
except Exception as e:
self.announceLive(self.buildFavoriteFailureAnnouncement("favorite", song, label))
AccessibleTextDialog.showError("Favorite Failed", str(e), parent=self)
def unfavoriteSelection(self):
"""Unfavorite the selected or currently playing item."""
target = self.getCurrentMediaTarget()
if not target or not self.client:
AccessibleTextDialog.showWarning("Unfavorite", "No item selected to unfavorite.", parent=self)
return
itemType, itemId, label, song = target
if itemType != "song":
if not self._confirmBulkAction("unfavorite", label):
self.statusBar.showMessage("Unfavorite canceled", 2000)
return
try:
self.client.unstar(itemId, itemType)
message = self.buildFavoriteAnnouncement("unfavorite", song, label)
self.statusBar.showMessage(message, 3000)
self.announceLive(message)
except Exception as e:
self.announceLive(self.buildFavoriteFailureAnnouncement("unfavorite", song, label))
AccessibleTextDialog.showError("Unfavorite Failed", str(e), parent=self)
def showAbout(self):
"""Show the about dialog"""
AccessibleTextDialog.showInfo(
"About NaviPy",
"NaviPy - Accessible Navidrome Client\n\n"
"An accessible music player for Navidrome servers.\n\n"
"Built with PySide6 for full keyboard navigation "
"and screen reader compatibility.\n\n"
"Version: 0.1.0 (Development)",
parent=self
)
def closeEvent(self, event):
"""Handle window close"""
# Signal background tasks to stop
self._shutting_down = True
# Signal cached client to stop any running sync
if self.client:
self.client.shutdown()
self.settings.set("playback", "volume", self.volume)
self.settings.save()
# Stop playback and release media resources
try:
self.playback.player.stop()
self.playback.player.setSource("")
except Exception:
pass
# Shutdown MPRIS (stops GLib main loop thread)
if self.mpris:
try:
self.mpris.shutdown()
except Exception:
pass
# Shutdown thread pool - don't wait for tasks
if self._executor:
self._executor.shutdown(wait=False, cancel_futures=True)
self.logger.info("Main window closed")
event.accept()
# Schedule forced exit after event loop processes the close
def forceExit():
import os
os._exit(0)
QTimer.singleShot(100, forceExit)