2276 lines
100 KiB
Python
2276 lines
100 KiB
Python
"""Main application window for NaviPy."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from PySide6.QtWidgets import (
|
|
QHBoxLayout,
|
|
QLabel,
|
|
QMenu,
|
|
QListWidget,
|
|
QListWidgetItem,
|
|
QMainWindow,
|
|
QMessageBox,
|
|
QPushButton,
|
|
QSlider,
|
|
QSplitter,
|
|
QStatusBar,
|
|
QTreeWidgetItem,
|
|
QVBoxLayout,
|
|
QWidget,
|
|
)
|
|
from PySide6.QtCore import QPoint, Qt, QTimer
|
|
from PySide6.QtGui import (
|
|
QAction,
|
|
QAccessible,
|
|
QAccessibleEvent,
|
|
QAccessibleAnnouncementEvent,
|
|
QGuiApplication,
|
|
QKeySequence,
|
|
QShortcut,
|
|
)
|
|
|
|
from src.accessibility.accessible_tree import AccessibleTreeWidget
|
|
from src.api.client import SubsonicClient, SubsonicError
|
|
from src.api.models import Album, Artist, Playlist, Song, Genre
|
|
from src.cache import CachedClient
|
|
from src.config.settings import Settings, getDataDir
|
|
from src.managers.playback_manager import PlaybackManager
|
|
from src.integrations.mpris import MprisService
|
|
from src.widgets.accessible_text_dialog import AccessibleTextDialog
|
|
from src.widgets.add_to_playlist_dialog import AddToPlaylistDialog
|
|
from src.widgets.search_dialog import SearchDialog
|
|
from src.widgets.server_dialog import ServerDialog
|
|
|
|
|
|
class MainWindow(QMainWindow):
|
|
"""Main NaviPy application window"""
|
|
|
|
def __init__(self):
    """Create the window, restore saved preferences and start connecting.

    State and settings are initialised first, then menus, widgets and
    signal wiring are built, and finally a server connection is attempted
    (or the setup dialog is shown when no server is configured).
    """
    super().__init__()
    self.setWindowTitle("NaviPy - Navidrome Client")
    self.setMinimumSize(980, 640)

    # Collaborators and connection state. Both client references stay
    # None until a connection has succeeded.
    self.settings = Settings()
    self.client: Optional[CachedClient] = None
    self._rawClient: Optional[SubsonicClient] = None
    self.playback = PlaybackManager(self)
    self.mpris: Optional[MprisService] = None
    self._executor = ThreadPoolExecutor(max_workers=4)
    self._shutting_down = False
    self.logger = logging.getLogger("navipy.main_window")

    # Bulk-loading limits and throttling knobs.
    self.maxBulkSongs: Optional[int] = None  # Unlimited; use throttling instead
    self.bulkRequestBatch = 1  # Throttle every N requests to be nice to servers
    self.bulkThrottleSeconds = 0.05
    self.bulkChunkSize = 500
    self.bulkThrottleStartAfter = 50  # Don't throttle first N requests for quick playback start
    self._startupSyncInterval = timedelta(minutes=60)

    log = self.logger
    log.info("MainWindow initialized")

    # Persisted preferences; the last argument is the fallback value.
    readSetting = self.settings.get
    self.pageStep = readSetting("interface", "pageStep", 5)
    self.volume = readSetting("playback", "volume", 100)
    self.shuffleEnabled = readSetting("playback", "shuffle", False)
    self.repeatMode = readSetting("playback", "repeat", "none")
    self.announceTrackChangesEnabled = readSetting("interface", "announceTrackChanges", True)
    self._announceOnTrackChange = False

    # Timer that re-announces the current status during long operations.
    statusTimer = QTimer(self)
    statusTimer.timeout.connect(self._announceCurrentStatus)
    self._statusAnnounceTimer = statusTimer
    self._currentStatusMessage = ""

    # Build the UI. connectSignals() needs the actions and widgets that
    # the two setup calls before it create.
    self.setupMenus()
    self.setupUi()
    self.connectSignals()
    self.applyPersistedPlaybackSettings()
    self.enablePlaybackActions(False)

    self.playback.setVolume(self.volume)
    self.setupMpris()

    # Connect right away when a server is configured; otherwise ask for one.
    if not self.settings.hasServers():
        log.info("No servers configured; showing setup dialog")
        self.showServerSetup()
        return

    log.info("Servers present, attempting default connection")
    self.connectToDefaultServer()
|
|
|
|
# ============ UI setup ============
|
|
|
|
def setupMenus(self):
    """Build the File, Playback, View and Help menus and their actions.

    Every action is stored on ``self`` so that other methods can enable,
    disable or connect it later. Only the File and Help actions are
    connected to slots here.
    """
    def newAction(menu, text, shortcut=None, slot=None,
                  enabled=True, checkable=False, checked=None):
        # Create one QAction, configure it and append it to ``menu``.
        action = QAction(text, self)
        if shortcut is not None:
            action.setShortcut(QKeySequence(shortcut))
        if slot is not None:
            action.triggered.connect(slot)
        if checkable:
            action.setCheckable(True)
        if checked is not None:
            action.setChecked(checked)
        if not enabled:
            action.setEnabled(False)
        menu.addAction(action)
        return action

    bar = self.menuBar()
    bar.setAccessibleName("Menu Bar")

    # ---- File ----
    fileMenu = bar.addMenu("&File")
    self.connectAction = newAction(
        fileMenu, "&Connect to Server...", "Ctrl+O", self.showServerSetup)
    fileMenu.addSeparator()
    self.refreshAction = newAction(
        fileMenu, "&Refresh Library", "F5", self.refreshLibrary, enabled=False)
    self.fullSyncAction = newAction(
        fileMenu, "Full Library &Sync", "Shift+F5", self.refreshLibraryFull, enabled=False)
    fileMenu.addSeparator()
    self.quitAction = newAction(fileMenu, "&Quit", "Ctrl+Q", self.close)

    # ---- Playback ----
    playbackMenu = bar.addMenu("&Playback")
    self.playPauseAction = newAction(playbackMenu, "&Play/Pause", "Space", enabled=False)
    self.stopAction = newAction(playbackMenu, "&Stop", enabled=False)
    playbackMenu.addSeparator()
    self.previousAction = newAction(playbackMenu, "Pre&vious Track", "Ctrl+Left", enabled=False)
    self.nextAction = newAction(playbackMenu, "&Next Track", "Ctrl+Right", enabled=False)
    playbackMenu.addSeparator()
    self.shuffleAction = newAction(playbackMenu, "&Shuffle", "Alt+S", checkable=True)
    self.repeatAction = newAction(playbackMenu, "&Repeat Off", "Alt+R", checkable=True)
    playbackMenu.addSeparator()
    self.favoriteAction = newAction(playbackMenu, "&Favorite", "+", enabled=False)
    self.unfavoriteAction = newAction(playbackMenu, "Un&favorite", "_", enabled=False)
    playbackMenu.addSeparator()
    self.volumeUpAction = newAction(playbackMenu, "Volume &Up", "Ctrl+Up", enabled=False)
    self.volumeDownAction = newAction(playbackMenu, "Volume &Down", "Ctrl+Down", enabled=False)

    # ---- View ----
    viewMenu = bar.addMenu("&View")
    # The key hint is part of the label text; no shortcut is set on the
    # action itself.
    self.searchAction = newAction(viewMenu, "&Search...\tCtrl+F", enabled=False)
    viewMenu.addSeparator()
    self.announceTrackAction = newAction(
        viewMenu, "Announce Current &Track", "Ctrl+T", enabled=False)
    self.announcePositionAction = newAction(
        viewMenu, "Announce &Position", "Ctrl+P", enabled=False)
    self.announceTrackChangesAction = newAction(
        viewMenu, "Announce Track &Changes",
        checkable=True, checked=self.announceTrackChangesEnabled)

    # ---- Help ----
    helpMenu = bar.addMenu("&Help")
    self.aboutAction = newAction(helpMenu, "&About NaviPy", slot=self.showAbout)
|
|
|
|
def setupUi(self):
    """Build the central widget: library panel, player panel and status bar.

    All interactive widgets start disabled. The layout is a horizontal
    splitter with the library browser on the left and the now-playing
    controls plus queue on the right.
    """
    def addLabel(layout, text, accessibleName):
        # Create a QLabel with an accessible name and append it to ``layout``.
        label = QLabel(text)
        label.setAccessibleName(accessibleName)
        layout.addWidget(label)
        return label

    def addDisabledButton(layout, text, accessibleName):
        # Create a push button that starts disabled and append it to ``layout``.
        button = QPushButton(text)
        button.setAccessibleName(accessibleName)
        button.setEnabled(False)
        layout.addWidget(button)
        return button

    central = QWidget()
    self.setCentralWidget(central)
    rootLayout = QHBoxLayout(central)

    # Splitter so both panels can be resized.
    self.splitter = QSplitter(Qt.Horizontal)
    self.splitter.setAccessibleName("Main Content")

    # ---- Left panel: library browser ----
    libraryPanel = QWidget()
    libraryLayout = QVBoxLayout(libraryPanel)
    addLabel(libraryLayout, "Library Browser", "Library Panel")
    self.libraryStatus = addLabel(
        libraryLayout, "Connect to a server to browse your library", "Library Status")

    tree = AccessibleTreeWidget()
    tree.setHeaderHidden(True)
    tree.setAccessibleName("Library Tree")
    tree.setRootIsDecorated(True)
    tree.setItemsExpandable(True)
    tree.setContextMenuPolicy(Qt.CustomContextMenu)
    tree.pageStep = self.pageStep
    tree.setEnabled(False)
    self.libraryTree = tree
    libraryLayout.addWidget(tree)

    self.splitter.addWidget(libraryPanel)

    # ---- Right panel: now playing and queue ----
    playerPanel = QWidget()
    playerLayout = QVBoxLayout(playerPanel)
    addLabel(playerLayout, "Now Playing", "Now Playing Section")
    self.nowPlayingInfo = addLabel(playerLayout, "No track playing", "Current Track")
    self.positionLabel = addLabel(playerLayout, "00:00 / 00:00", "Playback Position")

    seek = QSlider(Qt.Horizontal)
    seek.setAccessibleName("Seek Slider")
    seek.setRange(0, 1000)
    seek.setEnabled(False)
    self.positionSlider = seek
    playerLayout.addWidget(seek)

    transportRow = QHBoxLayout()
    self.prevButton = addDisabledButton(transportRow, "Previous", "Previous Track")
    self.playButton = addDisabledButton(transportRow, "Play/Pause", "Play or Pause")
    self.stopButton = addDisabledButton(transportRow, "Stop", "Stop Playback")
    self.nextButton = addDisabledButton(transportRow, "Next", "Next Track")
    playerLayout.addLayout(transportRow)

    addLabel(playerLayout, "Play Queue", "Queue Section")
    queue = QListWidget()
    queue.setAccessibleName("Queue List")
    queue.setSelectionMode(QListWidget.SingleSelection)
    queue.setEnabled(False)
    self.queueList = queue
    playerLayout.addWidget(queue)

    queueRow = QHBoxLayout()
    self.removeQueueButton = addDisabledButton(
        queueRow, "Remove Selected", "Remove Selected Track from Queue")
    self.clearQueueButton = addDisabledButton(queueRow, "Clear Queue", "Clear Queue")
    playerLayout.addLayout(queueRow)
    playerLayout.addStretch()

    self.splitter.addWidget(playerPanel)
    self.splitter.setSizes([560, 420])
    rootLayout.addWidget(self.splitter)

    # ---- Status bar ----
    # NOTE(review): this attribute shadows QMainWindow.statusBar(); code that
    # calls self.statusBar() would get a non-callable QStatusBar. Renaming
    # needs a check of the rest of the file first.
    self.statusBar = QStatusBar()
    self.setStatusBar(self.statusBar)

    def addStatusLabel(text, accessibleName):
        # Permanent status-bar label with an accessible name.
        label = QLabel(text)
        label.setAccessibleName(accessibleName)
        self.statusBar.addPermanentWidget(label)
        return label

    self.connectionStatus = addStatusLabel("Not connected", "Connection Status")
    self.playbackStatus = addStatusLabel("Stopped", "Playback Status")
    self.volumeStatus = addStatusLabel(f"Volume: {self.volume}%", "Volume Status")

    # Label used as the target for announcements; its text is rendered
    # transparent so it is not visible on screen.
    live = QLabel("")
    live.setAccessibleName("Live Region")
    live.setAccessibleDescription("Announcements")
    live.setStyleSheet("color: transparent;")
    self.liveRegion = live
    self.statusBar.addPermanentWidget(live, 1)
|
|
|
|
def connectSignals(self):
    """Wire widgets, menu actions, shortcuts and playback signals to handlers.

    Relies on the actions and widgets created by setupMenus() and setupUi(),
    so it has to run after both.
    """
    def bindShortcut(keys, handler, applicationWide=False):
        # Create a shortcut owned by this window and connect its handler.
        shortcut = QShortcut(QKeySequence(keys), self)
        if applicationWide:
            shortcut.setContext(Qt.ApplicationShortcut)
        shortcut.activated.connect(handler)
        return shortcut

    # Library tree interactions.
    tree = self.libraryTree
    tree.itemExpanded.connect(self.onLibraryExpanded)
    tree.itemActivated.connect(self.onLibraryActivated)
    tree.customContextMenuRequested.connect(self.onLibraryContextMenu)

    # Transport buttons simply trigger the matching menu action.
    buttonToAction = (
        (self.playButton, self.playPauseAction),
        (self.prevButton, self.previousAction),
        (self.nextButton, self.nextAction),
        (self.stopButton, self.stopAction),
    )
    for button, action in buttonToAction:
        button.clicked.connect(action.trigger)

    # Playback menu actions.
    self.playPauseAction.triggered.connect(self.playback.togglePlayPause)
    self.stopAction.triggered.connect(self.playback.stop)
    self.previousAction.triggered.connect(self.handlePreviousShortcut)
    self.nextAction.triggered.connect(self.handleNextShortcut)
    self.shuffleAction.toggled.connect(self.onShuffleToggled)
    self.repeatAction.toggled.connect(self.onRepeatToggled)
    self.announceTrackChangesAction.toggled.connect(self.onAnnounceTrackChangesToggled)
    self.volumeUpAction.triggered.connect(lambda: self.adjustVolume(5))
    self.volumeDownAction.triggered.connect(lambda: self.adjustVolume(-5))
    self.favoriteAction.triggered.connect(self.favoriteSelection)
    self.unfavoriteAction.triggered.connect(self.unfavoriteSelection)

    self.searchAction.triggered.connect(self.openSearchDialog)

    self.positionSlider.sliderMoved.connect(self.onSeekRequested)

    # Queue widgets.
    self.queueList.itemActivated.connect(self.onQueueItemActivated)
    self.removeQueueButton.clicked.connect(self.removeSelectedFromQueue)
    self.clearQueueButton.clicked.connect(self.clearQueue)

    # Announcement actions and shortcuts.
    self.announceTrackAction.triggered.connect(self.announceTrack)
    self.announcePositionAction.triggered.connect(self.announcePosition)
    self.announceTrackLiveShortcut = bindShortcut("Alt+C", self.announceCurrentTrackLive)

    # Explicit shortcut to open the search dialog regardless of focus.
    self.searchShortcut = bindShortcut("Ctrl+F", self.openSearchDialog, applicationWide=True)
    self.interactCurrentShortcut = bindShortcut(
        "Alt+I", self.showCurrentMediaContextMenu, applicationWide=True)

    # Single-key transport shortcuts on the Z X C V B row.
    self.previousShortcut = bindShortcut("Z", self.handlePreviousShortcut)
    self.playShortcut = bindShortcut("X", self.handlePlayShortcut)
    self.pauseToggleShortcut = bindShortcut("C", self.handlePauseToggleShortcut)
    self.stopShortcut = bindShortcut("V", self.handleStopShortcut)
    self.nextShortcut = bindShortcut("B", self.handleNextShortcut)

    # Playback manager notifications.
    player = self.playback
    player.trackChanged.connect(self.onTrackChanged)
    player.queueChanged.connect(self.onQueueChanged)
    player.playbackStateChanged.connect(self.onPlaybackStateChanged)
    player.positionChanged.connect(self.onPositionChanged)
    player.errorOccurred.connect(self.onPlaybackError)
|
|
|
|
def setupMpris(self):
    """Register the quick volume keys, then start the optional MPRIS service.

    The "0" / "9" volume shortcuts are created first so that they exist
    whether or not MPRIS can be started. Previously they were only
    registered after the MPRIS early returns, so they were missing
    whenever the service was unavailable.

    If ``MprisService`` raises during construction or reports itself as
    not available, ``self.mpris`` is left as ``None`` and no playback
    signals are connected to it.
    """
    # Additional keyboard shortcuts for quick volume changes. These do not
    # depend on MPRIS, so set them up before any early return below.
    self.volumeUpShortcut = QShortcut(QKeySequence("0"), self)
    self.volumeUpShortcut.activated.connect(lambda: self.adjustVolume(5))
    self.volumeDownShortcut = QShortcut(QKeySequence("9"), self)
    self.volumeDownShortcut.activated.connect(lambda: self.adjustVolume(-5))

    try:
        self.mpris = MprisService(
            playback=self.playback,
            art_url_resolver=self._resolveCoverArtUrl,
            raise_callback=self.bringToFront,
            quit_callback=self.close,
            on_volume_changed=self.setVolumeFromExternal,
            on_shuffle_changed=lambda enabled: self.setShuffleFromExternal(enabled, announce=False),
            on_loop_changed=lambda loop: self.setRepeatFromExternal(loop, announce=False),
        )
    except Exception:
        # MPRIS is optional: keep running without it, but leave a trace of
        # why it could not be started instead of failing silently.
        self.logger.warning("MPRIS service could not be initialized", exc_info=True)
        self.mpris = None
        return

    if not self.mpris or not self.mpris.available:
        self.mpris = None
        return

    # Bind the service locally so the error handler below keeps working on
    # this instance even if self.mpris is reassigned later.
    mpris = self.mpris

    self.playback.trackChanged.connect(mpris.updateMetadata)
    self.playback.playbackStateChanged.connect(mpris.updatePlaybackStatus)
    self.playback.queueChanged.connect(self._handleQueueChangedForMpris)
    self.playback.errorOccurred.connect(lambda _message: mpris.updatePlaybackStatus("stopped"))

    # Publish the initial state.
    mpris.updateMetadata(self.playback.currentSong())
    mpris.updatePlaybackStatus("stopped")
    mpris.notifyCapabilities()
    mpris.notifyVolumeChanged()
    mpris.notifyShuffleChanged()
    mpris.notifyLoopStatus()
|
|
|
|
# ============ Connection handling ============
|
|
|
|
def showServerSetup(self):
    """Run the server setup dialog and connect to the server it returns.

    Nothing happens beyond a log entry when the dialog is dismissed.
    """
    setupDialog = ServerDialog(self.settings, parent=self)
    accepted = setupDialog.exec()
    if not accepted:
        self.logger.info("Server setup dialog canceled")
        return

    chosenServer = setupDialog.getServerName()
    self.logger.info("Server setup completed, connecting to '%s'", chosenServer)
    self.connectToServer(chosenServer)
|
|
|
|
def connectToDefaultServer(self):
    """Connect to the default server, or to the first configured one.

    When no default is set, the first key of the configured servers is
    used. With no servers at all, only a warning is logged.
    """
    preferred = self.settings.getDefaultServer()
    if preferred:
        self.logger.info("Connecting to default server '%s'", preferred)
        self.connectToServer(preferred)
        return

    configured = self.settings.getServers()
    if not configured:
        self.logger.warning("connectToDefaultServer called with no servers configured")
        return

    self.logger.info("No default server set; connecting to first configured")
    firstName = next(iter(configured.keys()))
    self.connectToServer(firstName)
|
|
|
|
def connectToServer(self, serverName: str):
    """Look up ``serverName`` in the settings and connect to it in the background.

    An error dialog is shown when the name is not configured. Otherwise
    the connection status label is updated and the ping is handed to
    ``_run_background``, with success and failure routed to
    ``_on_connection_success`` and ``handleConnectionFailure``.
    """
    serverConfig = self.settings.getServer(serverName)
    if not serverConfig:
        AccessibleTextDialog.showError(
            "Connection Error",
            f"Server '{serverName}' not found in configuration",
            parent=self
        )
        return

    def pingTask():
        return self._ping_server(serverConfig)

    def connected(client):
        self._on_connection_success(serverName, client)

    def failed(err):
        self.handleConnectionFailure(f"Failed to connect: {err}")

    self.connectionStatus.setText(f"Connecting to {serverName}...")
    self._run_background("connect", pingTask, on_success=connected, on_error=failed)
|
|
|
|
def _ping_server(self, server: Dict[str, str]) -> SubsonicClient:
    """Create a client for ``server`` and verify it with a ping.

    Intended to run off the UI thread.

    Args:
        server: Mapping with the ``url``, ``username`` and ``password`` keys.

    Returns:
        The client that answered the ping.

    Raises:
        SubsonicError: With code 70 when the ping reports failure.
    """
    candidate = SubsonicClient(server['url'], server['username'], server['password'])

    self.logger.info("Pinging %s", server.get("url"))
    reachable = candidate.ping()
    if reachable:
        self.logger.info("Ping succeeded for %s", server.get("url"))
        return candidate

    self.logger.error("Ping failed for %s", server.get("url"))
    raise SubsonicError(70, "Failed to connect to server. Please check your settings.")
|
|
|
|
def _on_connection_success(self, serverName: str, rawClient: SubsonicClient):
    """Adopt a freshly connected client and start the connected flow.

    Keeps the raw client, wraps it in a ``CachedClient`` that reports
    sync progress to ``_onSyncProgress``, points the playback manager at
    the new stream-URL source and calls ``onConnected()``.
    """
    self.logger.info("Connected to %s, starting connected flow", serverName)
    self._rawClient = rawClient

    # All library access goes through the caching wrapper.
    cachingClient = CachedClient(
        rawClient,
        getDataDir(),
        onSyncProgress=self._onSyncProgress
    )
    self.client = cachingClient

    def resolveStream(song):
        # Reads self.client at call time, not at connection time.
        return self.client.getStreamUrl(song.id)

    self.connectionStatus.setText(f"Connected to {serverName}")
    self.playback.setStreamResolver(resolveStream)
    self.onConnected()
    self.logger.info("Connected to %s", serverName)
|
|
|
|
def handleConnectionFailure(self, message: str):
    """Report a failed connection attempt in the status bar, a dialog and the log."""
    statusLabel = self.connectionStatus
    statusLabel.setText("Connection failed")

    # The log entry is written only after the error dialog call returns.
    AccessibleTextDialog.showError("Connection Error", message, parent=self)
    self.logger.error("Connection failed: %s", message)
|
|
|
|
def onConnected(self):
    """Enable library controls and load the library after a successful connect.

    With an existing cache the library is loaded from it in the background
    and handed to ``_applyLibraryDataAndSync``. Without one, a full sync
    is started.
    """
    self.logger.info("onConnected: enabling actions and loading library")
    for control in (self.refreshAction, self.fullSyncAction,
                    self.searchAction, self.libraryTree):
        control.setEnabled(True)

    cacheAvailable = self.client and self.client.hasCache()
    if not cacheAvailable:
        self.logger.info("No cache found, performing full sync")
        self.libraryStatus.setText("Building library cache...")
        self._startStatusAnnouncements("Building library cache, please wait")
        self.refreshLibraryFull()
        return

    # Load from the cache first, then sync in the background.
    self.logger.info("Cache found, loading from cache and syncing in background")
    self.libraryStatus.setText("Loading from cache...")
    self._startStatusAnnouncements("Loading library from cache")
    self._run_background(
        "load from cache",
        self._fetchLibraryData,
        on_success=self._applyLibraryDataAndSync,
        on_error=lambda err: self._handleLibraryLoadError(str(err))
    )
|
|
|
|
# ============ Library browsing ============
|
|
|
|
def refreshLibrary(self):
    """Start an incremental library sync (bound to F5).

    Does nothing without a client. The refresh, full-sync and search
    actions are disabled here and the sync runs through ``_run_background``.
    """
    if self.client:
        for action in (self.refreshAction, self.fullSyncAction, self.searchAction):
            action.setEnabled(False)

        self.libraryStatus.setText("Syncing library changes...")
        self._startStatusAnnouncements("Updating library cache")
        self.logger.info("Refreshing library (incremental sync)...")

        self._run_background(
            "incremental sync",
            self._incrementalSync,
            on_success=self._onSyncComplete,
            on_error=lambda err: self._handleLibraryLoadError(str(err))
        )
|
|
|
|
def refreshLibraryFull(self):
    """Start a full library sync (bound to Shift+F5).

    Does nothing without a client. The tree is cleared and disabled, and
    the refresh, full-sync and search actions are disabled, before the
    sync is handed to ``_run_background``.
    """
    if self.client:
        for action in (self.refreshAction, self.fullSyncAction, self.searchAction):
            action.setEnabled(False)

        tree = self.libraryTree
        tree.clear()
        tree.setEnabled(False)

        self.libraryStatus.setText("Full sync in progress...")
        self._startStatusAnnouncements("Full library sync in progress")
        self.logger.info("Refreshing library (full sync)...")

        self._run_background(
            "full sync",
            self._fullSync,
            on_success=self._onSyncComplete,
            on_error=lambda err: self._handleLibraryLoadError(str(err))
        )
|
|
|
|
def _incrementalSync(self) -> Dict[str, int]:
|
|
"""Perform incremental sync in background."""
|
|
if not self.client:
|
|
raise SubsonicError(70, "Not connected")
|
|
return self.client.syncIncremental()
|
|
|
|
def _fullSync(self) -> Dict[str, int]:
|
|
"""Perform full sync in background."""
|
|
if not self.client:
|
|
raise SubsonicError(70, "Not connected")
|
|
return self.client.syncFull()
|
|
|
|
def _onSyncComplete(self, stats: Dict[str, int]):
|
|
"""Handle sync completion - reload library from cache."""
|
|
self.logger.info("Sync complete: %s", stats)
|
|
# Now load the UI from the updated cache
|
|
self._run_background(
|
|
"load library",
|
|
self._fetchLibraryData,
|
|
on_success=self._applyLibraryData,
|
|
on_error=lambda err: self._handleLibraryLoadError(str(err))
|
|
)
|
|
|
|
def _applyLibraryDataAndSync(self, data: Dict[str, Any]):
|
|
"""Apply library data from cache, then start background sync."""
|
|
self.logger.info("Applying library data to UI...")
|
|
self._applyLibraryData(data)
|
|
if self._shouldRunStartupIncremental():
|
|
self.logger.info("Library UI populated, starting background sync")
|
|
self.libraryStatus.setText("Syncing...")
|
|
self._run_background(
|
|
"background sync",
|
|
self._incrementalSync,
|
|
on_success=self._onBackgroundSyncComplete,
|
|
on_error=lambda err: self.logger.warning("Background sync failed: %s", err)
|
|
)
|
|
else:
|
|
self.logger.info("Skipping startup incremental sync (recently synced)")
|
|
self.libraryStatus.setText("Library ready (recently synced)")
|
|
self.announceLive("Library ready")
|
|
|
|
def _onBackgroundSyncComplete(self, stats: Dict[str, int]):
|
|
"""Handle background sync completion - refresh UI if changes detected."""
|
|
self.logger.info("Background sync complete: %s", stats)
|
|
totalChanges = sum(stats.values())
|
|
if totalChanges > 0:
|
|
self.announceLive(f"Library synced: {totalChanges} items updated")
|
|
self.libraryStatus.setText("Library ready")
|
|
|
|
def _shouldRunStartupIncremental(self) -> bool:
|
|
"""Decide whether to run incremental sync at startup based on recent checks."""
|
|
if not self.client:
|
|
return False
|
|
last = self.client.getLastSyncTime("incremental")
|
|
if not last:
|
|
return True
|
|
try:
|
|
return datetime.now() - last > self._startupSyncInterval
|
|
except Exception:
|
|
return True
|
|
|
|
def _onSyncProgress(self, stage: str, current: int, total: int):
|
|
"""Handle sync progress updates on UI thread."""
|
|
if total > 0:
|
|
message = f"Syncing {stage}: {current}/{total}"
|
|
else:
|
|
message = f"Syncing {stage}..."
|
|
self.logger.info(message)
|
|
# Update the repeating announcement message
|
|
self._currentStatusMessage = message
|
|
# Capture message by value to avoid closure issues
|
|
self._on_ui(lambda msg=message: self.libraryStatus.setText(msg))
|
|
|
|
def _startStatusAnnouncements(self, message: str):
|
|
"""Start repeating status announcements every 10 seconds."""
|
|
self._currentStatusMessage = message
|
|
self.announceLive(message)
|
|
self._statusAnnounceTimer.start(10000)
|
|
|
|
def _stopStatusAnnouncements(self):
|
|
"""Stop repeating status announcements."""
|
|
self._statusAnnounceTimer.stop()
|
|
self._currentStatusMessage = ""
|
|
|
|
def _announceCurrentStatus(self):
|
|
"""Called by timer to repeat the current status message."""
|
|
if self._currentStatusMessage:
|
|
# Clear the live region first so screen readers re-announce identical content
|
|
self.announceLive("")
|
|
self.announceLive(self._currentStatusMessage)
|
|
|
|
def _fetchLibraryData(self) -> Dict[str, Any]:
|
|
"""Fetch core library sections in the background."""
|
|
if not self.client:
|
|
raise SubsonicError(70, "Not connected")
|
|
artists = self.client.getArtists()
|
|
playlists = self.client.getPlaylists()
|
|
genres = self.client.getGenres()
|
|
favorites = self.client.getStarred()
|
|
self.logger.info(
|
|
"Fetched library data (artists=%d, playlists=%d, genres=%d, favorites songs=%d)",
|
|
len(artists),
|
|
len(playlists),
|
|
len(genres),
|
|
len(favorites.get("songs", [])) if isinstance(favorites, dict) else 0,
|
|
)
|
|
return {
|
|
"artists": artists,
|
|
"playlists": playlists,
|
|
"genres": genres,
|
|
"favorites": favorites,
|
|
}
|
|
|
|
def _applyLibraryData(self, data: Dict[str, Any]):
    """Rebuild the library tree from fetched data on the UI thread.

    Creates the five root sections (Artists, Favorites, Discover,
    Playlists, Genres), fills them, re-enables the library controls,
    stops the repeating status announcement and announces readiness.
    Does nothing without a client.

    The roots are expanded only after they have been added to the tree.
    Qt ignores ``QTreeWidgetItem.setExpanded()`` on an item that is not
    yet part of a tree widget, so the earlier expand-then-add order never
    actually expanded them.

    Args:
        data: Dict with the optional keys ``artists``, ``playlists``,
            ``genres`` and ``favorites``.
    """
    if not self.client:
        return
    self.logger.info("Library data fetched; building tree")

    tree = self.libraryTree
    tree.clear()

    def addRoot(label, rootType):
        # Create a top-level section item that is marked as already loaded.
        root = QTreeWidgetItem([label])
        root.setData(0, Qt.UserRole, {"type": rootType, "loaded": True})
        tree.addTopLevelItem(root)
        return root

    # Root sections, in display order.
    self.artistsRoot = addRoot("Artists", "artists_root")

    self.favoritesRoot = addRoot("Favorites", "favorites_root")
    self._populateFavorites(data.get("favorites", {}))

    self.discoverRoot = addRoot("Discover", "discover_root")
    self.loadDiscoverSections()

    self.playlistsRoot = addRoot("Playlists", "playlists_root")
    self.genresRoot = addRoot("Genres", "genres_root")

    self._populateArtists(data.get("artists", []))
    self._populatePlaylists(data.get("playlists", []))
    self._populateGenres(data.get("genres", []))

    # Expand once every root is in the tree and assigned on self, so that
    # any expansion handler sees a fully built tree.
    # NOTE(review): this now fires itemExpanded for the roots; confirm that
    # onLibraryExpanded ignores items flagged "loaded": True.
    for root in (self.artistsRoot, self.favoritesRoot, self.discoverRoot,
                 self.playlistsRoot, self.genresRoot):
        root.setExpanded(True)

    self.libraryStatus.setText("Library ready. Use the tree to browse and press Enter to play.")
    tree.setEnabled(True)
    if self.artistsRoot and self.artistsRoot.childCount() > 0:
        tree.setCurrentItem(self.artistsRoot)
        tree.setFocus(Qt.OtherFocusReason)

    self.refreshAction.setEnabled(True)
    self.fullSyncAction.setEnabled(True)
    self.searchAction.setEnabled(True)
    self._stopStatusAnnouncements()
    self.announceLive("Library ready")
    self.logger.info("Library ready with sections: artists=%d playlists=%d genres=%d",
                     self.artistsRoot.childCount(),
                     self.playlistsRoot.childCount(),
                     self.genresRoot.childCount())

    # Replace the status text when the server returned nothing to browse.
    if all(root.childCount() == 0 for root in (self.artistsRoot, self.playlistsRoot, self.genresRoot)):
        self.libraryStatus.setText("No library data returned from server.")
|
|
|
|
def _handleLibraryLoadError(self, message: str):
    """Report a failed library load and re-enable the library controls.

    The repeating status announcement is stopped first. The search action
    is re-enabled only when a client exists.
    """
    self._stopStatusAnnouncements()
    AccessibleTextDialog.showError("Error", f"Failed to load library: {message}", parent=self)
    self.libraryStatus.setText("Library failed to load.")

    for action in (self.refreshAction, self.fullSyncAction):
        action.setEnabled(True)
    hasClient = bool(self.client)
    self.searchAction.setEnabled(hasClient)
    self.libraryTree.setEnabled(True)

    self.logger.error("Library load failed: %s", message)
|
|
|
|
def _populateArtists(self, artists: List[Artist]):
    """Add one collapsed tree item per artist under the Artists root.

    Each item carries an unloaded marker and a placeholder child, and its
    accessible text includes the album count when one is available.
    """
    root = self.artistsRoot
    for artist in artists:
        if artist.albumCount:
            summary = f"{artist.albumCount} albums"
        else:
            summary = "Artist"

        payload = {"type": "artist", "id": artist.id, "artist": artist, "loaded": False}
        node = QTreeWidgetItem([artist.name])
        node.setData(0, Qt.UserRole, payload)
        node.setData(0, Qt.AccessibleTextRole, f"{artist.name}, {summary}")

        # Placeholder child shown beneath the artist.
        placeholder = QTreeWidgetItem(["Loading albums..."])
        node.addChild(placeholder)
        root.addChild(node)
|
|
|
|
def _populatePlaylists(self, playlists: List[Playlist]):
    """Add one collapsed tree item per playlist under the Playlists root.

    The visible label and the accessible text are the same string: the
    playlist name followed by its track count.
    """
    root = self.playlistsRoot
    for playlist in playlists:
        caption = f"{playlist.name} ({playlist.songCount} tracks)"
        payload = {"type": "playlist", "id": playlist.id, "playlist": playlist, "loaded": False}

        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, payload)
        node.setData(0, Qt.AccessibleTextRole, caption)

        # Placeholder child shown beneath the playlist.
        placeholder = QTreeWidgetItem(["Loading tracks..."])
        node.addChild(placeholder)
        root.addChild(node)
|
|
|
|
def _populateGenres(self, genres: List[Genre]):
    """Append one expandable tree item per genre under ``genresRoot``.

    Genres are keyed by name (the payload has no ``id``). Each item starts
    with ``loaded=False`` and a "Loading songs..." placeholder child.

    Args:
        genres: Genres to add, in display order.
    """
    for entry in genres:
        caption = f"{entry.name} ({entry.songCount} songs)"
        payload = {"type": "genre", "name": entry.name, "loaded": False}
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, payload)
        node.setData(0, Qt.AccessibleTextRole, caption)
        node.addChild(QTreeWidgetItem(["Loading songs..."]))
        self.genresRoot.addChild(node)
|
|
|
|
def _populateFavorites(self, starred: Dict[str, List[Song]]):
    """Rebuild the children of ``favoritesRoot`` from starred songs.

    Existing children are removed first. When there are no starred songs a
    single "No favorites yet." placeholder is shown instead.

    Args:
        starred: Mapping whose ``"songs"`` entry lists the starred songs;
            may be empty or falsy.
    """
    favorites: List[Song] = starred.get("songs", []) if starred else []
    root = self.favoritesRoot
    root.takeChildren()
    if favorites:
        for track in favorites:
            caption = self.formatSongLabel(track, includeAlbum=True)
            node = QTreeWidgetItem([caption])
            node.setData(0, Qt.UserRole, {"type": "song", "song": track})
            node.setData(0, Qt.AccessibleTextRole, caption)
            root.addChild(node)
    else:
        root.addChild(QTreeWidgetItem(["No favorites yet."]))
|
|
|
|
def loadDiscoverSections(self):
    """Add the fixed set of discover sections under ``discoverRoot``.

    Four album-list sections are added first, followed by the server-wide
    "now playing" section and the "similar to current track" section. Every
    section starts with ``loaded=False`` and one placeholder child.
    """

    def addSection(title, payload, spokenText, placeholder):
        # One section = labelled item + payload + accessible text + placeholder.
        node = QTreeWidgetItem([title])
        node.setData(0, Qt.UserRole, payload)
        node.setData(0, Qt.AccessibleTextRole, spokenText)
        node.addChild(QTreeWidgetItem([placeholder]))
        self.discoverRoot.addChild(node)

    albumLists = (
        ("Recently Added", "newest"),
        ("Frequently Played", "frequent"),
        ("Recently Played", "recent"),
        ("Random Albums", "random"),
    )
    for title, listType in albumLists:
        addSection(
            title,
            {"type": "album_list", "listType": listType, "loaded": False},
            title,
            "Loading albums...",
        )

    addSection(
        "Now Playing (server)",
        {"type": "now_playing", "loaded": False},
        "Now Playing across users",
        "Loading songs...",
    )
    addSection(
        "Similar to Current Track",
        {"type": "similar_current", "loaded": False},
        "Songs similar to the current track",
        "Load to view similar tracks...",
    )
|
|
|
|
def onLibraryExpanded(self, item: QTreeWidgetItem):
    """Start loading an item's children the first time it is expanded.

    Items whose payload has a truthy ``"loaded"`` flag are left alone.
    Otherwise ``_setLoadingPlaceholder`` is called and, for a recognised
    item type, a fetch is handed to ``_run_background`` with the matching
    ``_apply*`` method as success callback and ``_handleItemLoadError`` as
    error callback.

    Args:
        item: The tree item that was expanded.
    """
    data: Dict = item.data(0, Qt.UserRole) or {}
    if data.get("loaded"):
        return
    self._setLoadingPlaceholder(item, "Loading...")

    # item type -> (task description, fetch callable, apply callback).
    # Fetch lambdas read the payload lazily, i.e. when the task actually runs.
    loaders = {
        "artist": (
            "load artist albums",
            lambda: self._fetchArtistAlbums(data.get("id")),
            self._applyArtistAlbums,
        ),
        "album": (
            "load album songs",
            lambda: self._fetchAlbumSongs(data.get("id")),
            self._applyAlbumSongs,
        ),
        "playlist": (
            "load playlist songs",
            lambda: self._fetchPlaylistSongs(data.get("id")),
            self._applyPlaylistSongs,
        ),
        "genre": (
            "load genre songs",
            lambda: self._fetchGenreSongs(data.get("name")),
            self._applyGenreSongs,
        ),
        "album_list": (
            "load discover section",
            lambda: self._fetchAlbumList(data.get("listType", "newest")),
            self._applyAlbumListSection,
        ),
        "now_playing": (
            "load now playing",
            self._fetchNowPlaying,
            self._applyNowPlaying,
        ),
        "similar_current": (
            "load similar songs",
            self._fetchSimilarToCurrent,
            self._applySimilarToCurrent,
        ),
    }

    plan = loaders.get(data.get("type"))
    if plan is None:
        # Unknown item types have nothing to fetch.
        return

    description, fetch, applyResult = plan
    self._run_background(
        description,
        fetch,
        on_success=lambda result: applyResult(item, result),
        on_error=lambda err: self._handleItemLoadError(item, str(err))
    )
|
|
|
|
def onLibraryActivated(self, item: QTreeWidgetItem):
    """React to activation (Enter/Return) of a library tree item.

    Songs are enqueued and played immediately; playlists, albums, artists,
    genres and the artists/favorites roots delegate to their ``play*``
    methods. Every other item simply toggles its expanded state.

    Args:
        item: The activated tree item.
    """
    data: Dict = item.data(0, Qt.UserRole) or {}
    kind = data.get("type")

    # Containers that are played by their server id.
    playById = {
        "playlist": self.playPlaylist,
        "album": self.playAlbum,
        "artist": self.playArtist,
    }

    if kind == "song":
        song: Song = data["song"]
        self.playback.enqueue(song, playNow=True)
        self.statusBar.showMessage(f"Playing {song.title} by {song.artist}", 4000)
        self.enablePlaybackActions(True)
    elif kind in playById:
        playById[kind](data.get("id"), item)
    elif kind == "genre":
        self.playGenre(data.get("name"), item)
    elif kind == "artists_root":
        self.playAllArtists()
    elif kind == "favorites_root":
        self.playFavorites()
    else:
        # Discover sections and any other node: toggle expansion.
        item.setExpanded(not item.isExpanded())
|
|
|
|
def onLibraryContextMenu(self, position: QPoint):
    """Build and show the context menu for the library item at ``position``.

    The item under ``position`` becomes the current item, a menu matching
    its type is built (all entries disabled while no client is connected),
    the opening is announced via ``announceLive`` and the menu is executed
    at the corresponding global position. Nothing happens when there is no
    item at ``position`` or the item type has no menu entries.

    Args:
        position: Point passed to ``libraryTree.itemAt`` and mapped to
            global coordinates through the tree's viewport.
    """
    target = self.libraryTree.itemAt(position)
    if not target:
        return

    # Actions such as favorite/unfavorite act on the current selection, so
    # make the invoked item current first.
    self.libraryTree.setCurrentItem(target)

    payload: Dict = target.data(0, Qt.UserRole) or {}
    kind = payload.get("type")
    connected = self.client is not None

    menu = QMenu(self)
    menu.setAccessibleName("Library Context Menu")

    def addEntry(text, handler, enabled=connected):
        menu.addAction(text, handler).setEnabled(enabled)

    # The optional first parameter absorbs the ``checked`` argument that the
    # action's triggered signal may pass.
    def queueTarget(_checked=False):
        self.addItemToQueue(target)

    def playlistTarget(_checked=False):
        self.addItemToPlaylist(target)

    if kind == "song":
        song: Optional[Song] = payload.get("song")
        songReady = connected and song is not None
        addEntry(
            "Play",
            lambda _checked=False: self.playback.enqueue(song, playNow=True) if song else None,
            songReady,
        )
        addEntry("Add to Queue", queueTarget, songReady)
        addEntry("Add to Playlist", playlistTarget, songReady)
        menu.addSeparator()
        addEntry("Favorite", self.favoriteSelection)
        addEntry("Unfavorite", self.unfavoriteSelection)
    else:
        # kind -> ((play label, play handler), queue label, playlist label,
        #          optional (favorite label, unfavorite label))
        containers = {
            "album": (
                ("Play Album", lambda _checked=False: self.playAlbum(payload.get("id"), target)),
                "Add Album to Queue",
                "Add Album to Playlist",
                ("Favorite Album", "Unfavorite Album"),
            ),
            "artist": (
                ("Play Artist", lambda _checked=False: self.playArtist(payload.get("id"), target)),
                "Add Artist to Queue",
                "Add Artist to Playlist",
                ("Favorite Artist", "Unfavorite Artist"),
            ),
            "playlist": (
                ("Play Playlist", lambda _checked=False: self.playPlaylist(payload.get("id"), target)),
                "Add Playlist to Queue",
                "Add Playlist to Another Playlist",
                None,
            ),
            "genre": (
                ("Play Genre", lambda _checked=False: self.playGenre(payload.get("name"), target)),
                "Add Genre to Queue",
                "Add Genre to Playlist",
                None,
            ),
            "artists_root": (
                ("Play All Artists", self.playAllArtists),
                "Add All Artists to Queue",
                "Add All Artists to Playlist",
                None,
            ),
            "favorites_root": (
                ("Play Favorites", self.playFavorites),
                "Add Favorites to Queue",
                "Add Favorites to Playlist",
                None,
            ),
        }
        spec = containers.get(kind)
        if spec is not None:
            (playLabel, playHandler), queueLabel, playlistLabel, favoriteLabels = spec
            addEntry(playLabel, playHandler)
            addEntry(queueLabel, queueTarget)
            addEntry(playlistLabel, playlistTarget)
            if favoriteLabels is not None:
                menu.addSeparator()
                addEntry(favoriteLabels[0], self.favoriteSelection)
                addEntry(favoriteLabels[1], self.unfavoriteSelection)

    if not menu.actions():
        # Item types without entries get no menu and no announcement.
        return

    label = (target.text(0) or "").strip()
    announcement = f"Context menu for {label} opened." if label else "Context menu opened."
    self.announceLive(announcement)

    globalPos = self.libraryTree.viewport().mapToGlobal(position)
    menu.exec(globalPos)
|
|
|
|
def showCurrentMediaContextMenu(self):
    """Open a context menu for the most relevant track or item (Alt+I).

    Targets are tried in this order:

    1. The currently playing song: via its queue row when
       ``playback.currentIndex`` is a valid row, otherwise as a plain song
       menu anchored on the queue list viewport.
    2. The current library tree item, when it has a valid visual rect.
    3. The currently selected queue row.

    When none applies, "No track to interact with." is announced.

    For the library item no announcement is made here:
    ``onLibraryContextMenu`` announces the menu itself when it actually
    opens one, so announcing here as well would speak the message twice
    (and would speak it even when no menu is shown).
    """
    playing = self.playback.currentSong()
    if playing:
        row = self.playback.currentIndex
        if row is not None and 0 <= row < len(self.playback.queue):
            # Keep the queue selection in sync with the menu's target.
            self.queueList.setCurrentRow(row)
            self._showQueueSongContextMenu(row, announce=True)
        else:
            self._showSongContextMenu(playing, self.queueList.viewport(), announce=True)
        return

    item = self.libraryTree.currentItem()
    if item:
        rect = self.libraryTree.visualItemRect(item)
        if rect.isValid():
            # The callee resolves the item from this point and announces it.
            self.onLibraryContextMenu(rect.center())
            return

    row = self.queueList.currentRow()
    if 0 <= row < len(self.playback.queue):
        self._showQueueSongContextMenu(row, announce=True)
        return

    self.announceLive("No track to interact with.")
|
|
|
|
def _showQueueSongContextMenu(self, row: int, announce: bool = False):
    """Open the song context menu for one row of the playback queue.

    The menu is anchored at the centre of the row's visual rect when that
    rect is usable, otherwise at the centre of the queue list viewport.
    Rows outside the queue are ignored.

    Args:
        row: Index into ``playback.queue``.
        announce: Forwarded to ``_showSongContextMenu``.
    """
    queue = self.playback.queue
    if row < 0 or row >= len(queue):
        return
    song = queue[row]

    listItem = self.queueList.item(row)
    rect = None
    if listItem:
        rect = self.queueList.visualItemRect(listItem)

    if rect and rect.isValid():
        anchor = rect.center()
    else:
        anchor = self.queueList.viewport().rect().center()

    self._showSongContextMenu(song, self.queueList.viewport(), anchor, queueRow=row, announce=announce)
|
|
|
|
def _showSongContextMenu(self, song: Song, anchorWidget: QWidget, anchor: Optional[QPoint] = None, queueRow: Optional[int] = None, announce: bool = False):
    """Show a context menu with song actions at the provided anchor.

    The menu offers Play, Add to Queue, Add to Playlist, Favorite and
    Unfavorite; all entries are disabled while no client is connected.

    Args:
        song: Song the actions operate on.
        anchorWidget: Widget whose coordinate system ``anchor`` is in; used
            to map the anchor to a global position.
        anchor: Point inside ``anchorWidget``. ``None`` means "centre of the
            widget".
        queueRow: Queue row of ``song`` if it came from the queue; lets
            "Play" reuse that queue position.
        announce: When true, announce the menu via ``announceLive``.
    """
    menu = QMenu(self)
    menu.setAccessibleName("Song Context Menu")
    hasClient = self.client is not None

    # The leading ``_`` parameter absorbs the ``checked`` argument that the
    # action's triggered signal may pass.
    menu.addAction("Play", lambda _=False, row=queueRow, s=song: self._playSongFromMenu(s, row)).setEnabled(hasClient)
    menu.addAction("Add to Queue", lambda _=False, s=song: self.playback.enqueue(s, playNow=False)).setEnabled(hasClient)
    menu.addAction("Add to Playlist", lambda _=False, s=song: self._addSongToPlaylistDirect(s)).setEnabled(hasClient)
    menu.addSeparator()
    menu.addAction("Favorite", lambda _=False, s=song: self._favoriteSongDirect(s)).setEnabled(hasClient)
    menu.addAction("Unfavorite", lambda _=False, s=song: self._unfavoriteSongDirect(s)).setEnabled(hasClient)

    # Only a missing anchor falls back to the widget centre. A truthiness
    # test here could also discard a valid point that evaluates as false
    # (presumably a null point at 0,0 in this binding - TODO confirm).
    if anchor is None:
        anchor = anchorWidget.rect().center()
    globalPos = anchorWidget.mapToGlobal(anchor)

    if announce:
        label = self.buildTrackAnnouncement(song)
        self.announceLive(f"Context menu for {label} opened.")
    menu.exec(globalPos)
|
|
|
|
def _playSongFromMenu(self, song: Song, queueRow: Optional[int]):
    """Play a song chosen from a context menu.

    When ``queueRow`` is a valid index into the playback queue, playback
    jumps to that row; otherwise the song is enqueued with ``playNow=True``.

    Args:
        song: Song to play.
        queueRow: Queue row of the song, or ``None`` when unknown.
    """
    rowIsUsable = queueRow is not None and 0 <= queueRow < len(self.playback.queue)
    if rowIsUsable:
        self.playback.playIndex(queueRow)
    else:
        self.playback.enqueue(song, playNow=True)
|
|
|
|
def _addSongToPlaylistDirect(self, song: Song):
    """Start the add-to-playlist flow for exactly one song.

    Announces "Not connected to a server." and stops when there is no
    client; otherwise hands a one-element song list to
    ``_collectPlaylistsAndShowDialog``.

    Args:
        song: The song to add.
    """
    if self.client:
        caption = self.formatSongLabel(song, includeAlbum=True)
        self._collectPlaylistsAndShowDialog([song], caption, truncated=False)
    else:
        self.announceLive("Not connected to a server.")
|
|
|
|
def _favoriteSongDirect(self, song: Song):
    """Star one song directly, independent of the tree selection.

    On success the result is shown in the status bar and announced. Any
    exception raised while starring or reporting is announced as a failure
    and shown in a "Favorite Failed" dialog.

    Args:
        song: The song to star.
    """
    if not self.client:
        self.announceLive("Not connected to a server.")
        return

    spokenName = self.buildTrackAnnouncement(song)
    try:
        self.client.star(song.id, "song")
        outcome = self.buildFavoriteAnnouncement("favorite", song, spokenName)
        self.statusBar.showMessage(outcome, 3000)
        self.announceLive(outcome)
    except Exception as exc:  # noqa: BLE001
        failure = self.buildFavoriteFailureAnnouncement("favorite", song, spokenName)
        self.announceLive(failure)
        AccessibleTextDialog.showError("Favorite Failed", str(exc), parent=self)
|
|
|
|
def _unfavoriteSongDirect(self, song: Song):
    """Unstar one song directly, independent of the tree selection.

    On success the result is shown in the status bar and announced. Any
    exception raised while unstarring or reporting is announced as a failure
    and shown in an "Unfavorite Failed" dialog.

    Args:
        song: The song to unstar.
    """
    if not self.client:
        self.announceLive("Not connected to a server.")
        return

    spokenName = self.buildTrackAnnouncement(song)
    try:
        self.client.unstar(song.id, "song")
        outcome = self.buildFavoriteAnnouncement("unfavorite", song, spokenName)
        self.statusBar.showMessage(outcome, 3000)
        self.announceLive(outcome)
    except Exception as exc:  # noqa: BLE001
        failure = self.buildFavoriteFailureAnnouncement("unfavorite", song, spokenName)
        self.announceLive(failure)
        AccessibleTextDialog.showError("Unfavorite Failed", str(exc), parent=self)
|
|
|
|
def _confirmBulkAction(self, action: str, label: str, count: Optional[int] = None, truncated: bool = False) -> bool:
    """Ask the user to confirm an action that affects many tracks.

    Shows a Yes/No question box (default: No) describing the action.

    Args:
        action: Verb phrase inserted after "This will".
        label: Name of the source; "selection" is used when empty.
        count: Number of tracks; omitted from the text when falsy.
        truncated: Adds " (first batch shown)" to the text when true.

    Returns:
        True only when the user answered Yes.
    """
    subject = label or "selection"
    quantity = f"{count} " if count else ""
    # "tracks" only when a count is given and it is not exactly one.
    noun = "tracks" if count and count != 1 else "track"
    note = " (first batch shown)" if truncated else ""
    prompt = f"This will {action} {quantity}{noun} from {subject}{note}. Continue?"
    answer = QMessageBox.question(
        self,
        "Confirm action",
        prompt,
        QMessageBox.Yes | QMessageBox.No,
        QMessageBox.No,
    )
    return answer == QMessageBox.Yes
|
|
|
|
def _fetchArtistAlbums(self, artistId: str) -> List[Album]:
    """Return the albums of one artist as reported by the client.

    ``onLibraryExpanded`` hands this to ``_run_background``.

    Args:
        artistId: Identifier passed to ``client.getArtist``.

    Returns:
        The ``"albums"`` entry of the client's answer, or an empty list
        when that entry is missing.
    """
    return self.client.getArtist(artistId).get("albums", [])
|
|
|
|
def _applyArtistAlbums(self, parentItem: QTreeWidgetItem, albums: List[Album]):
    """Replace an artist item's children with its album items.

    Each album item starts with ``loaded=False`` and a "Loading tracks..."
    placeholder child. The parent is marked loaded afterwards.

    Args:
        parentItem: The artist tree item to fill.
        albums: Albums to show under it.
    """
    parentItem.takeChildren()
    for entry in albums:
        caption = f"{entry.name} ({entry.songCount} tracks)"
        payload = {"type": "album", "id": entry.id, "album": entry, "loaded": False}
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, payload)
        node.setData(0, Qt.AccessibleTextRole, caption)
        node.addChild(QTreeWidgetItem(["Loading tracks..."]))
        parentItem.addChild(node)
    self._markItemLoaded(parentItem)
|
|
|
|
def _fetchAlbumSongs(self, albumId: str) -> List[Song]:
    """Return the songs of one album as reported by the client.

    ``onLibraryExpanded`` hands this to ``_run_background``.

    Args:
        albumId: Identifier passed to ``client.getAlbum``.

    Returns:
        The ``"songs"`` entry of the client's answer, or an empty list when
        that entry is missing.
    """
    return self.client.getAlbum(albumId).get("songs", [])
|
|
|
|
def _applyAlbumSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
    """Replace an album item's children with its song items.

    Song labels come from ``formatSongLabel`` without the album name. The
    parent is marked loaded afterwards.

    Args:
        parentItem: The album tree item to fill.
        songs: Songs to show under it.
    """
    parentItem.takeChildren()
    for track in songs:
        caption = self.formatSongLabel(track)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)
    self._markItemLoaded(parentItem)
|
|
|
|
def _fetchPlaylistSongs(self, playlistId: str) -> Playlist:
    """Return the playlist object the client reports for ``playlistId``.

    ``onLibraryExpanded`` hands this to ``_run_background``.

    Args:
        playlistId: Identifier passed to ``client.getPlaylist``.
    """
    fetched = self.client.getPlaylist(playlistId)
    return fetched
|
|
|
|
def _applyPlaylistSongs(self, parentItem: QTreeWidgetItem, playlist: Playlist):
    """Replace a playlist item's children with the playlist's songs.

    Afterwards the item's payload is replaced by a fresh one built from the
    fetched playlist with ``loaded=True``.

    Args:
        parentItem: The playlist tree item to fill.
        playlist: Fetched playlist providing ``songs`` and ``id``.
    """
    parentItem.takeChildren()
    for track in playlist.songs:
        caption = self.formatSongLabel(track, includeAlbum=True)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)
    refreshedPayload = {"type": "playlist", "id": playlist.id, "playlist": playlist, "loaded": True}
    parentItem.setData(0, Qt.UserRole, refreshedPayload)
|
|
|
|
def _fetchGenreSongs(self, genreName: str) -> List[Song]:
    """Return songs of one genre, requesting ``count=200`` from the client.

    ``onLibraryExpanded`` hands this to ``_run_background``.

    Args:
        genreName: Genre name passed to ``client.getSongsByGenre``.
    """
    genreSongs = self.client.getSongsByGenre(genreName, count=200)
    return genreSongs
|
|
|
|
def _applyGenreSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
    """Replace a genre item's children with its song items.

    Song labels include the album name. The parent is marked loaded
    afterwards.

    Args:
        parentItem: The genre tree item to fill.
        songs: Songs to show under it.
    """
    parentItem.takeChildren()
    for track in songs:
        caption = self.formatSongLabel(track, includeAlbum=True)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)
    self._markItemLoaded(parentItem)
|
|
|
|
def _fetchAlbumList(self, listType: str) -> List[Album]:
    """Return an album list for a discover section, requesting ``size=40``.

    ``onLibraryExpanded`` hands this to ``_run_background``.

    Args:
        listType: List type passed to ``client.getAlbumList`` (the discover
            sections use "newest", "frequent", "recent" and "random").
    """
    albumList = self.client.getAlbumList(listType, size=40)
    return albumList
|
|
|
|
def _applyAlbumListSection(self, parentItem: QTreeWidgetItem, albums: List[Album]):
    """Replace a discover section's children with album items.

    Unlike the per-artist album items, the label also names the album's
    artist. Each album item starts with ``loaded=False`` and a
    "Loading tracks..." placeholder child; the section is marked loaded
    afterwards.

    Args:
        parentItem: The discover section item to fill.
        albums: Albums to show under it.
    """
    parentItem.takeChildren()
    for entry in albums:
        caption = f"{entry.name} — {entry.artist} ({entry.songCount} tracks)"
        payload = {"type": "album", "id": entry.id, "album": entry, "loaded": False}
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, payload)
        node.setData(0, Qt.AccessibleTextRole, caption)
        node.addChild(QTreeWidgetItem(["Loading tracks..."]))
        parentItem.addChild(node)
    self._markItemLoaded(parentItem)
|
|
|
|
def _fetchNowPlaying(self) -> List[Song]:
    """Return whatever ``client.getNowPlaying()`` reports.

    ``onLibraryExpanded`` hands this to ``_run_background``.
    """
    nowPlaying = self.client.getNowPlaying()
    return nowPlaying
|
|
|
|
def _applyNowPlaying(self, parentItem: QTreeWidgetItem, songs: List[Song]):
    """Replace the "now playing" section's children with song items.

    With no songs, a "Nothing is currently playing." placeholder is added
    and the section is not marked loaded. Otherwise one item per song is
    added and the section is marked loaded.

    Args:
        parentItem: The "now playing" section item.
        songs: Songs reported as currently playing.
    """
    parentItem.takeChildren()
    if not songs:
        parentItem.addChild(QTreeWidgetItem(["Nothing is currently playing."]))
        return

    for track in songs:
        caption = self.formatSongLabel(track, includeAlbum=True)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)
    self._markItemLoaded(parentItem)
|
|
|
|
def _fetchSimilarToCurrent(self) -> List[Song]:
    """Return songs similar to the currently playing track.

    ``onLibraryExpanded`` hands this to ``_run_background``.

    Returns:
        An empty list when nothing is playing; otherwise the result of
        ``client.getSimilarSongs`` for the current song with ``count=40``.
    """
    playing = self.playback.currentSong()
    if playing:
        return self.client.getSimilarSongs(playing.id, count=40)
    return []
|
|
|
|
def _applySimilarToCurrent(self, parentItem: QTreeWidgetItem, songs: List[Song]):
    """Replace the "similar tracks" section's children with song items.

    A hint item is shown instead when nothing is playing or when no similar
    songs were found; in both cases the section is not marked loaded.

    Args:
        parentItem: The "similar to current track" section item.
        songs: Similar songs that were fetched.
    """
    parentItem.takeChildren()
    playing = self.playback.currentSong()

    hint = None
    if not playing:
        hint = "Play a track to see similar songs."
    elif not songs:
        hint = f"No similar tracks found for {playing.title}."
    if hint is not None:
        parentItem.addChild(QTreeWidgetItem([hint]))
        return

    for track in songs:
        caption = self.formatSongLabel(track, includeAlbum=True)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)
    self._markItemLoaded(parentItem)
|
|
|
|
@staticmethod
|
|
def formatSongLabel(song: Song, includeAlbum: bool = False) -> str:
|
|
"""Create a descriptive label for a song."""
|
|
prefix = f"{song.track}. " if song.track else ""
|
|
core = f"{prefix}{song.title}"
|
|
details = f"{song.artist}"
|
|
if includeAlbum:
|
|
details = f"{details} — {song.album}"
|
|
return f"{core} ({song.durationFormatted}) — {details}"
|
|
|
|
def _populateAlbumItemWithSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
    """Replace an album item's children with song items and flag it loaded.

    The ``"loaded"`` flag is set on the item's existing payload (or on a new
    empty one) and written back under ``Qt.UserRole``.

    Args:
        parentItem: The album tree item to fill.
        songs: Songs to show under it.
    """
    parentItem.takeChildren()
    for track in songs:
        caption = self.formatSongLabel(track)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)

    state: Dict = parentItem.data(0, Qt.UserRole) or {}
    state.update(loaded=True)
    parentItem.setData(0, Qt.UserRole, state)
|
|
|
|
def _populateGenreItemWithSongs(self, parentItem: QTreeWidgetItem, songs: List[Song]):
    """Replace a genre item's children with song items and flag it loaded.

    Song labels include the album name. The ``"loaded"`` flag is set on the
    item's existing payload (or on a new empty one) and written back under
    ``Qt.UserRole``.

    Args:
        parentItem: The genre tree item to fill.
        songs: Songs to show under it.
    """
    parentItem.takeChildren()
    for track in songs:
        caption = self.formatSongLabel(track, includeAlbum=True)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        parentItem.addChild(node)

    state: Dict = parentItem.data(0, Qt.UserRole) or {}
    state.update(loaded=True)
    parentItem.setData(0, Qt.UserRole, state)
|
|
|
|
def addItemToQueue(self, item: QTreeWidgetItem):
    """Append a tree item's songs to the queue without starting playback.

    A song item is enqueued immediately. For every other item type the
    songs are resolved through ``collectSongsForItem`` via
    ``_run_background`` and enqueued by ``_onSongsCollectedForQueue``.

    Args:
        item: The tree item whose songs should be queued.
    """
    if not self.client:
        self.announceLive("Not connected to a server.")
        return

    payload: Dict = item.data(0, Qt.UserRole) or {}

    if payload.get("type") != "song":
        # Containers may need several server requests; resolve them via
        # the background runner.
        self.statusBar.showMessage("Collecting songs...", 2000)
        self._run_background(
            "collect songs for queue",
            lambda: self.collectSongsForItem(item),
            on_success=self._onSongsCollectedForQueue,
            on_error=lambda err: self._handleCollectionError(str(err))
        )
        return

    song: Optional[Song] = payload.get("song")
    if not song:
        self.announceLive("No song selected.")
        return
    self.playback.enqueue(song, playNow=False)
    self.statusBar.showMessage(f"Added {song.title} to the queue", 3000)
    self.enablePlaybackActions(True)
|
|
|
|
def addItemToPlaylist(self, item: QTreeWidgetItem):
    """Start the add-to-playlist flow for a tree item's songs.

    A song item goes straight to ``_collectPlaylistsAndShowDialog``. For
    every other item type the songs are first resolved through
    ``collectSongsForItem`` via ``_run_background``.

    Args:
        item: The tree item whose songs should be added to a playlist.
    """
    if not self.client:
        self.announceLive("Not connected to a server.")
        return

    payload: Dict = item.data(0, Qt.UserRole) or {}

    if payload.get("type") != "song":
        self.statusBar.showMessage("Collecting songs...", 2000)
        self._run_background(
            "collect songs for playlist",
            lambda: self.collectSongsForItem(item),
            on_success=self._onSongsCollectedForPlaylist,
            on_error=lambda err: self._handleCollectionError(str(err))
        )
        return

    song: Optional[Song] = payload.get("song")
    if song:
        songs = [song]
        caption = self.formatSongLabel(song, includeAlbum=True)
    else:
        # No song in the payload: pass an empty list and the item's text.
        songs = []
        caption = item.text(0)
    self._collectPlaylistsAndShowDialog(songs, caption, truncated=False)
|
|
|
|
def collectSongsForItem(self, item: QTreeWidgetItem) -> Tuple[List[Song], str, bool]:
    """Resolve a tree item into a list of songs for queueing or playlist actions.

    Artists and the artists root are expanded album by album, with
    ``_throttle_requests`` called after every album request and the
    ``_shutting_down`` flag checked between requests so collection stops
    when the window is closing.

    Args:
        item: Tree item whose ``Qt.UserRole`` payload describes what to
            collect.

    Returns:
        ``(songs, label, truncated)``. ``label`` names the source for user
        messages. ``truncated`` is always ``False`` in this implementation.
        A tuple is returned on every path, including client errors and item
        types this method does not recognise, so callers can always unpack
        the result.
    """
    if self._shutting_down:
        return [], "", False
    data: Dict = item.data(0, Qt.UserRole) or {}
    itemType = data.get("type")
    label = item.text(0)
    truncated = False
    request_count = 0

    if not self.client:
        return [], label, truncated

    try:
        if itemType == "song":
            song: Optional[Song] = data.get("song")
            return ([song] if song else []), (self.formatSongLabel(song, includeAlbum=True) if song else label), truncated
        if itemType == "album":
            albumData = self.client.getAlbum(data.get("id"))
            album: Optional[Album] = albumData.get("album")
            songs: List[Song] = albumData.get("songs", [])
            return songs, (album.name if album else label), truncated
        if itemType == "artist":
            if self._shutting_down:
                return [], label, truncated
            artistData = self.client.getArtist(data.get("id"))
            artist: Optional[Artist] = artistData.get("artist")
            albums: List[Album] = artistData.get("albums", [])
            songs = []
            for album in albums:
                if self._shutting_down:
                    break
                albumData = self.client.getAlbum(album.id)
                songs.extend(albumData.get("songs", []))
                request_count += 1
                self._throttle_requests(request_count)
            return songs, (artist.name if artist else label), truncated
        if itemType == "playlist":
            playlist: Playlist = self.client.getPlaylist(data.get("id"))
            return playlist.songs, playlist.name, truncated
        if itemType == "genre":
            genreName = data.get("name")
            songs = self.client.getSongsByGenre(genreName)
            return songs, (genreName or label), truncated
        if itemType == "artists_root":
            if self._shutting_down:
                return [], "all artists", truncated
            artists: List[Artist] = self.client.getArtists()
            songs = []
            for artist in artists:
                if self._shutting_down:
                    break
                artistData = self.client.getArtist(artist.id)
                for album in artistData.get("albums", []):
                    if self._shutting_down:
                        break
                    albumData = self.client.getAlbum(album.id)
                    songs.extend(albumData.get("songs", []))
                    request_count += 1
                    self._throttle_requests(request_count)
            return songs, "all artists", truncated
        if itemType == "favorites_root":
            starred = self.client.getStarred()
            songs = starred.get("songs", [])
            return songs, "favorites", truncated
        if itemType == "album_list":
            # These nodes contain album children; we don't aggregate here
            return [], label, truncated
        if itemType == "now_playing":
            songs = self.client.getNowPlaying()
            return songs, "now playing", truncated
        if itemType == "similar_current":
            current = self.playback.currentSong()
            songs = self.client.getSimilarSongs(current.id, count=40) if current else []
            return songs, "similar tracks", truncated
    except Exception as e:
        # NOTE(review): addItemToQueue/addItemToPlaylist hand this method to
        # _run_background; if that runs it on a worker thread, this dialog is
        # opened off the UI thread - confirm and consider letting the error
        # propagate to the on_error callback instead.
        AccessibleTextDialog.showError("Queue Error", str(e), parent=self)

    # Reached after a handled error and for unrecognised item types: always
    # hand back an empty result rather than None.
    return [], label, truncated
|
|
|
|
def _onSongsCollectedForQueue(self, result: Tuple[List[Song], str, bool]):
    """Enqueue collected songs and report the outcome.

    Success callback of the "collect songs for queue" background task.

    Args:
        result: ``(songs, label, truncated)`` as produced by
            ``collectSongsForItem``; ``truncated`` is not used here.
    """
    collected, source, _truncated = result
    if collected:
        self.playback.enqueueMany(collected, playFirst=False)
        summary = f"Added {len(collected)} tracks from {source} to the queue"
        self.statusBar.showMessage(summary, 4000)
        self.enablePlaybackActions(True)
        self.announceLive(summary)
    else:
        self.announceLive(f"No songs found for {source}")
|
|
|
|
def _handleCollectionError(self, message: str):
    """Show a "Queue Error" dialog for a failed song collection.

    Used as the error callback of both song-collection background tasks
    (queue and playlist).

    Args:
        message: Error text to display.
    """
    dialogTitle = "Queue Error"
    AccessibleTextDialog.showError(dialogTitle, message, parent=self)
|
|
|
|
def _onSongsCollectedForPlaylist(self, result: Tuple[List[Song], str, bool]):
    """Forward collected songs to the add-to-playlist flow.

    Success callback of the "collect songs for playlist" background task.

    Args:
        result: ``(songs, label, truncated)`` as produced by
            ``collectSongsForItem``.
    """
    collected, source, wasTruncated = result
    self._collectPlaylistsAndShowDialog(collected, source, wasTruncated)
|
|
|
|
def _collectPlaylistsAndShowDialog(self, songs: List[Song], label: str, truncated: bool):
    """Load the server's playlists, then open the add-to-playlist dialog.

    Stops early with an announcement when ``songs`` is empty, and with a
    status message when the user declines the bulk confirmation (asked only
    for more than one song). Otherwise ``client.getPlaylists`` is handed to
    ``_run_background`` and its result is passed to
    ``_showAddToPlaylistDialog``.

    Args:
        songs: Songs to add.
        label: Name of their source, used in messages.
        truncated: Forwarded to the confirmation and the dialog step.
    """
    if not songs:
        self.announceLive(f"No songs found for {label}")
        return

    isBulk = len(songs) > 1
    if isBulk and not self._confirmBulkAction("add to playlist", label, count=len(songs), truncated=truncated):
        self.statusBar.showMessage("Add to playlist canceled", 2000)
        return

    def openDialog(playlists):
        self._showAddToPlaylistDialog(playlists, songs, label, truncated)

    def reportFailure(err):
        AccessibleTextDialog.showError("Playlist Error", str(err), parent=self)

    self._run_background(
        "load playlists",
        self.client.getPlaylists,
        on_success=openDialog,
        on_error=reportFailure
    )
|
|
|
|
def _showAddToPlaylistDialog(self, playlists: List[Playlist], songs: List[Song], label: str, truncated: bool):
    """Let the user pick a playlist target, then apply it in the background.

    Nothing happens when the dialog is dismissed. Otherwise the chosen mode
    and target are handed, together with the song ids, to
    ``_updateOrCreatePlaylist`` via ``_run_background``;
    ``_afterPlaylistUpdated`` reports the result.

    Args:
        playlists: Existing playlists offered in the dialog.
        songs: Songs to add.
        label: Name of the songs' source (not used in this step).
        truncated: Forwarded to ``_afterPlaylistUpdated``.
    """
    dialog = AddToPlaylistDialog(playlists, parent=self)
    accepted = dialog.exec()
    if not accepted:
        return

    mode, target = dialog.getResult()
    songIds = [entry.id for entry in songs]

    def mutate():
        return self._updateOrCreatePlaylist(mode, target, playlists, songIds)

    def reportSuccess(message):
        self._afterPlaylistUpdated(message, len(songs), truncated)

    def reportFailure(err):
        AccessibleTextDialog.showError("Playlist Error", str(err), parent=self)

    self._run_background(
        "update playlist",
        mutate,
        on_success=reportSuccess,
        on_error=reportFailure
    )
|
|
|
|
def _updateOrCreatePlaylist(self, mode: str, target: str, playlists: List[Playlist], songIds: List[str]) -> str:
    """Add songs to an existing playlist or create a new one.

    ``_showAddToPlaylistDialog`` hands this to ``_run_background``.

    Args:
        mode: ``"existing"`` to append to a playlist; any other value
            creates a new playlist.
        target: Playlist id for ``"existing"``, otherwise the new name.
        playlists: Known playlists, used to look up the target's name.
        songIds: Ids of the songs to add.

    Returns:
        A status message naming the affected playlist.
    """
    if mode != "existing":
        created = self.client.createPlaylist(target, songIds=songIds)
        return f"Created playlist {created.name}"

    self.client.updatePlaylist(target, songIdsToAdd=songIds)
    # Fall back to the generic word when the id is not among the known lists.
    targetName = "playlist"
    for candidate in playlists:
        if candidate.id == target:
            targetName = candidate.name
            break
    return f"Added tracks to {targetName}"
|
|
|
|
def _afterPlaylistUpdated(self, baseMessage: str, count: int, truncated: bool):
    """Announce playlist mutation results on the UI thread.

    The message is ``"<baseMessage> (<count> track[s])"`` with exactly one
    space before the parenthesis and a singular noun for a single track. It
    is shown in the status bar for four seconds and announced via
    ``announceLive``.

    Args:
        baseMessage: Status text returned by ``_updateOrCreatePlaylist``.
        count: Number of songs that were sent to the playlist.
        truncated: Accepted for signature compatibility; not used in the
            message.
    """
    noun = "track" if count == 1 else "tracks"
    message = f"{baseMessage} ({count} {noun})"
    self.statusBar.showMessage(message, 4000)
    self.announceLive(message)
|
|
|
|
# ============ Search ============
|
|
|
|
def openSearchDialog(self):
    """Open the modal search dialog for the connected server.

    Without a client, a warning is logged and the user is told to connect
    first. Songs activated in the dialog are routed to
    ``onSearchSongChosen``. Any exception while creating or running the
    dialog is logged with its traceback and otherwise swallowed.
    """
    if not self.client:
        self.logger.warning("Search requested but no client is connected")
        self.announceLive("Connect to a server first to search.")
        return

    self.logger.info("Opening search dialog")
    try:
        searchDialog = SearchDialog(self.client, parent=self)
        searchDialog.songActivated.connect(self.onSearchSongChosen)
        searchDialog.exec()
        self.logger.info("Search dialog closed")
    except Exception as exc:  # noqa: BLE001
        self.logger.exception("Failed to open search dialog: %s", exc)
|
|
|
|
def onSearchSongChosen(self, song: Song):
    """Play a song that was activated in the search dialog.

    The song is enqueued with ``playNow=True``, playback actions are
    enabled and a "Playing ..." status message is shown for four seconds.

    Args:
        song: The chosen song.
    """
    self.playback.enqueue(song, playNow=True)
    self.enablePlaybackActions(True)
    nowPlayingText = f"Playing {song.title} by {song.artist}"
    self.statusBar.showMessage(nowPlayingText, 4000)
|
|
|
|
# ============ Queue management ============
|
|
|
|
def onQueueChanged(self, queue: List[Song]):
    """Rebuild the queue list widget from the current playback queue.

    Signals of the list widget are blocked while it is cleared and
    refilled. Afterwards the queue-related widgets and actions are enabled
    only when the queue is non-empty.

    Args:
        queue: The songs now in the queue, in play order.
    """
    self.queueList.blockSignals(True)
    self.queueList.clear()
    for position, track in enumerate(queue, start=1):
        caption = f"{position}. {track.artist} — {track.title} ({track.durationFormatted})"
        row = QListWidgetItem(caption)
        # The stored value is the zero-based queue index.
        row.setData(Qt.UserRole, position - 1)
        row.setData(Qt.AccessibleTextRole, caption)
        self.queueList.addItem(row)
    self.queueList.blockSignals(False)

    hasItems = len(queue) > 0
    for control in (self.queueList, self.removeQueueButton, self.clearQueueButton):
        control.setEnabled(hasItems)
    self.enablePlaybackActions(hasItems)
    for action in (self.announceTrackAction, self.announcePositionAction):
        action.setEnabled(hasItems)
|
|
|
|
def onQueueItemActivated(self, item: QListWidgetItem):
    """Start playback at the queue index stored on the activated row.

    Does nothing when the row carries no ``Qt.UserRole`` data.
    """
    storedIndex = item.data(Qt.UserRole)
    if storedIndex is None:
        return
    self.playback.playIndex(int(storedIndex))
|
|
|
|
def removeSelectedFromQueue(self):
    """Delete the highlighted row from the playback queue.

    When the removed row sits at or before the current index, that index
    is shifted down by one (never below -1). Playback is stopped once the
    queue becomes empty, and ``queueChanged`` is emitted with a copy of
    the updated queue.
    """
    selected = self.queueList.currentRow()
    player = self.playback
    if selected < 0 or selected >= len(player.queue):
        return

    del player.queue[selected]
    if player.currentIndex >= selected:
        player.currentIndex = max(-1, player.currentIndex - 1)
    if not player.queue:
        player.stop()
    player.queueChanged.emit(player.queue.copy())
|
|
|
|
def clearQueue(self):
    """Empty the playback queue by delegating to the playback manager."""
    manager = self.playback
    manager.clearQueue()
|
|
|
|
# ============ Playback updates ============
|
|
|
|
def startPlaybackWithSongs(self, songs: List[Song], label: str):
    """Replace the queue with ``songs`` and start playing the first one.

    An empty ``songs`` list shows a "Nothing to Play" warning dialog and
    leaves the queue untouched. Otherwise the status bar shows
    "Playing <label>" for four seconds and playback actions are enabled.
    """
    if songs:
        player = self.playback
        player.clearQueue()
        player.enqueueMany(songs, playFirst=True)
        self.statusBar.showMessage(f"Playing {label}", 4000)
        self.enablePlaybackActions(True)
        return

    AccessibleTextDialog.showWarning("Nothing to Play", f"No songs found for {label}.", parent=self)
|
|
|
|
def playAlbum(self, albumId: str, item: Optional[QTreeWidgetItem] = None):
    """Fetch an album in the background and play all of its songs.

    Args:
        albumId: Identifier of the album to load.
        item: Optional library tree item, forwarded to
            ``_onAlbumLoadedForPlay`` once the fetch succeeds.
    """
    if not self.client:
        # A missing client is a connection problem, not a selection problem;
        # use the same wording as playAllArtists/playFavorites.
        self.announceLive("Not connected to a server.")
        return
    if not albumId:
        self.announceLive("No album selected.")
        return

    self.statusBar.showMessage("Loading album...", 2000)
    self._run_background(
        "play album",
        lambda: self._fetchAlbumWithSongs(albumId),
        on_success=lambda data: self._onAlbumLoadedForPlay(albumId, item, data),
        on_error=lambda err: AccessibleTextDialog.showError("Album Error", str(err), parent=self),
    )
|
|
|
|
def playArtist(self, artistId: str, item: Optional[QTreeWidgetItem] = None):
    """Fetch every song of an artist in the background and play them.

    Args:
        artistId: Identifier of the artist to load.
        item: Optional library tree item, forwarded to
            ``_onArtistLoadedForPlay`` once the fetch succeeds.
    """
    if not self.client:
        # A missing client is a connection problem, not a selection problem;
        # use the same wording as playAllArtists/playFavorites.
        self.announceLive("Not connected to a server.")
        return
    if not artistId:
        self.announceLive("No artist selected.")
        return

    self.statusBar.showMessage("Loading artist...", 2000)
    self._run_background(
        "play artist",
        lambda: self._fetchArtistSongs(artistId),
        on_success=lambda payload: self._onArtistLoadedForPlay(item, payload),
        on_error=lambda err: AccessibleTextDialog.showError("Artist Error", str(err), parent=self),
    )
|
|
|
|
def playAllArtists(self):
    """Load the whole library in the background and play it.

    Requires a connected client; otherwise only an announcement is made.
    The background job is ``_progressiveLoadAllArtists``; its result is
    passed to ``_announceLiveIfAny`` and failures open a "Library Error"
    dialog.
    """
    if self.client:
        def _announceTotal(count):
            return self._announceLiveIfAny(count, "all artists")

        def _showFailure(err):
            return AccessibleTextDialog.showError("Library Error", str(err), parent=self)

        self.announceLive("Loading all artists...")
        self._run_background(
            "play all artists",
            self._progressiveLoadAllArtists,
            on_success=_announceTotal,
            on_error=_showFailure,
        )
        return

    self.announceLive("Not connected to a server.")
|
|
|
|
def playGenre(self, genreName: str, item: Optional[QTreeWidgetItem] = None):
    """Fetch a genre's songs in the background and play them.

    Args:
        genreName: Name of the genre to load.
        item: Optional library tree item, forwarded to
            ``_onGenreLoadedForPlay`` once the fetch succeeds.
    """
    if not self.client:
        # A missing client is a connection problem, not a selection problem;
        # use the same wording as playAllArtists/playFavorites.
        self.announceLive("Not connected to a server.")
        return
    if not genreName:
        self.announceLive("No genre selected.")
        return

    self.statusBar.showMessage("Loading genre...", 2000)
    self._run_background(
        "play genre",
        lambda: self._fetchGenreSongsForPlay(genreName),
        on_success=lambda result: self._onGenreLoadedForPlay(genreName, item, result),
        on_error=lambda err: AccessibleTextDialog.showError("Genre Error", str(err), parent=self),
    )
|
|
|
|
def playPlaylist(self, playlistId: str, item: Optional[QTreeWidgetItem] = None):
    """Fetch a playlist in the background and start playing it.

    Args:
        playlistId: Identifier of the playlist to load.
        item: Optional library tree item, forwarded to
            ``_onPlaylistLoadedForPlay`` once the fetch succeeds.
    """
    # Guard like the sibling play* methods do; without it a disconnected
    # session fails inside the worker with an opaque AttributeError on None.
    if not self.client:
        self.announceLive("Not connected to a server.")
        return
    if not playlistId:
        self.announceLive("No playlist selected.")
        return

    self.statusBar.showMessage("Loading playlist...", 2000)
    self._run_background(
        "play playlist",
        lambda: self.client.getPlaylist(playlistId),
        on_success=lambda playlist: self._onPlaylistLoadedForPlay(item, playlist),
        on_error=lambda err: AccessibleTextDialog.showError("Playlist Error", str(err), parent=self),
    )
|
|
|
|
def playFavorites(self):
    """Load the starred songs in the background and play them.

    Requires a connected client; otherwise only an announcement is made.
    On success the "songs" entry of the starred result (empty list when
    missing) is handed to ``startPlaybackWithSongs`` labelled "favorites".
    """
    if self.client:
        def _loadStarred():
            return self.client.getStarred()

        def _playStarred(starred):
            return self.startPlaybackWithSongs(starred.get("songs", []), "favorites")

        def _reportFailure(err):
            return AccessibleTextDialog.showError("Favorites Error", str(err), parent=self)

        self.statusBar.showMessage("Loading favorites...", 2000)
        self._run_background(
            "play favorites",
            _loadStarred,
            on_success=_playStarred,
            on_error=_reportFailure,
        )
        return

    self.announceLive("Not connected to a server.")
|
|
|
|
# ============ Background fetch helpers for playback ============
|
|
|
|
def _fetchAlbumWithSongs(self, albumId: str) -> Tuple[Optional[Album], List[Song]]:
    """Return ``(album, songs)`` taken from the client's ``getAlbum`` result.

    ``album`` is ``None`` when the result has no "album" entry; ``songs``
    falls back to an empty list when the "songs" entry is missing.
    """
    response = self.client.getAlbum(albumId)
    album = response.get("album")
    tracks = response.get("songs", [])
    return album, tracks
|
|
|
|
def _onAlbumLoadedForPlay(self, albumId: str, item: Optional[QTreeWidgetItem], data: Tuple[Optional[Album], List[Song]]):
    """Play a fetched album and, when possible, fill and expand its tree item.

    The playback label is "album <name>", falling back to ``albumId`` when
    no album object was returned. The tree item is only touched when both
    it and the song list are truthy.
    """
    album, tracks = data
    displayName = album.name if album else albumId
    self.startPlaybackWithSongs(tracks, f"album {displayName}")
    if not (item and tracks):
        return
    self._populateAlbumItemWithSongs(item, tracks)
    item.setExpanded(True)
|
|
|
|
def _fetchArtistSongs(self, artistId: str) -> Tuple[Optional[Artist], List[Song]]:
    """Collect the songs of every album belonging to one artist.

    Returns ``(None, [])`` immediately when the window is shutting down.
    Otherwise each album is fetched in turn, with ``_throttle_requests``
    called after every album request; the loop stops early once shutdown
    is flagged.

    Returns:
        ``(artist, songs)`` where ``artist`` may be ``None`` if the client
        result has no "artist" entry.
    """
    if self._shutting_down:
        return None, []

    payload = self.client.getArtist(artistId)
    artist: Optional[Artist] = payload.get("artist")
    collected: List[Song] = []
    for requestNumber, album in enumerate(payload.get("albums", []), start=1):
        if self._shutting_down:
            break
        collected += self.client.getAlbum(album.id).get("songs", [])
        self._throttle_requests(requestNumber)
    return artist, collected
|
|
|
|
def _onArtistLoadedForPlay(self, item: Optional[QTreeWidgetItem], payload: Tuple[Optional[Artist], List[Song]]):
    """Play the fetched artist songs and expand the tree item if one was given.

    The playback label is the artist's name, or the literal "artist" when
    no artist object came back.
    """
    artist, tracks = payload
    if artist:
        caption = artist.name
    else:
        caption = "artist"
    self.startPlaybackWithSongs(tracks, caption)
    if not item:
        return
    item.setExpanded(True)
|
|
|
|
def _progressiveLoadAllArtists(self) -> int:
    """Fetch songs across all artists in chunks and enqueue progressively.

    Walks every artist and every album of that artist, accumulating songs
    in a batch. Whenever the batch holds at least ``self.bulkChunkSize``
    songs, a copy is handed to ``_enqueueBatchForPlayback`` via ``_on_ui``
    and the batch is emptied. Both loops stop as soon as
    ``self._shutting_down`` is set.

    Returns:
        The number of songs fetched. This can exceed the number actually
        enqueued, because a trailing partial batch is dropped when shutdown
        was flagged.
    """
    if self._shutting_down:
        return 0
    artists: List[Artist] = self.client.getArtists()
    batch: List[Song] = []
    request_count = 0  # album requests issued so far; drives the throttle
    total = 0
    first_batch = True  # only the first dispatched batch replaces the queue and starts playback

    for artist in artists:
        if self._shutting_down:
            break
        artistData = self.client.getArtist(artist.id)
        for album in artistData.get("albums", []):
            if self._shutting_down:
                break
            albumData = self.client.getAlbum(album.id)
            songs = albumData.get("songs", [])
            batch.extend(songs)
            total += len(songs)
            request_count += 1
            if len(batch) >= self.bulkChunkSize:
                # Default arguments snapshot the batch and flag now; the lambda
                # runs later, after ``batch`` has been cleared and reused.
                self._on_ui(lambda b=batch.copy(), first=first_batch: self._enqueueBatchForPlayback(b, "all artists", first))
                batch.clear()
                first_batch = False
            self._throttle_requests(request_count)

    # Flush whatever is left unless we are shutting down
    if batch and not self._shutting_down:
        self._on_ui(lambda b=batch.copy(), first=first_batch: self._enqueueBatchForPlayback(b, "all artists", first))

    return total
|
|
|
|
def _enqueueBatchForPlayback(self, batch: List[Song], label: str, first: bool):
    """Append one batch of songs to the queue.

    An empty batch is ignored. When ``first`` is true the queue is cleared
    and playback starts with the batch; otherwise the batch is appended
    without starting playback. A status message is shown either way and
    playback actions are enabled.
    """
    if not batch:
        return

    player = self.playback
    if not first:
        player.enqueueMany(batch, playFirst=False)
        status, timeout = f"Queued {len(batch)} more tracks from {label}", 2000
    else:
        player.clearQueue()
        player.enqueueMany(batch, playFirst=True)
        status, timeout = f"Playing {label}", 4000
    self.statusBar.showMessage(status, timeout)
    self.enablePlaybackActions(True)
|
|
|
|
def _announceLiveIfAny(self, count: int, label: str):
    """Announce "Queued <count> tracks from <label>" when ``count`` is positive."""
    if not count > 0:
        return
    self.announceLive(f"Queued {count} tracks from {label}")
|
|
|
|
def _fetchGenreSongsForPlay(self, genreName: str) -> Tuple[List[Song], bool]:
    """Fetch a genre's songs for playback.

    Returns:
        ``(songs, False)``: the list from the client's ``getSongsByGenre``
        paired with a flag that is always ``False`` here.
    """
    return self.client.getSongsByGenre(genreName), False
|
|
|
|
def _onGenreLoadedForPlay(self, genreName: str, item: Optional[QTreeWidgetItem], result: Tuple[List[Song], bool]):
    """Play a fetched genre and, when possible, fill and expand its tree item.

    ``result`` is a ``(songs, truncated)`` pair; the second element is
    unpacked but not used. The tree item is only touched when both it and
    the song list are truthy.
    """
    tracks, _truncated = result
    self.startPlaybackWithSongs(tracks, f"genre {genreName}")
    if not (item and tracks):
        return
    self._populateGenreItemWithSongs(item, tracks)
    item.setExpanded(True)
|
|
|
|
def _onPlaylistLoadedForPlay(self, item: Optional[QTreeWidgetItem], playlist: Playlist):
    """Play a fetched playlist and mirror its songs under the tree item.

    An empty playlist shows a "Playlist Empty" warning and nothing else.
    Otherwise the queue is replaced and playback starts. When ``item`` is
    given, its children are rebuilt as one node per song and its user data
    is rewritten with ``"loaded": True``.
    """
    tracks = playlist.songs
    if not tracks:
        AccessibleTextDialog.showWarning("Playlist Empty", "This playlist has no tracks.", parent=self)
        return

    player = self.playback
    player.clearQueue()
    player.enqueueMany(tracks, playFirst=True)
    self.statusBar.showMessage(f"Playing playlist {playlist.name}", 4000)

    def _songNode(track):
        # One child row per song, labelled with the album included
        caption = self.formatSongLabel(track, includeAlbum=True)
        node = QTreeWidgetItem([caption])
        node.setData(0, Qt.UserRole, {"type": "song", "song": track})
        node.setData(0, Qt.AccessibleTextRole, caption)
        return node

    if item:
        item.takeChildren()
        for track in tracks:
            item.addChild(_songNode(track))
        item.setData(0, Qt.UserRole, {"type": "playlist", "id": playlist.id, "playlist": playlist, "loaded": True})
    self.enablePlaybackActions(True)
|
|
|
|
def onTrackChanged(self, song: Song):
    """Refresh the now-playing display when the current track changes.

    Sets the now-playing text to "<title> — <artist> (<album>)" and enables
    the announce actions, the position slider and both favorite actions.
    The track is announced live only when announcements are enabled and a
    one-shot request flag is set; that flag is then cleared.
    """
    self.nowPlayingInfo.setText(f"{song.title} — {song.artist} ({song.album})")
    for control in (
        self.announceTrackAction,
        self.announcePositionAction,
        self.positionSlider,
        self.favoriteAction,
        self.unfavoriteAction,
    ):
        control.setEnabled(True)

    shouldAnnounce = self.announceTrackChangesEnabled and self._announceOnTrackChange
    if shouldAnnounce:
        self.announceCurrentTrackLive()
    self._announceOnTrackChange = False
|
|
|
|
def onPlaybackStateChanged(self, state: str):
    """Mirror the playback state in the status label and play/pause captions.

    The status label shows the capitalised state. The play button and
    play/pause action read "Pause" while playing, "Play" while paused and
    "Play/Pause" for any other state.
    """
    self.playbackStatus.setText(state.capitalize())
    if state == "playing":
        caption = "Pause"
    elif state == "paused":
        caption = "Play"
    else:
        caption = "Play/Pause"
    self.playButton.setText(caption)
    self.playPauseAction.setText(caption)
|
|
|
|
def onPositionChanged(self, position: int, duration: int):
    """Update the position label and seek slider.

    With a non-positive ``duration`` only the label is reset to
    "00:00 / 00:00". Otherwise the label shows formatted position and
    duration, and the slider is set to the position scaled onto 0-1000
    with its signals blocked during the update.
    """
    if duration > 0:
        elapsed = self.formatTime(position)
        length = self.formatTime(duration)
        self.positionLabel.setText(f"{elapsed} / {length}")
        scaled = int((position / duration) * 1000)
        slider = self.positionSlider
        slider.blockSignals(True)
        slider.setValue(scaled)
        slider.blockSignals(False)
        return

    self.positionLabel.setText("00:00 / 00:00")
|
|
|
|
def onPlaybackError(self, message: str):
    """Show a "Playback Error" dialog, then set the playback status to "Error"."""
    AccessibleTextDialog.showError("Playback Error", message, parent=self)
    statusLabel = self.playbackStatus
    statusLabel.setText("Error")
|
|
|
|
def onRepeatToggled(self, checked: bool):
    """Switch repeat between "all" (checked) and "none" (unchecked)."""
    if checked:
        mode = "all"
    else:
        mode = "none"
    self.applyRepeatMode(mode)
|
|
|
|
def onShuffleToggled(self, checked: bool):
    """Forward the toggle state to ``setShuffleState`` with its default options."""
    applyShuffle = self.setShuffleState
    applyShuffle(checked)
|
|
|
|
def onAnnounceTrackChangesToggled(self, checked: bool):
    """Store the track-change announcement preference and confirm it aloud.

    Updates the in-memory flag, writes the ``interface`` /
    ``announceTrackChanges`` setting and announces the new state.
    """
    self.announceTrackChangesEnabled = checked
    self.settings.set("interface", "announceTrackChanges", checked)
    stateWord = "enabled" if checked else "disabled"
    self.announceLive(f"Announce track changes {stateWord}")
|
|
|
|
def onSeekRequested(self, value: int):
    """Seek to the slider position, treating ``value`` as thousandths of the track.

    Nothing happens when the player reports a non-positive duration.
    """
    trackLength = self.playback.player.duration()
    if not trackLength > 0:
        return
    fraction = value / 1000
    self.playback.seek(int(fraction * trackLength))
|
|
|
|
def bringToFront(self):
    """Restore, raise and activate the window (used for the MPRIS Raise request)."""
    for step in (self.showNormal, self.raise_, self.activateWindow):
        step()
|
|
|
|
def setVolumePercent(self, volumePercent: int, announce: bool = False):
    """Apply a volume level, clamped to 0-100, everywhere it is tracked.

    Updates ``self.volume``, the playback manager, the volume status label
    and the ``playback`` / ``volume`` setting. Announces the level when
    ``announce`` is true and notifies MPRIS when it is available.
    """
    level = min(100, max(0, volumePercent))
    self.volume = level
    self.playback.setVolume(level)
    self.volumeStatus.setText(f"Volume: {self.volume}%")
    self.settings.set("playback", "volume", self.volume)
    if announce:
        self.announceLive(f"Volume {self.volume} percent")

    mprisReady = self.mpris and self.mpris.available
    if mprisReady:
        self.mpris.notifyVolumeChanged()
|
|
|
|
def setVolumeFromExternal(self, volumePercent: int):
    """Apply a volume level that arrived via MPRIS, without announcing it."""
    quiet = False
    self.setVolumePercent(volumePercent, announce=quiet)
|
|
|
|
def adjustVolume(self, delta: int):
    """Change the volume by ``delta`` percentage points via ``setVolumePercent``."""
    target = self.volume + delta
    self.setVolumePercent(target)
|
|
|
|
def setShuffleState(self, enabled: bool, announce: bool = True, persist: bool = True):
    """Apply the shuffle flag to the menu action, playback and settings.

    The action's checked state is updated with its signals blocked. The
    setting is written only when ``persist`` is true and the change is
    announced only when ``announce`` is true. MPRIS is notified when
    available.
    """
    action = self.shuffleAction
    action.blockSignals(True)
    action.setChecked(enabled)
    action.blockSignals(False)

    self.playback.setShuffle(enabled)
    self.shuffleEnabled = enabled
    if persist:
        self.settings.set("playback", "shuffle", enabled)
    if announce:
        stateWord = "enabled" if enabled else "disabled"
        self.announceLive(f"Shuffle {stateWord}")

    mprisReady = self.mpris and self.mpris.available
    if mprisReady:
        self.mpris.notifyShuffleChanged()
|
|
|
|
def setShuffleFromExternal(self, enabled: bool, announce: bool = False):
    """Apply a shuffle change that arrived via MPRIS; the change is always persisted."""
    options = {"announce": announce, "persist": True}
    self.setShuffleState(enabled, **options)
|
|
|
|
def applyRepeatMode(self, mode: str, announce: bool = True, persist: bool = True):
    """Apply a repeat mode to the menu action, playback and settings.

    Args:
        mode: "none", "one" or "all"; any other value is treated as "none".
        announce: Announce "Repeat enabled"/"Repeat disabled" when true.
        persist: Write the ``playback`` / ``repeat`` setting when true.
    """
    effective = mode if mode in ("none", "one", "all") else "none"
    captions = {"one": "Repeat One", "all": "Repeat All", "none": "Repeat Off"}

    # Update the action silently so its toggled handler does not re-enter
    action = self.repeatAction
    action.blockSignals(True)
    action.setChecked(effective != "none")
    action.setText(captions[effective])
    action.blockSignals(False)

    self.repeatMode = effective
    self.playback.setRepeatMode(effective)
    if persist:
        self.settings.set("playback", "repeat", effective)
    if announce:
        stateWord = "enabled" if effective != "none" else "disabled"
        self.announceLive(f"Repeat {stateWord}")

    mprisReady = self.mpris and self.mpris.available
    if mprisReady:
        self.mpris.notifyLoopStatus()
|
|
|
|
def setRepeatFromExternal(self, loopStatus: str, announce: bool = False):
    """Translate an MPRIS loop status into a repeat mode and apply it.

    "Track" maps to "one" and "Playlist" to "all"; "None" and any
    unrecognised value map to "none". The change is always persisted.
    """
    if loopStatus == "Track":
        mode = "one"
    elif loopStatus == "Playlist":
        mode = "all"
    else:
        mode = "none"
    self.applyRepeatMode(mode, announce=announce, persist=True)
|
|
|
|
def announceTrack(self):
    """Show the now-playing text in a "Current Track" info dialog."""
    AccessibleTextDialog.showInfo("Current Track", self.nowPlayingInfo.text(), parent=self)
|
|
|
|
def announcePosition(self):
    """Show the position label's text in a "Playback Position" info dialog."""
    positionText = self.positionLabel.text()
    AccessibleTextDialog.showInfo("Playback Position", positionText, parent=self)
|
|
|
|
def announceCurrentTrackLive(self):
    """Announce the playback manager's current song through ``announceLive``."""
    current = self.playback.currentSong()
    self.announceLive(self.buildTrackAnnouncement(current))
|
|
|
|
def enablePlaybackActions(self, enabled: bool):
    """Enable or disable the transport controls.

    Play/pause, stop, previous, next, the favorite actions, the transport
    buttons and the position slider follow ``enabled``. Shuffle, repeat and
    the volume actions are always enabled.
    """
    for action in (self.playPauseAction, self.stopAction, self.previousAction, self.nextAction):
        action.setEnabled(enabled)

    # These stay usable even with nothing queued
    for action in (self.shuffleAction, self.repeatAction, self.volumeUpAction, self.volumeDownAction):
        action.setEnabled(True)

    for control in (
        self.favoriteAction,
        self.unfavoriteAction,
        self.prevButton,
        self.playButton,
        self.stopButton,
        self.nextButton,
        self.positionSlider,
    ):
        control.setEnabled(enabled)
|
|
|
|
# ============ Helpers ============
|
|
|
|
def _setLoadingPlaceholder(self, item: QTreeWidgetItem, text: str):
    """Replace all children of ``item`` with one child row showing ``text``."""
    item.takeChildren()
    placeholder = QTreeWidgetItem([text])
    item.addChild(placeholder)
|
|
|
|
def _markItemLoaded(self, item: QTreeWidgetItem):
    """Set ``"loaded": True`` in the item's ``Qt.UserRole`` dict.

    Starts from an empty dict when the item has no (or falsy) user data.
    """
    payload: Dict = item.data(0, Qt.UserRole) or {}
    payload.update(loaded=True)
    item.setData(0, Qt.UserRole, payload)
|
|
|
|
def _handleItemLoadError(self, item: QTreeWidgetItem, message: str):
    """Report a failed load in a dialog, then leave a "Failed to load." child row."""
    details = f"Could not load content: {message}"
    AccessibleTextDialog.showError("Library Error", details, parent=self)
    self._setLoadingPlaceholder(item, "Failed to load.")
|
|
|
|
def _run_background(
    self,
    label: str,
    func: Callable[[], Any],
    on_success: Optional[Callable[[Any], None]] = None,
    on_error: Optional[Callable[[Exception], None]] = None
):
    """Execute blocking work off the UI thread and marshal results back.

    ``func`` is submitted to ``self._executor``. When it finishes, the
    matching callback is handed to ``_on_ui`` rather than called directly.

    Args:
        label: Name used in log messages.
        func: Zero-argument callable to run on the executor.
        on_success: Optional callback receiving the return value of ``func``.
        on_error: Optional callback receiving the exception ``func`` raised.
    """
    self.logger.info("Starting background task: %s", label)
    future = self._executor.submit(func)

    def _done(fut):
        try:
            result = fut.result()
        except Exception as exc:  # noqa: BLE001
            self.logger.error("Background task '%s' failed: %s", label, exc)
            if on_error:
                # Bind the exception as a default argument. Python unbinds
                # ``exc`` when the except block exits, so a plain closure
                # would raise NameError when the deferred callback runs.
                self._on_ui(lambda err=exc: on_error(err))
            return
        if on_success:
            self._on_ui(lambda: on_success(result))
        self.logger.info("Background task '%s' completed", label)

    future.add_done_callback(_done)
|
|
|
|
def _on_ui(self, callback: Callable[[], None]):
    """Schedule ``callback`` through a zero-delay single-shot timer on this window.

    Exceptions raised by ``callback`` are logged with their traceback
    instead of propagating.
    """
    def _guarded():
        try:
            callback()
        except Exception as error:  # noqa: BLE001
            self.logger.exception("UI callback failed: %s", error)

    # ``self`` is passed as the context object so the call is meant to run
    # on the main thread even when scheduled from a worker.
    QTimer.singleShot(0, self, _guarded)
|
|
|
|
def _throttle_requests(self, count: int):
    """Pause briefly on every N-th request once a warm-up allowance is used.

    Sleeps for ``bulkThrottleSeconds`` when ``count`` is a multiple of
    ``bulkRequestBatch`` and greater than ``bulkThrottleStartAfter``.
    Does nothing when the batch size or the pause length is not positive.
    """
    throttlingOn = self.bulkRequestBatch > 0 and self.bulkThrottleSeconds > 0
    if not throttlingOn:
        return
    pastWarmUp = count > self.bulkThrottleStartAfter
    if pastWarmUp and count % self.bulkRequestBatch == 0:
        time.sleep(self.bulkThrottleSeconds)
|
|
|
|
def keyPressEvent(self, event):
    """Open the search dialog on the standard Find shortcut.

    Every other key event is forwarded to the base class implementation.
    """
    if not event.matches(QKeySequence.Find):
        super().keyPressEvent(event)
        return
    self.openSearchDialog()
    event.accept()
|
|
|
|
@staticmethod
def formatTime(ms: int) -> str:
    """Convert milliseconds to "mm:ss", or "h:mm:ss" from one hour upwards.

    Args:
        ms: Duration or position in milliseconds. Negative values are
            treated as zero and fractional values are truncated.

    Returns:
        The formatted time string.
    """
    # Clamp first: floor division on a negative value would otherwise
    # produce output such as "-1:59:59".
    totalSeconds = int(max(0, ms)) // 1000
    minutes, seconds = divmod(totalSeconds, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes:02d}:{seconds:02d}"
|
|
|
|
def copyToClipboard(self):
    """Copy the first-column text of the current library tree item to the clipboard.

    Does nothing when no item is current.
    """
    current = self.libraryTree.currentItem()
    if not current:
        return
    QGuiApplication.clipboard().setText(current.text(0))
|
|
|
|
def buildTrackAnnouncement(self, song: Optional[Song]) -> str:
    """Build the spoken description of a track, degrading gracefully.

    Preference order: "<title> by <artist> from <album>", then
    "<title> by <artist>", then the file name from ``song.path``, then the
    bare title, then "Unknown track". A falsy ``song`` yields
    "No track playing.". Title, artist and album are stripped of
    surrounding whitespace first.
    """
    if not song:
        return "No track playing."

    def _clean(value):
        return (value or "").strip()

    title, artist, album = _clean(song.title), _clean(song.artist), _clean(song.album)
    filename = ""
    if song.path:
        filename = Path(song.path).name

    if title and artist:
        if album:
            return f"{title} by {artist} from {album}"
        return f"{title} by {artist}"
    # Without a usable title/artist pair the file name wins over a bare title
    return filename or title or "Unknown track"
|
|
|
|
def buildFavoriteAnnouncement(self, action: str, song: Optional[Song], label: str) -> str:
    """Build the success message for a favorite or unfavorite action.

    The subject is the track announcement when ``song`` is truthy,
    otherwise ``label`` (or "item" when the label is empty). Any
    ``action`` other than "favorite" is worded as a removal.
    """
    if song:
        subject = self.buildTrackAnnouncement(song)
    else:
        subject = label or "item"
    if action == "favorite":
        outcome = "added to favorites"
    else:
        outcome = "removed from favorites"
    return f"{subject} {outcome}"
|
|
|
|
def buildFavoriteFailureAnnouncement(self, action: str, song: Optional[Song], label: str) -> str:
    """Build the failure message for a favorite or unfavorite action.

    The subject is the track announcement when ``song`` is truthy,
    otherwise ``label`` (or "item" when the label is empty). Any
    ``action`` other than "favorite" is worded as "unfavorite".
    """
    if song:
        subject = self.buildTrackAnnouncement(song)
    else:
        subject = label or "item"
    if action == "favorite":
        attempted = "favorite"
    else:
        attempted = "unfavorite"
    return f"Could not {attempted} {subject}"
|
|
|
|
def _resolveCoverArtUrl(self, coverId: Optional[str]) -> Optional[str]:
    """Return the client's cover art URL for ``coverId``, or ``None``.

    ``None`` is returned when there is no cover id, no client, or the
    client call raises.
    """
    if coverId and self.client:
        try:
            return self.client.getCoverArtUrl(coverId)
        except Exception:
            # Best effort only: any client failure means "no cover art"
            return None
    return None
|
|
|
|
def announceLive(self, text: str):
    """Publish ``text`` to the live region and status bar for screen readers.

    After updating both widgets, three notification strategies are tried
    in order, each only if the previous one raised: an explicit
    announcement event, an alert event on the live region, and finally
    setting the live region's accessible name.
    """
    self.liveRegion.setText(text)
    self.statusBar.showMessage(text, 2000)

    # Preferred: explicit announcement event where supported
    try:
        QAccessible.updateAccessibility(QAccessibleAnnouncementEvent(self.liveRegion, text))
        return
    except Exception:
        pass

    try:
        alert = QAccessibleEvent(self.liveRegion, QAccessible.Event.Alert)
        QAccessible.updateAccessibility(alert)
    except Exception:
        # Last resort: change the accessible name to trigger a change event
        self.liveRegion.setAccessibleName(text)
|
|
|
|
# Shortcut handlers with feedback when no playback is active
|
|
|
|
def handlePreviousShortcut(self, checked: bool = False):
    """Go to the previous track, or announce why that is not possible.

    An empty queue or a missing current song is announced instead. On
    success the one-shot track-change announcement flag is set first.
    ``checked`` is ignored.
    """
    blocker = None
    if not self.playback.queue:
        blocker = "No tracks in the queue."
    elif not self.playback.currentSong():
        blocker = "No track playing."

    if blocker is not None:
        self.announceLive(blocker)
        return
    self._announceOnTrackChange = True
    self.playback.previous()
|
|
|
|
def handlePlayShortcut(self, checked: bool = False):
    """Call ``play`` on the playback manager, or announce that the queue is empty.

    ``checked`` is ignored.
    """
    player = self.playback
    if player.queue:
        player.play()
        return
    self.announceLive("No tracks in the queue.")
|
|
|
|
def handlePauseToggleShortcut(self, checked: bool = False):
    """Trigger the play/pause action, or announce that nothing can be toggled.

    Without a current song the announcement distinguishes an empty queue
    from a queue with nothing playing. ``checked`` is ignored.
    """
    if self.playback.currentSong():
        self.playPauseAction.trigger()
        return

    if self.playback.queue:
        message = "No track playing."
    else:
        message = "No tracks in the queue."
    self.announceLive(message)
|
|
|
|
def handleStopShortcut(self, checked: bool = False):
    """Trigger the stop action, or announce that nothing is playing.

    ``checked`` is ignored.
    """
    if self.playback.currentSong():
        self.stopAction.trigger()
        return
    self.announceLive("Nothing is playing.")
|
|
|
|
def handleNextShortcut(self, checked: bool = False):
    """Advance to the next track, or announce why that is not possible.

    An empty queue or a missing current song is announced instead. On
    success the one-shot track-change announcement flag is set first.
    ``checked`` is ignored.
    """
    blocker = None
    if not self.playback.queue:
        blocker = "No tracks in the queue."
    elif not self.playback.currentSong():
        blocker = "No track playing."

    if blocker is not None:
        self.announceLive(blocker)
        return
    self._announceOnTrackChange = True
    self.playback.next()
|
|
|
|
def applyPersistedPlaybackSettings(self):
    """Re-apply the stored shuffle, repeat and announce preferences silently.

    Shuffle and repeat are applied without announcing or persisting; an
    unrecognised repeat mode falls back to "none". The announce-track-changes
    action is checked to match its flag with signals blocked.
    """
    storedRepeat = self.repeatMode
    if storedRepeat not in ("none", "one", "all"):
        storedRepeat = "none"

    self.setShuffleState(bool(self.shuffleEnabled), announce=False, persist=False)
    self.applyRepeatMode(storedRepeat, announce=False, persist=False)

    toggle = self.announceTrackChangesAction
    toggle.blockSignals(True)
    toggle.setChecked(self.announceTrackChangesEnabled)
    toggle.blockSignals(False)
|
|
|
|
def _handleQueueChangedForMpris(self, queue: List[Song]):
    """Tell MPRIS about a queue change.

    Skipped when MPRIS is missing or unavailable. Capabilities are always
    re-announced; metadata is cleared only when the queue is empty.
    """
    service = self.mpris
    if not service or not service.available:
        return
    service.notifyCapabilities()
    if queue:
        return
    service.updateMetadata(None)
|
|
|
|
def getCurrentMediaTarget(self):
    """Return ``(type, id, label, song)`` for the most relevant selection.

    Sources are checked in order: the current library tree item (songs,
    albums and artists only), the selected queue row, then the currently
    playing song. ``song`` is ``None`` for albums and artists. Returns
    ``None`` when none of the sources yields a target.
    """
    def _songTarget(track):
        return ("song", track.id, self.buildTrackAnnouncement(track), track)

    # 1) Library tree selection
    treeItem = self.libraryTree.currentItem()
    if treeItem:
        payload: Dict = treeItem.data(0, Qt.UserRole) or {}
        kind = payload.get("type")
        if kind == "song":
            return _songTarget(payload["song"])
        if kind in ("album", "artist"):
            return (kind, payload.get("id"), treeItem.text(0), None)

    # 2) Queue selection
    selectedRow = self.queueList.currentRow()
    if 0 <= selectedRow < len(self.playback.queue):
        return _songTarget(self.playback.queue[selectedRow])

    # 3) Now playing
    playing = self.playback.currentSong()
    if playing:
        return _songTarget(playing)
    return None
|
|
|
|
def favoriteSelection(self):
    """Star the selected or currently playing item on the server.

    Shows a warning when there is no target or no client. Non-song targets
    require confirmation through ``_confirmBulkAction`` first. Success is
    reported in the status bar and announced; any exception is announced
    and shown in a "Favorite Failed" dialog.
    """
    target = self.getCurrentMediaTarget()
    if not (target and self.client):
        AccessibleTextDialog.showWarning("Favorite", "No item selected to favorite.", parent=self)
        return

    kind, targetId, label, song = target
    if kind != "song" and not self._confirmBulkAction("favorite", label):
        self.statusBar.showMessage("Favorite canceled", 2000)
        return

    try:
        self.client.star(targetId, kind)
        confirmation = self.buildFavoriteAnnouncement("favorite", song, label)
        self.statusBar.showMessage(confirmation, 3000)
        self.announceLive(confirmation)
    except Exception as error:
        self.announceLive(self.buildFavoriteFailureAnnouncement("favorite", song, label))
        AccessibleTextDialog.showError("Favorite Failed", str(error), parent=self)
|
|
|
|
def unfavoriteSelection(self):
    """Unstar the selected or currently playing item on the server.

    Shows a warning when there is no target or no client. Non-song targets
    require confirmation through ``_confirmBulkAction`` first. Success is
    reported in the status bar and announced; any exception is announced
    and shown in an "Unfavorite Failed" dialog.
    """
    target = self.getCurrentMediaTarget()
    if not (target and self.client):
        AccessibleTextDialog.showWarning("Unfavorite", "No item selected to unfavorite.", parent=self)
        return

    kind, targetId, label, song = target
    if kind != "song" and not self._confirmBulkAction("unfavorite", label):
        self.statusBar.showMessage("Unfavorite canceled", 2000)
        return

    try:
        self.client.unstar(targetId, kind)
        confirmation = self.buildFavoriteAnnouncement("unfavorite", song, label)
        self.statusBar.showMessage(confirmation, 3000)
        self.announceLive(confirmation)
    except Exception as error:
        self.announceLive(self.buildFavoriteFailureAnnouncement("unfavorite", song, label))
        AccessibleTextDialog.showError("Unfavorite Failed", str(error), parent=self)
|
|
|
|
def showAbout(self):
    """Show the "About NaviPy" information dialog."""
    paragraphs = (
        "NaviPy - Accessible Navidrome Client",
        "An accessible music player for Navidrome servers.",
        "Built with PySide6 for full keyboard navigation and screen reader compatibility.",
        "Version: 0.1.0 (Development)",
    )
    # Paragraphs are separated by one blank line
    aboutText = "\n\n".join(paragraphs)
    AccessibleTextDialog.showInfo("About NaviPy", aboutText, parent=self)
|
|
|
|
def closeEvent(self, event):
    """Handle window close.

    Flags shutdown for background work, shuts down the client, saves the
    volume setting, stops the player, shuts down MPRIS and the thread
    pool, accepts the event and schedules a hard process exit 100 ms later.
    """
    # Signal background tasks to stop
    self._shutting_down = True

    # Signal cached client to stop any running sync
    if self.client:
        self.client.shutdown()

    self.settings.set("playback", "volume", self.volume)
    self.settings.save()

    # Stop playback and release media resources
    try:
        self.playback.player.stop()
        self.playback.player.setSource("")
    except Exception:
        # Best effort: a failure here must not block closing
        pass

    # Shutdown MPRIS (stops GLib main loop thread)
    if self.mpris:
        try:
            self.mpris.shutdown()
        except Exception:
            # Best effort: a failure here must not block closing
            pass

    # Shutdown thread pool - don't wait for tasks
    if self._executor:
        self._executor.shutdown(wait=False, cancel_futures=True)

    self.logger.info("Main window closed")
    event.accept()

    # Schedule forced exit after event loop processes the close
    def forceExit():
        import os
        # os._exit ends the process immediately, skipping normal interpreter cleanup
        os._exit(0)
    QTimer.singleShot(100, forceExit)
|