Audiobookshelf support mostly working.

This commit is contained in:
Storm Dragon
2025-10-05 20:19:16 -04:00
parent 1d19ed377c
commit 4387a5cb56
14 changed files with 3979 additions and 67 deletions

148
.gitignore vendored Normal file
View File

@@ -0,0 +1,148 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
# BookStorm specific
# User data and caches (not tracked in git)
*.m4b
*.m4a
*.mp3
*.epub
*.pdf
*.txt
*.zip
!requirements.txt
# Config and user data directories (stored in ~/.config and ~/.bookstorm)
# These are not in the repo, but listing for clarity
# ~/.config/stormux/bookstorm/
# ~/.bookstorm/
# ~/.cache/bookstorm/

File diff suppressed because it is too large Load Diff

233
src/audio_parser.py Normal file
View File

@@ -0,0 +1,233 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Audio Book Parser
Parses audio book files (M4B, M4A, MP3) and extracts chapter information.
Uses mutagen for metadata extraction.
"""
from pathlib import Path
from src.book import Book, Chapter
class AudioChapter(Chapter):
"""Chapter with audio-specific metadata"""
def __init__(self, title="Untitled", startTime=0.0, duration=0.0):
"""
Initialize audio chapter
Args:
title: Chapter title
startTime: Start time in seconds
duration: Duration in seconds
"""
super().__init__(title)
self.startTime = startTime
self.duration = duration
# Add placeholder paragraph for compatibility
self.paragraphs = [f"Audio chapter: {title}"]
class AudioBook(Book):
"""Book with audio-specific metadata"""
def __init__(self, title="Untitled", author="Unknown", audioPath=None):
"""
Initialize audio book
Args:
title: Book title
author: Book author
audioPath: Path to audio file
"""
super().__init__(title, author)
self.audioPath = audioPath
self.isAudioBook = True
self.totalDuration = 0.0
class AudioParser:
"""Parser for audio book files (M4B, M4A, MP3)"""
def __init__(self):
"""Initialize audio parser"""
self.mutagen = None
try:
import mutagen
import mutagen.mp4
import mutagen.mp3
self.mutagen = mutagen
except ImportError:
print("Warning: mutagen not installed. Install with: pip install mutagen")
def parse(self, audioPath):
"""
Parse audio file and extract chapters
Args:
audioPath: Path to audio file
Returns:
AudioBook object
"""
if not self.mutagen:
raise ImportError("mutagen library required for audio books")
audioPath = Path(audioPath)
# Try to load the audio file
try:
audioFile = self.mutagen.File(audioPath)
if audioFile is None:
raise ValueError(f"Could not read audio file: {audioPath}")
except Exception as e:
raise Exception(f"Error loading audio file: {e}")
# Create audio book
book = AudioBook(audioPath=str(audioPath))
# Extract metadata
title = self._extract_title(audioFile, audioPath)
author = self._extract_author(audioFile)
book.title = title
book.author = author
# Get duration
if hasattr(audioFile.info, 'length'):
book.totalDuration = audioFile.info.length
# Extract chapters
chapters = self._extract_chapters(audioFile, audioPath)
if chapters:
# Add parsed chapters
for chapter in chapters:
book.add_chapter(chapter)
else:
# No chapter markers - treat entire file as one chapter
singleChapter = AudioChapter(
title=title,
startTime=0.0,
duration=book.totalDuration
)
book.add_chapter(singleChapter)
return book
def _extract_title(self, audioFile, audioPath):
"""Extract title from metadata"""
# Try various title tags
titleTags = ['\xa9nam', 'TIT2', 'title', 'TITLE']
for tag in titleTags:
if tag in audioFile:
value = audioFile[tag]
if isinstance(value, list):
return str(value[0])
return str(value)
# Fallback to filename
return audioPath.stem
def _extract_author(self, audioFile):
"""Extract author from metadata"""
# Try various author tags
authorTags = ['\xa9ART', 'TPE1', 'artist', 'ARTIST', 'author', 'AUTHOR']
for tag in authorTags:
if tag in audioFile:
value = audioFile[tag]
if isinstance(value, list):
return str(value[0])
return str(value)
return "Unknown"
def _extract_chapters(self, audioFile, audioPath):
"""Extract chapter information from audio file"""
chapters = []
# Try MP4 chapter format (M4B, M4A)
if hasattr(audioFile, 'tags') and audioFile.tags:
# MP4 files store chapters in a special way
if hasattr(audioFile.tags, '_DictProxy__dict'):
tagsDict = audioFile.tags._DictProxy__dict
if 'chapters' in tagsDict:
mp4Chapters = tagsDict['chapters']
chapters = self._parse_mp4_chapters(mp4Chapters)
# Try MP3 chapter format (ID3 CHAP frames)
if not chapters and hasattr(audioFile, 'tags'):
# Look for CHAP frames in ID3 tags
if hasattr(audioFile.tags, 'getall'):
chapFrames = audioFile.tags.getall('CHAP')
if chapFrames:
chapters = self._parse_id3_chapters(chapFrames)
return chapters
def _parse_mp4_chapters(self, mp4Chapters):
"""Parse MP4 chapter list"""
chapters = []
for i, chapterData in enumerate(mp4Chapters):
if isinstance(chapterData, tuple) and len(chapterData) >= 2:
startTime = chapterData[0] / 1000.0 # Convert ms to seconds
chapterTitle = chapterData[1] if chapterData[1] else f"Chapter {i + 1}"
# Calculate duration (will be updated when we know the next chapter's start)
duration = 0.0
chapter = AudioChapter(
title=chapterTitle,
startTime=startTime,
duration=duration
)
chapters.append(chapter)
# Calculate durations based on next chapter's start time
for i in range(len(chapters) - 1):
chapters[i].duration = chapters[i + 1].startTime - chapters[i].startTime
# Last chapter duration will be set by total book duration later
return chapters
def _parse_id3_chapters(self, chapFrames):
"""Parse ID3 CHAP frames (MP3 chapters)"""
chapters = []
for i, chapFrame in enumerate(chapFrames):
startTime = 0.0
chapterTitle = f"Chapter {i + 1}"
# Extract start time and title from CHAP frame
# Note: start_time and sub_frames are external library attributes
if hasattr(chapFrame, 'start_time'):
# pylint: disable=no-member
startTime = chapFrame.start_time / 1000.0 # Convert ms to seconds
if hasattr(chapFrame, 'sub_frames'):
# pylint: disable=no-member
for subFrame in chapFrame.sub_frames:
if hasattr(subFrame, 'text'):
chapterTitle = str(subFrame.text[0]) if subFrame.text else chapterTitle
chapter = AudioChapter(
title=chapterTitle,
startTime=startTime,
duration=0.0
)
chapters.append(chapter)
# Calculate durations
for i in range(len(chapters) - 1):
chapters[i].duration = chapters[i + 1].startTime - chapters[i].startTime
return chapters
def cleanup(self):
"""Clean up resources (no temp files for audio)"""
pass

View File

@@ -0,0 +1,712 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Audiobookshelf API Client
Handles communication with Audiobookshelf server for browsing,
streaming, and syncing audiobooks.
"""
import requests
import json
from typing import Optional, Dict, List
class AudiobookshelfClient:
"""Client for Audiobookshelf API"""
def __init__(self, serverUrl: str, configManager=None):
"""
Initialize Audiobookshelf client
Args:
serverUrl: Base URL of Audiobookshelf server (e.g., https://abs.example.com)
configManager: ConfigManager instance for token storage
"""
# Remove trailing slash from server URL
self.serverUrl = serverUrl.rstrip('/')
self.configManager = configManager
self.authToken = None
# Load saved token if available
if configManager:
savedToken = configManager.get_abs_auth_token()
if savedToken:
self.authToken = savedToken
def login(self, username: str, password: str) -> bool:
"""
Login to Audiobookshelf server
Args:
username: Username
password: Password
Returns:
True if login successful, False otherwise
"""
try:
url = f"{self.serverUrl}/login"
payload = {
'username': username,
'password': password
}
response = requests.post(url, json=payload, timeout=10)
if response.status_code == 200:
data = response.json()
# Token is in response.user.token
if 'user' in data and 'token' in data['user']:
self.authToken = data['user']['token']
# Save token to config
if self.configManager:
self.configManager.set_abs_auth_token(self.authToken)
self.configManager.set_abs_username(username)
return True
else:
print("ERROR: Token not found in login response")
return False
else:
print(f"Login failed: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"Login error: {e}")
return False
def test_connection(self) -> bool:
"""
Test connection to server with current token
Returns:
True if connection successful, False otherwise
"""
if not self.authToken:
print("ERROR: No auth token available")
return False
try:
# Try to get user info to verify token
url = f"{self.serverUrl}/api/me"
headers = {'Authorization': f'Bearer {self.authToken}'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return True
elif response.status_code == 401:
print("ERROR: Auth token invalid or expired")
# Clear invalid token
self.authToken = None
if self.configManager:
self.configManager.set_abs_auth_token('')
return False
else:
print(f"Connection test failed: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"Connection test error: {e}")
return False
def get_libraries(self) -> Optional[List[Dict]]:
"""
Get list of libraries from server
Returns:
List of library dictionaries, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
url = f"{self.serverUrl}/api/libraries"
headers = {'Authorization': f'Bearer {self.authToken}'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
# Response should be a dict with 'libraries' key
if isinstance(data, dict) and 'libraries' in data:
return data['libraries']
elif isinstance(data, list):
return data
else:
print(f"Unexpected response format: {data}")
return None
else:
print(f"Get libraries failed: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Get libraries error: {e}")
return None
def get_library_items(self, libraryId: str, limit: int = 100, page: int = 0) -> Optional[List[Dict]]:
"""
Get audiobooks in a library
Args:
libraryId: Library ID
limit: Max items to return (default 100)
page: Page number for pagination (default 0)
Returns:
List of library item dictionaries, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
url = f"{self.serverUrl}/api/libraries/{libraryId}/items"
headers = {'Authorization': f'Bearer {self.authToken}'}
params = {'limit': limit, 'page': page}
response = requests.get(url, headers=headers, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
# Response format varies - check for 'results' key
if isinstance(data, dict) and 'results' in data:
return data['results']
elif isinstance(data, list):
return data
else:
print(f"Unexpected response format: {data}")
return None
else:
print(f"Get library items failed: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Get library items error: {e}")
return None
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[requests.Response]:
"""
Make authenticated API request
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint (e.g., '/api/libraries')
**kwargs: Additional arguments for requests
Returns:
Response object or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
url = f"{self.serverUrl}{endpoint}"
headers = kwargs.pop('headers', {})
headers['Authorization'] = f'Bearer {self.authToken}'
response = requests.request(
method=method,
url=url,
headers=headers,
timeout=kwargs.pop('timeout', 30),
**kwargs
)
return response
except requests.exceptions.RequestException as e:
print(f"Request error: {e}")
return None
def is_authenticated(self) -> bool:
"""Check if client has valid authentication token"""
return bool(self.authToken)
def get_library_item_details(self, itemId: str) -> Optional[Dict]:
"""
Get detailed information about a library item
Args:
itemId: Library item ID
Returns:
Item details dictionary, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
# Try both endpoint patterns (API changed between versions)
endpoints = [
f"/api/items/{itemId}", # v2.3+ (current)
f"/api/library-items/{itemId}", # v2.0-2.2 (legacy)
]
headers = {'Authorization': f'Bearer {self.authToken}'}
for endpoint in endpoints:
url = f"{self.serverUrl}{endpoint}"
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
# Try next endpoint
continue
else:
# Other error, don't try more endpoints
print(f"Get item details failed: {response.status_code}")
return None
# All endpoints failed
print(f"Get item details failed: Item not found (tried both /api/items and /api/library-items)")
return None
except requests.exceptions.RequestException as e:
print(f"Get item details error: {e}")
return None
def download_audio_file(self, itemId: str, outputPath: str, progressCallback=None) -> bool:
"""
Download audiobook file to local path
Args:
itemId: Library item ID
outputPath: Path to save the downloaded file
progressCallback: Optional callback for progress updates (percent)
Returns:
True if download successful, False otherwise
"""
if not self.authToken:
print("ERROR: Not authenticated")
return False
try:
# Use the download endpoint
# Format: /api/items/{itemId}/download?token={token}
downloadUrl = f"{self.serverUrl}/api/items/{itemId}/file"
headers = {'Authorization': f'Bearer {self.authToken}'}
print(f"DEBUG: Downloading from: {downloadUrl}")
# Download with streaming to handle large files
response = requests.get(downloadUrl, headers=headers, stream=True, timeout=30)
if response.status_code != 200:
print(f"Download failed: {response.status_code}")
return False
# Get total file size
totalSize = int(response.headers.get('content-length', 0))
# Download to file
downloaded = 0
with open(outputPath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Progress callback
if progressCallback and totalSize > 0:
percent = int((downloaded / totalSize) * 100)
progressCallback(percent)
print(f"Download complete: {outputPath}")
return True
except requests.exceptions.RequestException as e:
print(f"Download error: {e}")
return False
except IOError as e:
print(f"File write error: {e}")
return False
def get_stream_url(self, itemId: str, itemDetails: Optional[Dict] = None) -> Optional[str]:
"""
Get streaming URL for an audiobook using /play endpoint
Args:
itemId: Library item ID
itemDetails: Optional pre-fetched item details (not required)
Returns:
Streaming URL with auth token, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
# Validate item exists and has audio content (optional check)
if itemDetails:
media = itemDetails.get('media', {})
numAudioFiles = media.get('numAudioFiles', 0)
duration = media.get('duration', 0.0)
print(f"DEBUG: Item has {numAudioFiles} audio files, duration {duration}s")
# Use the /play endpoint which creates a playback session and returns stream info
# This is what the web player uses
playUrl = f"{self.serverUrl}/api/items/{itemId}/play"
print(f"DEBUG: Requesting play session from: {playUrl}")
response = requests.post(
playUrl,
headers={'Authorization': f'Bearer {self.authToken}'},
json={
'deviceInfo': {
'deviceId': 'bookstorm',
'clientName': 'BookStorm'
},
'forceDirectPlay': False,
'forceTranscode': False,
'supportedMimeTypes': ['audio/mpeg', 'audio/mp4', 'audio/flac', 'audio/ogg']
},
timeout=10
)
if response.status_code != 200:
print(f"ERROR: /play endpoint failed with status {response.status_code}")
print(f"Response: {response.text}")
return None
playData = response.json()
print(f"DEBUG: Play response keys: {list(playData.keys())}")
# Extract the actual stream URL from the play response
# The response contains either 'audioTracks' or direct 'url'
streamUrl = None
# Try different response formats
if 'audioTracks' in playData and playData['audioTracks']:
# Multi-file audiobook - use first track or concatenated stream
audioTrack = playData['audioTracks'][0]
streamUrl = audioTrack.get('contentUrl')
print(f"DEBUG: Using audioTrack URL")
elif 'url' in playData:
# Direct URL
streamUrl = playData.get('url')
print(f"DEBUG: Using direct URL")
elif 'contentUrl' in playData:
# Alternative format
streamUrl = playData.get('contentUrl')
print(f"DEBUG: Using contentUrl")
if not streamUrl:
print(f"ERROR: No stream URL found in play response")
print(f"Play data: {playData}")
return None
# Make URL absolute if it's relative
if streamUrl.startswith('/'):
streamUrl = f"{self.serverUrl}{streamUrl}"
print(f"DEBUG: Stream URL: {streamUrl[:100]}...")
return streamUrl
except Exception as e:
print(f"Get stream URL error: {e}")
import traceback
traceback.print_exc()
return None
def get_item_chapters(self, itemId: str) -> Optional[List[Dict]]:
"""
Get chapter information for an audiobook
Args:
itemId: Library item ID
Returns:
List of chapter dictionaries, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
# Get item details which includes chapter info
itemDetails = self.get_library_item_details(itemId)
if not itemDetails:
return None
# Extract chapters from media
media = itemDetails.get('media', {})
chapters = media.get('chapters', [])
return chapters if chapters else None
except Exception as e:
print(f"Get chapters error: {e}")
return None
def update_progress(self, itemId: str, currentTime: float, duration: float, progress: float) -> bool:
"""
Update playback progress for an item
Args:
itemId: Library item ID
currentTime: Current playback time in seconds
duration: Total duration in seconds
progress: Progress as decimal (0.0 to 1.0)
Returns:
True if update successful, False otherwise
"""
if not self.authToken:
print("ERROR: Not authenticated")
return False
try:
url = f"{self.serverUrl}/api/me/progress/{itemId}"
headers = {'Authorization': f'Bearer {self.authToken}'}
payload = {
'currentTime': currentTime,
'duration': duration,
'progress': progress,
'isFinished': progress >= 0.99
}
response = requests.patch(url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
return True
else:
print(f"Update progress failed: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"Update progress error: {e}")
return False
def get_progress(self, itemId: str) -> Optional[Dict]:
"""
Get playback progress for an item
Args:
itemId: Library item ID
Returns:
Progress dictionary, or None if error or no progress
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
url = f"{self.serverUrl}/api/me/progress/{itemId}"
headers = {'Authorization': f'Bearer {self.authToken}'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
elif response.status_code == 404:
# No progress found
return None
else:
print(f"Get progress failed: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Get progress error: {e}")
return None
def create_session(self, itemId: str) -> Optional[str]:
"""
Create a new listening session
Args:
itemId: Library item ID
Returns:
Session ID, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
# Try the correct endpoint for starting a playback session
url = f"{self.serverUrl}/api/session/local"
headers = {'Authorization': f'Bearer {self.authToken}'}
payload = {
'libraryItemId': itemId,
'mediaPlayer': 'BookStorm',
'deviceInfo': {
'deviceId': 'bookstorm',
'clientName': 'BookStorm'
}
}
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
sessionId = data.get('id')
print(f"DEBUG: Session created successfully: {sessionId}")
return sessionId
else:
# Session creation not critical, just log and continue
print(f"Note: Could not create session (status {response.status_code}), continuing without session tracking")
return None
except requests.exceptions.RequestException as e:
# Session creation not critical, just log and continue
print(f"Note: Could not create session ({e}), continuing without session tracking")
return None
def sync_session(self, sessionId: str, currentTime: float, duration: float, progress: float) -> bool:
"""
Sync session progress
Args:
sessionId: Session ID
currentTime: Current playback time in seconds
duration: Total duration in seconds
progress: Progress as decimal (0.0 to 1.0)
Returns:
True if sync successful, False otherwise
"""
if not self.authToken:
print("ERROR: Not authenticated")
return False
try:
url = f"{self.serverUrl}/api/session/{sessionId}/sync"
headers = {'Authorization': f'Bearer {self.authToken}'}
payload = {
'currentTime': currentTime,
'duration': duration,
'progress': progress
}
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
return True
else:
print(f"Sync session failed: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"Sync session error: {e}")
return False
def close_session(self, sessionId: str) -> bool:
"""
Close a listening session
Args:
sessionId: Session ID
Returns:
True if close successful, False otherwise
"""
if not self.authToken:
print("ERROR: Not authenticated")
return False
try:
url = f"{self.serverUrl}/api/session/{sessionId}/close"
headers = {'Authorization': f'Bearer {self.authToken}'}
response = requests.post(url, headers=headers, timeout=10)
if response.status_code == 200:
return True
else:
print(f"Close session failed: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"Close session error: {e}")
return False
def get_library_series(self, libraryId: str) -> Optional[List[Dict]]:
"""
Get series in a library
Args:
libraryId: Library ID
Returns:
List of series dictionaries, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
url = f"{self.serverUrl}/api/libraries/{libraryId}/series"
headers = {'Authorization': f'Bearer {self.authToken}'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
# Response format may vary
if isinstance(data, dict) and 'series' in data:
return data['series']
elif isinstance(data, dict) and 'results' in data:
return data['results']
elif isinstance(data, list):
return data
else:
return None
else:
print(f"Get series failed: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Get series error: {e}")
return None
def get_library_collections(self, libraryId: str) -> Optional[List[Dict]]:
"""
Get collections in a library
Args:
libraryId: Library ID
Returns:
List of collection dictionaries, or None if error
"""
if not self.authToken:
print("ERROR: Not authenticated")
return None
try:
url = f"{self.serverUrl}/api/libraries/{libraryId}/collections"
headers = {'Authorization': f'Bearer {self.authToken}'}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
# Response format may vary
if isinstance(data, dict) and 'collections' in data:
return data['collections']
elif isinstance(data, dict) and 'results' in data:
return data['results']
elif isinstance(data, list):
return data
else:
return None
else:
print(f"Get collections failed: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Get collections error: {e}")
return None

579
src/audiobookshelf_menu.py Normal file
View File

@@ -0,0 +1,579 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Audiobookshelf Menu
Interactive browser for Audiobookshelf server content.
Browse libraries, audiobooks, series, and collections.
"""
from pathlib import Path
class AudiobookshelfMenu:
"""Audiobookshelf browser interface"""
def __init__(self, absClient, configManager, speechEngine=None):
"""
Initialize Audiobookshelf menu
Args:
absClient: AudiobookshelfClient instance
configManager: ConfigManager instance
speechEngine: SpeechEngine instance for accessibility
"""
self.absClient = absClient
self.configManager = configManager
self.speechEngine = speechEngine
# Menu state
self.inMenu = False
self.currentView = 'libraries' # 'libraries', 'books', 'stream_download'
self.currentSelection = 0
self.items = []
# Context tracking
self.selectedLibrary = None
self.selectedBook = None
# View mode for books list
self.booksViewMode = 'all' # 'all', 'series', 'collections'
# Stream/download submenu options
self.streamDownloadOptions = [
{'action': 'stream', 'label': 'Stream from server'},
{'action': 'download', 'label': 'Download to library'},
{'action': 'cancel', 'label': 'Cancel'}
]
def enter_menu(self):
"""Enter the Audiobookshelf browser"""
self.inMenu = True
self.currentSelection = 0
# Reset state from previous sessions
self.selectedLibrary = None
self.selectedBook = None
self.booksViewMode = 'all'
if self.speechEngine:
self.speechEngine.speak("Audiobookshelf browser. Loading libraries...")
# Get libraries from server
libraries = self.absClient.get_libraries()
if not libraries:
if self.speechEngine:
self.speechEngine.speak("Error loading libraries. Check server connection.")
self.inMenu = False
return
# If only one library, skip library selection and go straight to books
if len(libraries) == 1:
self.selectedLibrary = libraries[0]
self._load_books()
else:
# Multiple libraries - show library selection
self.currentView = 'libraries'
self.items = libraries
if self.speechEngine:
self.speechEngine.speak(f"Found {len(libraries)} libraries. Use arrow keys to navigate.")
self._speak_current_item()
def _load_books(self):
"""Load books from selected library"""
if not self.selectedLibrary:
return
libraryId = self.selectedLibrary.get('id')
if not libraryId:
if self.speechEngine:
self.speechEngine.speak("Error: Invalid library")
return
if self.speechEngine:
libraryName = self.selectedLibrary.get('name', 'Library')
self.speechEngine.speak(f"Loading books from {libraryName}...")
# Get books from library
books = self.absClient.get_library_items(libraryId)
if not books:
if self.speechEngine:
self.speechEngine.speak("No books found in library")
self.items = []
self.currentView = 'books'
return
# Store books without checking local status (too slow)
# Local check will happen on selection only
self.items = books
self.currentView = 'books'
self.currentSelection = 0
if self.speechEngine:
self.speechEngine.speak(f"Loaded {len(books)} books. Use arrow keys to navigate, left-right to change view.")
self._speak_current_item()
def _check_if_local(self, book):
"""
Check if book exists locally
Args:
book: Book dictionary from server
Returns:
True if book exists locally, False otherwise
"""
# Get library directory
libraryDir = self.configManager.get_library_directory()
if not libraryDir:
return False
libraryPath = Path(libraryDir)
if not libraryPath.exists():
return False
# Extract book metadata
media = book.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', '')
author = metadata.get('authorName', '')
if not title:
return False
# Search for matching files in library
# Look for common audio formats
audioExtensions = ['.m4b', '.m4a', '.mp3', '.ogg']
for audioExt in audioExtensions:
# Try exact title match
for filePath in libraryPath.rglob(f"*{audioExt}"):
fileName = filePath.stem.lower()
if title.lower() in fileName:
# Found potential match - could enhance with duration check later
return True
return False
def navigate_menu(self, direction):
"""Navigate menu up or down"""
# Handle stream/download submenu separately
if self.currentView == 'stream_download':
if direction == 'up':
self.currentSelection = (self.currentSelection - 1) % len(self.streamDownloadOptions)
elif direction == 'down':
self.currentSelection = (self.currentSelection + 1) % len(self.streamDownloadOptions)
self._speak_current_item()
return
if not self.items:
return
oldSelection = self.currentSelection
if direction == 'up':
self.currentSelection = (self.currentSelection - 1) % len(self.items)
elif direction == 'down':
self.currentSelection = (self.currentSelection + 1) % len(self.items)
# Debug output
print(f"DEBUG NAV: {direction} - moved from {oldSelection} to {self.currentSelection} (total items: {len(self.items)})")
if self.currentSelection < len(self.items):
item = self.items[self.currentSelection]
media = item.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', 'Unknown')
print(f"DEBUG NAV: Current item title: {title}")
self._speak_current_item()
def change_view(self, direction):
"""Change view mode (only in books view)"""
if self.currentView != 'books':
return
views = ['all', 'series', 'collections']
currentIndex = views.index(self.booksViewMode)
if direction == 'left':
newIndex = (currentIndex - 1) % len(views)
elif direction == 'right':
newIndex = (currentIndex + 1) % len(views)
else:
return
oldViewMode = self.booksViewMode
self.booksViewMode = views[newIndex]
# Announce new view
if self.speechEngine:
viewName = {
'all': 'All books',
'series': 'Series view',
'collections': 'Collections view'
}[self.booksViewMode]
self.speechEngine.speak(viewName)
# Reload content for new view
if self.booksViewMode == 'all':
self._load_books()
elif self.booksViewMode == 'series':
self._load_series()
elif self.booksViewMode == 'collections':
self._load_collections()
def _speak_current_item(self):
"""Speak current item"""
if not self.speechEngine:
return
if self.currentView == 'stream_download':
# Speak submenu option
if self.currentSelection < len(self.streamDownloadOptions):
option = self.streamDownloadOptions[self.currentSelection]
text = f"{option['label']}, {self.currentSelection + 1} of {len(self.streamDownloadOptions)}"
self.speechEngine.speak(text)
return
if not self.items:
return
item = self.items[self.currentSelection]
if self.currentView == 'libraries':
# Speak library name
libraryName = item.get('name', 'Unknown library')
text = f"{libraryName}, {self.currentSelection + 1} of {len(self.items)}"
self.speechEngine.speak(text)
elif self.currentView == 'books':
# Different handling based on view mode
if self.booksViewMode == 'series':
# Speaking a series
name = item.get('name', 'Unknown series')
numBooks = item.get('books', [])
bookCount = len(numBooks) if isinstance(numBooks, list) else item.get('numBooks', 0)
text = f"{name}, {bookCount} books, {self.currentSelection + 1} of {len(self.items)}"
self.speechEngine.speak(text)
elif self.booksViewMode == 'collections':
# Speaking a collection
name = item.get('name', 'Unknown collection')
numBooks = item.get('books', [])
bookCount = len(numBooks) if isinstance(numBooks, list) else item.get('numBooks', 0)
text = f"{name}, {bookCount} books, {self.currentSelection + 1} of {len(self.items)}"
self.speechEngine.speak(text)
else:
# Speaking a book (all books view)
media = item.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', 'Unknown title')
author = metadata.get('authorName', '')
# Build description
parts = [title]
if author:
parts.append(f"by {author}")
parts.append(f"{self.currentSelection + 1} of {len(self.items)}")
text = ", ".join(parts)
self.speechEngine.speak(text)
def activate_current_item(self):
"""
Activate current item (select library or book)
Returns:
None if navigating, dictionary if book selected
"""
# Handle stream/download submenu
if self.currentView == 'stream_download':
if self.currentSelection < len(self.streamDownloadOptions):
option = self.streamDownloadOptions[self.currentSelection]
action = option['action']
if action == 'cancel':
# Go back to books list
self.currentView = 'books'
self.currentSelection = 0
if self.speechEngine:
self.speechEngine.speak("Cancelled")
self._speak_current_item()
return None
elif action == 'stream':
# Return stream action
return {
'action': 'stream',
'serverBook': self.selectedBook
}
elif action == 'download':
# Return download action
return {
'action': 'download',
'serverBook': self.selectedBook
}
return None
if not self.items:
if self.speechEngine:
self.speechEngine.speak("No items")
return None
item = self.items[self.currentSelection]
if self.currentView == 'libraries':
# Library selected - load books from this library
self.selectedLibrary = item
self._load_books()
return None
elif self.currentView == 'books':
# Check if we're selecting a series or collection (not a book)
if self.booksViewMode == 'series':
# Series selected - show books in this series
seriesBooks = item.get('books', [])
if not seriesBooks:
if self.speechEngine:
self.speechEngine.speak("No books in this series")
return None
# Store books directly (local check too slow)
self.items = seriesBooks
self.booksViewMode = 'all' # Switch to "all books" view
self.currentSelection = 0
if self.speechEngine:
seriesName = item.get('name', 'Series')
self.speechEngine.speak(f"{seriesName}. {len(seriesBooks)} books")
self._speak_current_item()
return None
elif self.booksViewMode == 'collections':
# Collection selected - show books in this collection
collectionBooks = item.get('books', [])
if not collectionBooks:
if self.speechEngine:
self.speechEngine.speak("No books in this collection")
return None
# Store books directly (local check too slow)
self.items = collectionBooks
self.booksViewMode = 'all' # Switch to "all books" view
self.currentSelection = 0
if self.speechEngine:
collectionName = item.get('name', 'Collection')
self.speechEngine.speak(f"{collectionName}. {len(collectionBooks)} books")
self._speak_current_item()
return None
# Book selected
self.selectedBook = item
# Debug: what book did user select?
media = item.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', 'Unknown')
print(f"\nDEBUG SELECT: User pressed ENTER on item {self.currentSelection}")
print(f"DEBUG SELECT: Book title: {title}")
print(f"DEBUG SELECT: Book ID: {item.get('id', 'NO ID')}")
# For books from series/collections, fetch full details if needed
# (they might have incomplete metadata)
if not item.get('media'):
# Book doesn't have full media details, fetch them
bookId = item.get('id') or item.get('libraryItemId')
if bookId:
print(f"\nFetching full details for book ID: {bookId}")
fullDetails = self.absClient.get_library_item_details(bookId)
if fullDetails:
self.selectedBook = fullDetails
item = fullDetails
print("Full book details loaded")
# Re-extract metadata from full details
media = item.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', 'Unknown')
print(f"DEBUG SELECT: After fetch, title is: {title}")
# Check if local copy exists
isLocal = self._check_if_local(item)
if isLocal:
# Book exists locally - return it for direct opening
if self.speechEngine:
media = item.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', 'Book')
self.speechEngine.speak(f"Opening local copy of {title}")
# Find local file path
localPath = self._find_local_path(item)
if localPath:
return {
'action': 'open_local',
'path': localPath,
'serverBook': item
}
else:
if self.speechEngine:
self.speechEngine.speak("Error: Could not find local file")
return None
else:
# Book not local - enter stream/download submenu
self.currentView = 'stream_download'
self.currentSelection = 0
if self.speechEngine:
self.speechEngine.speak("Choose playback option. Use arrow keys to navigate.")
self._speak_current_item()
return None
return None
def _find_local_path(self, book):
"""
Find local file path for a book
Args:
book: Book dictionary from server
Returns:
Path to local file, or None if not found
"""
libraryDir = self.configManager.get_library_directory()
if not libraryDir:
return None
libraryPath = Path(libraryDir)
if not libraryPath.exists():
return None
# Extract book metadata
media = book.get('media', {})
metadata = media.get('metadata', {})
title = metadata.get('title', '')
if not title:
return None
# Search for matching files
audioExtensions = ['.m4b', '.m4a', '.mp3', '.ogg']
for audioExt in audioExtensions:
for filePath in libraryPath.rglob(f"*{audioExt}"):
fileName = filePath.stem.lower()
if title.lower() in fileName:
return str(filePath)
return None
def go_back(self):
"""Go back to previous view"""
if self.currentView == 'stream_download':
# Go back to books list
self.currentView = 'books'
self.currentSelection = 0
if self.speechEngine:
self.speechEngine.speak("Back to books")
self._speak_current_item()
elif self.currentView == 'books':
# Go back to libraries (if multiple)
libraries = self.absClient.get_libraries()
if libraries and len(libraries) > 1:
self.currentView = 'libraries'
self.items = libraries
self.currentSelection = 0
self.selectedLibrary = None
if self.speechEngine:
self.speechEngine.speak("Back to libraries")
self._speak_current_item()
else:
# Only one library - exit menu
self.exit_menu()
else:
# Already at top level - exit
self.exit_menu()
def is_in_menu(self):
"""Check if currently in menu"""
return self.inMenu
def _load_series(self):
"""Load series from selected library"""
if not self.selectedLibrary:
return
libraryId = self.selectedLibrary.get('id')
if not libraryId:
if self.speechEngine:
self.speechEngine.speak("Error: Invalid library")
return
if self.speechEngine:
self.speechEngine.speak("Loading series...")
# Get series from library
seriesList = self.absClient.get_library_series(libraryId)
if not seriesList:
if self.speechEngine:
self.speechEngine.speak("No series found")
self.items = []
self.currentSelection = 0
return
self.items = seriesList
self.currentView = 'books' # Keep in books view but showing series
self.currentSelection = 0
if self.speechEngine:
self.speechEngine.speak(f"Loaded {len(seriesList)} series")
self._speak_current_item()
def _load_collections(self):
"""Load collections from selected library"""
if not self.selectedLibrary:
return
libraryId = self.selectedLibrary.get('id')
if not libraryId:
if self.speechEngine:
self.speechEngine.speak("Error: Invalid library")
return
if self.speechEngine:
self.speechEngine.speak("Loading collections...")
# Get collections from library
collectionsList = self.absClient.get_library_collections(libraryId)
if not collectionsList:
if self.speechEngine:
self.speechEngine.speak("No collections found")
self.items = []
self.currentSelection = 0
return
self.items = collectionsList
self.currentView = 'books' # Keep in books view but showing collections
self.currentSelection = 0
if self.speechEngine:
self.speechEngine.speak(f"Loaded {len(collectionsList)} collections")
self._speak_current_item()
def exit_menu(self):
"""Exit the menu"""
self.inMenu = False
self.currentView = 'libraries'
self.items = []
self.selectedLibrary = None
self.selectedBook = None
if self.speechEngine:
self.speechEngine.speak("Closed Audiobookshelf browser")

View File

@@ -50,6 +50,27 @@ class BookmarkManager:
)
''')
# Add audio_position column if it doesn't exist (migration for existing databases)
try:
cursor.execute('ALTER TABLE bookmarks ADD COLUMN audio_position REAL DEFAULT 0.0')
except sqlite3.OperationalError:
# Column already exists
pass
# Create named_bookmarks table for multiple bookmarks per book
cursor.execute('''
CREATE TABLE IF NOT EXISTS named_bookmarks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
book_id TEXT NOT NULL,
name TEXT NOT NULL,
chapter_index INTEGER NOT NULL DEFAULT 0,
paragraph_index INTEGER NOT NULL DEFAULT 0,
audio_position REAL DEFAULT 0.0,
created_at TEXT NOT NULL,
UNIQUE(book_id, name)
)
''')
conn.commit()
conn.close()
@@ -58,7 +79,7 @@ class BookmarkManager:
bookPath = str(Path(bookPath).resolve())
return hashlib.sha256(bookPath.encode()).hexdigest()[:16]
def save_bookmark(self, bookPath, bookTitle, chapterIndex, paragraphIndex, sentenceIndex=0):
def save_bookmark(self, bookPath, bookTitle, chapterIndex, paragraphIndex, sentenceIndex=0, audioPosition=0.0):
"""
Save bookmark for a book
@@ -68,6 +89,7 @@ class BookmarkManager:
chapterIndex: Current chapter index
paragraphIndex: Current paragraph index
sentenceIndex: Current sentence index (default: 0)
audioPosition: Audio playback position in seconds (default: 0.0)
"""
bookId = self._get_book_id(bookPath)
timestamp = datetime.now().isoformat()
@@ -78,11 +100,11 @@ class BookmarkManager:
cursor.execute('''
INSERT OR REPLACE INTO bookmarks
(book_id, book_path, book_title, chapter_index, paragraph_index,
sentence_index, last_accessed, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?,
sentence_index, audio_position, last_accessed, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?,
COALESCE((SELECT created_at FROM bookmarks WHERE book_id = ?), ?))
''', (bookId, str(bookPath), bookTitle, chapterIndex, paragraphIndex,
sentenceIndex, timestamp, bookId, timestamp))
sentenceIndex, audioPosition, timestamp, bookId, timestamp))
conn.commit()
conn.close()
@@ -104,7 +126,7 @@ class BookmarkManager:
cursor.execute('''
SELECT chapter_index, paragraph_index, sentence_index,
book_title, last_accessed
book_title, last_accessed, audio_position
FROM bookmarks
WHERE book_id = ?
''', (bookId,))
@@ -118,7 +140,8 @@ class BookmarkManager:
'paragraphIndex': row[1],
'sentenceIndex': row[2],
'bookTitle': row[3],
'lastAccessed': row[4]
'lastAccessed': row[4],
'audioPosition': row[5] if row[5] is not None else 0.0
}
return None
@@ -172,3 +195,125 @@ class BookmarkManager:
})
return bookmarks
def create_named_bookmark(self, bookPath, name, chapterIndex, paragraphIndex, audioPosition=0.0):
"""
Create a named bookmark for a book
Args:
bookPath: Path to book file
name: Bookmark name
chapterIndex: Chapter index
paragraphIndex: Paragraph index
audioPosition: Audio position in seconds (default: 0.0)
Returns:
True if created successfully, False if name already exists
"""
bookId = self._get_book_id(bookPath)
timestamp = datetime.now().isoformat()
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO named_bookmarks
(book_id, name, chapter_index, paragraph_index, audio_position, created_at)
VALUES (?, ?, ?, ?, ?, ?)
''', (bookId, name, chapterIndex, paragraphIndex, audioPosition, timestamp))
conn.commit()
conn.close()
return True
except sqlite3.IntegrityError:
# Bookmark with this name already exists
conn.close()
return False
def get_named_bookmarks(self, bookPath):
"""
Get all named bookmarks for a book
Args:
bookPath: Path to book file
Returns:
List of named bookmark dictionaries
"""
bookId = self._get_book_id(bookPath)
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('''
SELECT id, name, chapter_index, paragraph_index, audio_position, created_at
FROM named_bookmarks
WHERE book_id = ?
ORDER BY created_at DESC
''', (bookId,))
rows = cursor.fetchall()
conn.close()
bookmarks = []
for row in rows:
bookmarks.append({
'id': row[0],
'name': row[1],
'chapterIndex': row[2],
'paragraphIndex': row[3],
'audioPosition': row[4],
'createdAt': row[5]
})
return bookmarks
def delete_named_bookmark(self, bookmarkId):
"""
Delete a named bookmark by ID
Args:
bookmarkId: Bookmark ID
"""
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('DELETE FROM named_bookmarks WHERE id = ?', (bookmarkId,))
conn.commit()
conn.close()
def get_named_bookmark_by_id(self, bookmarkId):
"""
Get a named bookmark by ID
Args:
bookmarkId: Bookmark ID
Returns:
Bookmark dictionary or None if not found
"""
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('''
SELECT name, chapter_index, paragraph_index, audio_position
FROM named_bookmarks
WHERE id = ?
''', (bookmarkId,))
row = cursor.fetchone()
conn.close()
if row:
return {
'name': row[0],
'chapterIndex': row[1],
'paragraphIndex': row[2],
'audioPosition': row[3]
}
return None

196
src/bookmarks_menu.py Normal file
View File

@@ -0,0 +1,196 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Bookmarks Menu
Interactive menu for managing named bookmarks.
Allows creating, browsing, and jumping to bookmarks.
"""
class BookmarksMenu:
"""Menu for named bookmarks"""
def __init__(self, bookmarkManager, speechEngine=None):
"""
Initialize bookmarks menu
Args:
bookmarkManager: BookmarkManager instance
speechEngine: SpeechEngine instance for accessibility
"""
self.bookmarkManager = bookmarkManager
self.speechEngine = speechEngine
# Menu state
self.inMenu = False
self.currentView = 'list' # 'list' or 'create'
self.currentSelection = 0
self.bookmarks = []
# Current book context
self.currentBookPath = None
# Menu options for list view
self.listOptions = [] # Will be populated with bookmarks + "Create new"
def enter_menu(self, bookPath):
"""
Enter the bookmarks menu for a specific book
Args:
bookPath: Path to current book
"""
self.inMenu = True
self.currentBookPath = bookPath
self.currentView = 'list'
self.currentSelection = 0
# Load bookmarks for this book
self._load_bookmarks()
if self.speechEngine:
if len(self.bookmarks) > 0:
self.speechEngine.speak(f"Bookmarks. {len(self.bookmarks)} bookmarks found. Use arrow keys to navigate.")
else:
self.speechEngine.speak("Bookmarks. No bookmarks yet. Press Enter to create one.")
self._speak_current_item()
def _load_bookmarks(self):
"""Load bookmarks for current book"""
if not self.currentBookPath:
self.bookmarks = []
self.listOptions = []
return
self.bookmarks = self.bookmarkManager.get_named_bookmarks(self.currentBookPath)
# Build list options: bookmarks + "Create new bookmark"
self.listOptions = []
for bookmark in self.bookmarks:
self.listOptions.append({
'type': 'bookmark',
'data': bookmark
})
# Add "Create new" option
self.listOptions.append({
'type': 'create',
'data': None
})
def navigate_menu(self, direction):
"""Navigate menu up or down"""
if not self.listOptions:
return
if direction == 'up':
self.currentSelection = (self.currentSelection - 1) % len(self.listOptions)
elif direction == 'down':
self.currentSelection = (self.currentSelection + 1) % len(self.listOptions)
self._speak_current_item()
def _speak_current_item(self):
"""Speak current item"""
if not self.speechEngine or not self.listOptions:
return
if self.currentSelection >= len(self.listOptions):
return
option = self.listOptions[self.currentSelection]
if option['type'] == 'bookmark':
bookmark = option['data']
name = bookmark['name']
text = f"{name}, bookmark {self.currentSelection + 1} of {len(self.listOptions)}"
self.speechEngine.speak(text)
elif option['type'] == 'create':
text = f"Create new bookmark, {self.currentSelection + 1} of {len(self.listOptions)}"
self.speechEngine.speak(text)
def activate_current_item(self):
"""
Activate current item
Returns:
Dictionary with action and bookmark data, or None
"""
if not self.listOptions:
return None
if self.currentSelection >= len(self.listOptions):
return None
option = self.listOptions[self.currentSelection]
if option['type'] == 'bookmark':
# Jump to bookmark
bookmark = option['data']
return {
'action': 'jump',
'bookmark': bookmark
}
elif option['type'] == 'create':
# Create new bookmark
return {
'action': 'create'
}
return None
def delete_current_bookmark(self):
"""
Delete currently selected bookmark
Returns:
True if deleted, False otherwise
"""
if not self.listOptions:
return False
if self.currentSelection >= len(self.listOptions):
return False
option = self.listOptions[self.currentSelection]
if option['type'] == 'bookmark':
bookmark = option['data']
bookmarkId = bookmark['id']
bookmarkName = bookmark['name']
# Delete from database
self.bookmarkManager.delete_named_bookmark(bookmarkId)
if self.speechEngine:
self.speechEngine.speak(f"Deleted bookmark: {bookmarkName}")
# Reload bookmarks
self._load_bookmarks()
# Adjust selection if needed
if self.currentSelection >= len(self.listOptions):
self.currentSelection = max(0, len(self.listOptions) - 1)
self._speak_current_item()
return True
return False
def is_in_menu(self):
"""Check if currently in menu"""
return self.inMenu
def exit_menu(self):
"""Exit the menu"""
self.inMenu = False
self.currentView = 'list'
self.currentSelection = 0
self.bookmarks = []
self.listOptions = []
if self.speechEngine:
self.speechEngine.speak("Closed bookmarks")

View File

@@ -62,6 +62,16 @@ class ConfigManager:
'library_directory': ''
}
self.config['Audiobookshelf'] = {
'server_url': '',
'username': '',
'auth_token': '',
'auto_sync': 'true',
'sync_interval': '30',
'prefer_local': 'true',
'stream_cache_limit': '500'
}
self.save()
def get(self, section, key, fallback=None):
@@ -219,3 +229,80 @@ class ConfigManager:
"""Set library directory"""
self.set('Paths', 'library_directory', str(libraryDir))
self.save()
# Audiobookshelf settings
def get_abs_server_url(self):
"""Get Audiobookshelf server URL"""
return self.get('Audiobookshelf', 'server_url', '')
def set_abs_server_url(self, serverUrl):
"""Set Audiobookshelf server URL"""
self.set('Audiobookshelf', 'server_url', str(serverUrl))
self.save()
def get_abs_username(self):
"""Get Audiobookshelf username"""
return self.get('Audiobookshelf', 'username', '')
def set_abs_username(self, username):
"""Set Audiobookshelf username"""
self.set('Audiobookshelf', 'username', str(username))
self.save()
def get_abs_auth_token(self):
"""Get Audiobookshelf authentication token"""
return self.get('Audiobookshelf', 'auth_token', '')
def set_abs_auth_token(self, token):
"""Set Audiobookshelf authentication token"""
self.set('Audiobookshelf', 'auth_token', str(token))
self.save()
def get_abs_auto_sync(self):
"""Get Audiobookshelf auto-sync setting"""
return self.get_bool('Audiobookshelf', 'auto_sync', True)
def set_abs_auto_sync(self, enabled):
"""Set Audiobookshelf auto-sync setting"""
self.set('Audiobookshelf', 'auto_sync', str(enabled).lower())
self.save()
def get_abs_sync_interval(self):
"""Get Audiobookshelf sync interval in seconds"""
try:
return int(self.get('Audiobookshelf', 'sync_interval', '30'))
except ValueError:
return 30
def set_abs_sync_interval(self, seconds):
"""Set Audiobookshelf sync interval in seconds"""
self.set('Audiobookshelf', 'sync_interval', str(seconds))
self.save()
def get_abs_prefer_local(self):
"""Get Audiobookshelf prefer local books setting"""
return self.get_bool('Audiobookshelf', 'prefer_local', True)
def set_abs_prefer_local(self, enabled):
"""Set Audiobookshelf prefer local books setting"""
self.set('Audiobookshelf', 'prefer_local', str(enabled).lower())
self.save()
def get_abs_stream_cache_limit(self):
"""Get Audiobookshelf stream cache limit in MB"""
try:
return int(self.get('Audiobookshelf', 'stream_cache_limit', '500'))
except ValueError:
return 500
def set_abs_stream_cache_limit(self, limitMb):
"""Set Audiobookshelf stream cache limit in MB"""
self.set('Audiobookshelf', 'stream_cache_limit', str(limitMb))
self.save()
def is_abs_configured(self):
"""Check if Audiobookshelf server is configured"""
serverUrl = self.get_abs_server_url()
username = self.get_abs_username()
return bool(serverUrl and username)

View File

@@ -65,6 +65,10 @@ class OptionsMenu:
showText = self.config.get_show_text()
showTextLabel = "Show Text Display: On" if showText else "Show Text Display: Off"
# Add Audiobookshelf setup status
absConfigured = self.config.is_abs_configured()
absLabel = "Audiobookshelf: Configured" if absConfigured else "Audiobookshelf: Not Configured"
menuItems.extend([
{
'label': showTextLabel,
@@ -74,6 +78,10 @@ class OptionsMenu:
'label': "Speech Rate Settings",
'action': 'speech_rate'
},
{
'label': absLabel,
'action': 'audiobookshelf_setup'
},
{
'label': "Back",
'action': 'back'
@@ -124,6 +132,8 @@ class OptionsMenu:
return self._toggle_show_text()
elif action == 'speech_rate':
return self._speech_rate_info()
elif action == 'audiobookshelf_setup':
return self._audiobookshelf_setup()
elif action == 'back':
self.speechEngine.speak("Closing options menu")
return False
@@ -483,6 +493,96 @@ class OptionsMenu:
if menuItems and self.currentSelection < len(menuItems):
self.speechEngine.speak(menuItems[self.currentSelection]['label'])
def _audiobookshelf_setup(self):
"""Setup Audiobookshelf server connection"""
from src.ui import get_input
self.speechEngine.speak("Audiobookshelf setup starting.")
# Show current settings if configured
currentUrl = ""
currentUser = ""
if self.config.is_abs_configured():
currentUrl = self.config.get_abs_server_url()
currentUser = self.config.get_abs_username()
self.speechEngine.speak(f"Current server: {currentUrl}. Current username: {currentUser}. Leave fields blank to keep current values.")
# Get server URL
serverUrlPrompt = "Enter Audiobookshelf server URL. Example: https colon slash slash abs dot example dot com"
serverUrl = get_input(self.speechEngine, serverUrlPrompt, currentUrl)
if serverUrl is None:
self.speechEngine.speak("Setup cancelled.")
return True
serverUrl = serverUrl.strip()
if not serverUrl and self.config.is_abs_configured():
serverUrl = currentUrl
self.speechEngine.speak(f"Using current URL: {serverUrl}")
elif not serverUrl:
self.speechEngine.speak("Server URL required. Setup cancelled.")
return True
# Validate URL format
if not serverUrl.startswith(('http://', 'https://')):
self.speechEngine.speak("Invalid URL. Must start with http or https. Setup cancelled.")
return True
# Get username
usernamePrompt = "Enter Audiobookshelf username"
username = get_input(self.speechEngine, usernamePrompt, currentUser)
if username is None:
self.speechEngine.speak("Setup cancelled.")
return True
username = username.strip()
if not username and self.config.is_abs_configured():
username = currentUser
self.speechEngine.speak(f"Using current username: {username}")
elif not username:
self.speechEngine.speak("Username required. Setup cancelled.")
return True
# Get password
passwordPrompt = "Enter password for testing connection. Note: Password is NOT saved, only the authentication token."
password = get_input(self.speechEngine, passwordPrompt, "")
if password is None:
self.speechEngine.speak("Setup cancelled.")
return True
if not password:
self.speechEngine.speak("Password required. Setup cancelled.")
return True
# Test connection
self.speechEngine.speak("Testing connection. Please wait.")
# Import here to avoid circular dependency
from src.audiobookshelf_client import AudiobookshelfClient
# Create temporary client to test
testClient = AudiobookshelfClient(serverUrl, None)
if not testClient.login(username, password):
self.speechEngine.speak("Login failed. Check server URL, username, and password. Make sure the server is reachable and credentials are correct.")
return True
# Login successful - get token
authToken = testClient.authToken
# Save settings
self.config.set_abs_server_url(serverUrl)
self.config.set_abs_username(username)
self.config.set_abs_auth_token(authToken)
self.speechEngine.speak("Setup successful. Audiobookshelf configured. You can now press a to browse your Audiobookshelf library.")
return True
def exit_menu(self):
"""Exit the menu"""
self.inMenu = False

View File

@@ -19,6 +19,9 @@ class PygamePlayer:
self.isInitialized = False
self.isPaused = False
self.currentSound = None # Track current sound for cleanup
self.audioFileLoaded = False # Track if audio file is loaded
self.audioFilePath = None # Current audio file path
self.tempAudioFile = None # Temporary transcoded audio file
try:
# Initialize pygame mixer only (not full pygame)
@@ -106,10 +109,20 @@ class PygamePlayer:
def cleanup(self):
"""Cleanup resources"""
# Clean up audio file playback state
if self.audioFileLoaded:
# Note: We don't delete cached files - they're kept for future use
self.tempAudioFile = None
self.audioFileLoaded = False
self.audioFilePath = None
if self.isInitialized:
# Stop and cleanup current sound
if self.currentSound:
try:
self.currentSound.stop()
except Exception:
pass # Mixer may be shutting down
del self.currentSound
self.currentSound = None
pygame.mixer.quit()
@@ -118,3 +131,389 @@ class PygamePlayer:
def is_available(self):
"""Check if pygame mixer is available"""
return self.isInitialized
# Audio file playback methods (for audiobooks)
def load_audio_file(self, audioPath, authToken=None):
"""
Load an audio file for streaming playback
Args:
audioPath: Path to audio file or URL
authToken: Optional Bearer token for authenticated URLs
Returns:
True if loaded successfully
"""
if not self.isInitialized:
return False
from pathlib import Path
audioPath = str(audioPath) # Ensure it's a string
# Check if this is a URL (for streaming from Audiobookshelf)
isUrl = audioPath.startswith('http://') or audioPath.startswith('https://')
if isUrl:
# Use ffmpeg for streaming from URLs
print(f"DEBUG: Loading URL for streaming")
return self._load_url_with_ffmpeg(audioPath, authToken=authToken)
# Local file - use existing logic
fileSuffix = Path(audioPath).suffix.lower()
try:
# Stop any current playback and clean up temp files
self.stop_audio_file()
# Try to load audio file directly using pygame.mixer.music
pygame.mixer.music.load(audioPath)
self.audioFileLoaded = True
self.audioFilePath = audioPath
return True
except Exception as e:
print(f"Direct load failed: {e}")
# Try transcoding with ffmpeg if direct load failed
if "ModPlug_Load failed" in str(e) or "Unrecognized" in str(e):
print(f"Attempting to transcode {fileSuffix} with ffmpeg...")
return self._load_with_ffmpeg_transcode(audioPath)
# Unknown error
print(f"Error loading audio file: {e}")
self.audioFileLoaded = False
return False
def _load_url_with_ffmpeg(self, streamUrl, authToken=None):
"""
Stream from URL using ffmpeg to transcode to cache
Args:
streamUrl: URL to stream from (e.g., Audiobookshelf URL)
authToken: Optional Bearer token for authentication
Returns:
True if successful
"""
import subprocess
import shutil
import hashlib
from pathlib import Path
# Check if ffmpeg is available
if not shutil.which('ffmpeg'):
print("\nffmpeg not found. Falling back to direct download...")
print("Install ffmpeg for better streaming: sudo pacman -S ffmpeg")
return False
# Set up cache directory
cacheDir = Path.home() / '.cache' / 'bookstorm' / 'audiobookshelf'
cacheDir.mkdir(parents=True, exist_ok=True)
# Generate cache filename from hash of URL (without token for consistency)
# Extract base URL without token parameter
baseUrl = streamUrl.split('?')[0] if '?' in streamUrl else streamUrl
urlHash = hashlib.sha256(baseUrl.encode()).hexdigest()[:16]
cachedPath = cacheDir / f"{urlHash}.ogg"
# Check if cached version exists
if cachedPath.exists():
print(f"\nUsing cached stream")
try:
pygame.mixer.music.load(str(cachedPath))
self.audioFileLoaded = True
self.audioFilePath = streamUrl
self.tempAudioFile = str(cachedPath)
print("Cached file loaded! Starting playback...")
return True
except Exception as e:
print(f"Cached file corrupted, re-downloading: {e}")
cachedPath.unlink(missing_ok=True)
# No cache available, stream and transcode
try:
print(f"\nStreaming from server...")
print("Transcoding to cache. This will take a moment.")
print(f"(Cached for future use in {cacheDir})\n")
# Build ffmpeg command with authentication headers if token provided
ffmpegCmd = ['ffmpeg']
# Add authentication header for Audiobookshelf
if authToken:
# ffmpeg needs headers in the format "Name: Value\r\n"
authHeader = f"Authorization: Bearer {authToken}"
ffmpegCmd.extend(['-headers', authHeader])
print(f"DEBUG: Using Bearer token authentication")
ffmpegCmd.extend([
'-i', streamUrl,
'-vn', # No video
'-c:a', 'libvorbis',
'-q:a', '4', # Medium quality
'-threads', '0', # Use all CPU cores
'-y',
str(cachedPath)
])
print(f"DEBUG: ffmpeg command: {' '.join(ffmpegCmd[:6])}...")
# Run ffmpeg with progress output
result = subprocess.run(
ffmpegCmd,
capture_output=False, # Show progress to user
text=True,
timeout=1800 # 30 minute timeout for large audiobooks
)
if result.returncode != 0:
print(f"\nStreaming/transcoding failed (exit code {result.returncode})")
cachedPath.unlink(missing_ok=True)
return False
# Try to load the transcoded file
try:
pygame.mixer.music.load(str(cachedPath))
self.audioFileLoaded = True
self.audioFilePath = streamUrl # Keep URL for reference
self.tempAudioFile = str(cachedPath)
print("\nStream cached successfully!")
print("Starting playback...")
return True
except Exception as e:
print(f"Error loading transcoded stream: {e}")
cachedPath.unlink(missing_ok=True)
return False
except subprocess.TimeoutExpired:
print("\nStreaming timed out (file too large or connection too slow)")
cachedPath.unlink(missing_ok=True)
return False
except KeyboardInterrupt:
print("\nStreaming cancelled by user")
cachedPath.unlink(missing_ok=True)
return False
except Exception as e:
print(f"Error during streaming: {e}")
cachedPath.unlink(missing_ok=True)
return False
def _load_with_ffmpeg_transcode(self, audioPath, fastMode=False):
"""
Transcode audio file using ffmpeg and load the result
Args:
audioPath: Path to original audio file
fastMode: If True, use faster/lower quality settings
Returns:
True if successful
"""
import subprocess
import shutil
import hashlib
from pathlib import Path
# Check if ffmpeg is available
if not shutil.which('ffmpeg'):
print("\nffmpeg not found. Please install ffmpeg or convert the file manually:")
print(f" ffmpeg -i '{audioPath}' -c:a libmp3lame -q:a 2 output.mp3")
return False
# Set up persistent cache directory
cacheDir = Path.home() / '.cache' / 'bookstorm' / 'audio'
cacheDir.mkdir(parents=True, exist_ok=True)
# Generate cache filename from hash of original file path
pathHash = hashlib.sha256(str(Path(audioPath).resolve()).encode()).hexdigest()[:16]
cachedPath = cacheDir / f"{pathHash}.ogg"
# Check if cached version exists
if cachedPath.exists():
print(f"\nUsing cached transcoded file for {Path(audioPath).name}")
try:
pygame.mixer.music.load(str(cachedPath))
self.audioFileLoaded = True
self.audioFilePath = audioPath
self.tempAudioFile = str(cachedPath)
print("Cached file loaded! Starting playback...")
return True
except Exception as e:
print(f"Cached file corrupted, re-transcoding: {e}")
cachedPath.unlink(missing_ok=True)
# No cache available, transcode the file
try:
print(f"\nTranscoding {Path(audioPath).name}...")
print("This will take a moment. Press Ctrl+C to cancel.")
print(f"(Cached for future use in {cacheDir})\n")
# Build ffmpeg command
if fastMode:
# Fast mode: lower quality, faster encoding
ffmpegCmd = [
'ffmpeg',
'-i', audioPath,
'-vn', # No video
'-c:a', 'libvorbis',
'-q:a', '1', # Lower quality (0-10, lower is better)
'-threads', '0', # Use all CPU cores
'-y',
str(cachedPath)
]
else:
# Normal mode: balanced quality
ffmpegCmd = [
'ffmpeg',
'-i', audioPath,
'-vn',
'-c:a', 'libvorbis',
'-q:a', '4', # Medium quality
'-threads', '0',
'-y',
str(cachedPath)
]
# Run ffmpeg with progress output
result = subprocess.run(
ffmpegCmd,
capture_output=False, # Show progress to user
text=True,
timeout=600 # 10 minute timeout
)
if result.returncode != 0:
print(f"\nTranscoding failed (exit code {result.returncode})")
cachedPath.unlink(missing_ok=True)
return False
# Try to load the transcoded file
try:
pygame.mixer.music.load(str(cachedPath))
self.audioFileLoaded = True
self.audioFilePath = audioPath # Keep original path for reference
self.tempAudioFile = str(cachedPath)
print("\nTranscoding complete! Cached for future use.")
print("Starting playback...")
return True
except Exception as e:
print(f"Error loading transcoded file: {e}")
cachedPath.unlink(missing_ok=True)
return False
except subprocess.TimeoutExpired:
print("\nTranscoding timed out (file too large or system too slow)")
cachedPath.unlink(missing_ok=True)
return False
except KeyboardInterrupt:
print("\nTranscoding cancelled by user")
cachedPath.unlink(missing_ok=True)
return False
except Exception as e:
print(f"Error during transcoding: {e}")
cachedPath.unlink(missing_ok=True)
return False
def play_audio_file(self, startPosition=0.0):
"""
Play loaded audio file from a specific position
Args:
startPosition: Start time in seconds
Returns:
True if playback started successfully
"""
if not self.isInitialized or not self.audioFileLoaded:
return False
try:
# Start playback
pygame.mixer.music.play(start=startPosition)
self.isPaused = False
return True
except Exception as e:
print(f"Error playing audio file: {e}")
return False
def pause_audio_file(self):
"""Pause audio file playback"""
if self.isInitialized and self.audioFileLoaded:
pygame.mixer.music.pause()
self.isPaused = True
def resume_audio_file(self):
"""Resume audio file playback"""
if self.isInitialized and self.audioFileLoaded:
pygame.mixer.music.unpause()
self.isPaused = False
def stop_audio_file(self):
"""Stop audio file playback"""
# Only stop if mixer is initialized
if self.isInitialized and self.audioFileLoaded:
try:
pygame.mixer.music.stop()
except Exception:
pass # Mixer may already be shut down
self.isPaused = False
# Note: We don't delete tempAudioFile anymore since it's a persistent cache
# The cache files are kept in ~/.cache/bookstorm/audio/ for future use
def is_audio_file_playing(self):
"""Check if audio file is currently playing"""
if not self.isInitialized or not self.audioFileLoaded:
return False
return pygame.mixer.music.get_busy()
def get_audio_position(self):
"""
Get current playback position in milliseconds
Returns:
Position in milliseconds, or 0.0 if not playing
"""
if not self.isInitialized or not self.audioFileLoaded:
return 0.0
# pygame.mixer.music.get_pos() returns time in milliseconds
return pygame.mixer.music.get_pos() / 1000.0
def seek_audio(self, position):
"""
Seek to a specific position in the audio file
Args:
position: Position in seconds
Note:
pygame.mixer.music doesn't support direct seeking.
We need to stop and restart from the position.
"""
if not self.isInitialized or not self.audioFileLoaded:
return False
try:
# Stop current playback
pygame.mixer.music.stop()
# Restart from new position
pygame.mixer.music.play(start=position)
self.isPaused = False
return True
except Exception as e:
print(f"Error seeking audio: {e}")
return False
def unload_audio_file(self):
"""Unload the current audio file"""
if self.audioFileLoaded:
self.stop_audio_file() # This also cleans up temp files
self.audioFileLoaded = False
self.audioFilePath = None

106
src/recent_books_menu.py Normal file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Recent Books Menu
Displays the 10 most recently accessed books for quick access.
"""
from pathlib import Path
class RecentBooksMenu:
"""Recent books selection interface"""
def __init__(self, bookmarkManager, speechEngine=None):
"""
Initialize recent books menu
Args:
bookmarkManager: BookmarkManager instance for fetching recent books
speechEngine: SpeechEngine instance for accessibility
"""
self.bookmarkManager = bookmarkManager
self.speechEngine = speechEngine
self.currentSelection = 0
self.inMenu = False
self.items = []
def enter_menu(self):
"""Enter the recent books menu"""
self.inMenu = True
self.currentSelection = 0
# Get recent books from bookmarks
allBookmarks = self.bookmarkManager.list_bookmarks()
# Limit to 10 most recent, filter out non-existent files
self.items = []
for bookmark in allBookmarks[:10]:
bookPath = Path(bookmark['bookPath']).resolve()
if bookPath.exists():
self.items.append({
'path': bookPath,
'title': bookmark['bookTitle'] or bookPath.name,
'lastAccessed': bookmark['lastAccessed']
})
if self.speechEngine:
self.speechEngine.speak("Recent books. Use arrow keys to navigate, Enter to select, Escape to cancel.")
if self.items:
self._speak_current_item()
elif self.speechEngine:
self.speechEngine.speak("No recent books found")
def navigate_menu(self, direction):
"""Navigate menu up or down"""
if not self.items:
return
if direction == 'up':
self.currentSelection = (self.currentSelection - 1) % len(self.items)
elif direction == 'down':
self.currentSelection = (self.currentSelection + 1) % len(self.items)
self._speak_current_item()
def _speak_current_item(self):
"""Speak current item"""
if not self.items or not self.speechEngine:
return
item = self.items[self.currentSelection]
# Speak title and position
text = f"{item['title']}, {self.currentSelection + 1} of {len(self.items)}"
# speak() has interrupt=True by default, which stops any current speech
self.speechEngine.speak(text)
def activate_current_item(self):
"""
Activate current item (return book path)
Returns:
Book path if selected, None if no items
"""
if not self.items:
if self.speechEngine:
self.speechEngine.speak("No items")
return None
item = self.items[self.currentSelection]
if self.speechEngine:
self.speechEngine.speak(f"Loading: {item['title']}")
return str(item['path'])
def is_in_menu(self):
"""Check if currently in menu"""
return self.inMenu
def exit_menu(self):
"""Exit the menu"""
self.inMenu = False
if self.speechEngine:
self.speechEngine.speak("Cancelled")

131
src/server_link_manager.py Normal file
View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Server Link Manager
Manages sidecar files that link local books to Audiobookshelf server books.
Enables progress sync and prevents duplicate downloads.
"""
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict
class ServerLinkManager:
"""Manages server link metadata for local books"""
def __init__(self):
"""Initialize server link manager"""
# Create sidecar directory
homePath = Path.home()
self.sidecarDir = homePath / ".bookstorm" / "server_links"
self.sidecarDir.mkdir(parents=True, exist_ok=True)
def _get_book_hash(self, bookPath: str) -> str:
"""Generate hash from book path for sidecar filename"""
pathStr = str(Path(bookPath).resolve())
return hashlib.sha256(pathStr.encode()).hexdigest()[:16]
def create_link(self, bookPath: str, serverUrl: str, serverId: str, libraryId: str,
title: str = "", author: str = "", duration: float = 0.0,
chapters: int = 0, manualOverride: bool = False):
"""
Create server link for a local book
Args:
bookPath: Path to local book file
serverUrl: Audiobookshelf server URL
serverId: Server's library item ID
libraryId: Server's library ID
title: Book title
author: Author name
duration: Audio duration in seconds
chapters: Number of chapters
manualOverride: True if user manually linked despite mismatch
"""
bookHash = self._get_book_hash(bookPath)
sidecarPath = self.sidecarDir / f"{bookHash}.json"
linkData = {
'server_url': serverUrl,
'server_id': serverId,
'library_id': libraryId,
'local_path': str(Path(bookPath).resolve()),
'linked_at': datetime.now().isoformat(),
'validation': {
'duration': duration,
'chapters': chapters,
'title': title,
'author': author
},
'manual_override': manualOverride
}
with open(sidecarPath, 'w') as f:
json.dump(linkData, f, indent=2)
print(f"Created server link: {sidecarPath}")
def get_link(self, bookPath: str) -> Optional[Dict]:
"""
Get server link for a local book
Args:
bookPath: Path to local book file
Returns:
Link data dictionary, or None if no link exists
"""
bookHash = self._get_book_hash(bookPath)
sidecarPath = self.sidecarDir / f"{bookHash}.json"
if not sidecarPath.exists():
return None
try:
with open(sidecarPath, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"Error reading server link: {e}")
return None
def has_link(self, bookPath: str) -> bool:
"""Check if book has server link"""
return self.get_link(bookPath) is not None
def find_by_server_id(self, serverId: str) -> Optional[str]:
"""
Find local book path by server ID
Args:
serverId: Server's library item ID
Returns:
Local book path if found, None otherwise
"""
# Search all sidecar files
for sidecarPath in self.sidecarDir.glob("*.json"):
try:
with open(sidecarPath, 'r') as f:
linkData = json.load(f)
if linkData.get('server_id') == serverId:
localPath = linkData.get('local_path')
# Verify file still exists
if localPath and Path(localPath).exists():
return localPath
except (json.JSONDecodeError, IOError):
continue
return None
def delete_link(self, bookPath: str):
"""Delete server link for a book"""
bookHash = self._get_book_hash(bookPath)
sidecarPath = self.sidecarDir / f"{bookHash}.json"
if sidecarPath.exists():
sidecarPath.unlink()
print(f"Deleted server link: {sidecarPath}")

View File

@@ -43,6 +43,16 @@ class SpeechEngine:
else:
print("Warning: python3-speechd not installed. UI will not be accessible.")
def close(self):
"""Close speech-dispatcher connection"""
if self.client:
try:
self.client.close()
except:
pass
self.client = None
self.isAvailable = False
def speak(self, text, interrupt=True):
"""
Speak text using speech-dispatcher

205
src/ui.py Normal file
View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
UI Components for BookStorm
Accessible UI input and display components adapted from libstormgames.
"""
import pygame
import time
def get_input(speechEngine, prompt="Enter text:", text=""):
"""Display an accessible text input dialog using pygame.
Features:
- Speaks each character as typed
- Left/Right arrows navigate and speak characters
- Up/Down arrows read full text content
- Backspace announces deletions
- Enter submits, Escape cancels
- Control key repeats the original prompt message
- Fully accessible without screen reader dependency
Args:
speechEngine: SpeechEngine instance for speaking
prompt (str): Prompt text to display (default: "Enter text:")
text (str): Initial text in input box (default: "")
Returns:
str: User input text, or None if cancelled
"""
# Initialize text buffer and cursor
textBuffer = list(text) # Use list for easier character manipulation
cursorPos = len(textBuffer) # Start at end of initial text
# Announce the prompt and initial text as a single message
if text:
initialMessage = f"{prompt} Default text: {text}"
else:
initialMessage = f"{prompt} Empty text field"
speechEngine.speak(initialMessage)
# Clear any pending events
pygame.event.clear()
# Main input loop
while True:
event = pygame.event.wait()
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_RETURN:
# Submit the input
result = ''.join(textBuffer)
speechEngine.speak(f"Submitted: {result if result else 'empty'}")
return result
elif event.key == pygame.K_ESCAPE:
# Cancel input
speechEngine.speak("Cancelled")
return None
elif event.key == pygame.K_BACKSPACE:
# Delete character before cursor
if cursorPos > 0:
deletedChar = textBuffer.pop(cursorPos - 1)
cursorPos -= 1
speechEngine.speak(f"{deletedChar} deleted")
else:
speechEngine.speak("Nothing to delete")
elif event.key == pygame.K_DELETE:
# Delete character at cursor
if cursorPos < len(textBuffer):
deletedChar = textBuffer.pop(cursorPos)
speechEngine.speak(f"{deletedChar} deleted")
else:
speechEngine.speak("Nothing to delete")
elif event.key == pygame.K_LEFT:
# Move cursor left and speak character
if cursorPos > 0:
cursorPos -= 1
if cursorPos == 0:
speechEngine.speak("Beginning of text")
else:
speechEngine.speak(textBuffer[cursorPos])
else:
speechEngine.speak("Beginning of text")
elif event.key == pygame.K_RIGHT:
# Move cursor right and speak character
if cursorPos < len(textBuffer):
speechEngine.speak(textBuffer[cursorPos])
cursorPos += 1
if cursorPos == len(textBuffer):
speechEngine.speak("End of text")
else:
speechEngine.speak("End of text")
elif event.key == pygame.K_UP or event.key == pygame.K_DOWN:
# Read entire text content
if textBuffer:
speechEngine.speak(''.join(textBuffer))
else:
speechEngine.speak("Empty text field")
elif event.key == pygame.K_HOME:
# Move to beginning
cursorPos = 0
speechEngine.speak("Beginning of text")
elif event.key == pygame.K_END:
# Move to end
cursorPos = len(textBuffer)
speechEngine.speak("End of text")
elif event.key == pygame.K_LCTRL or event.key == pygame.K_RCTRL:
# Repeat the original prompt message
speechEngine.speak(initialMessage)
else:
# Handle regular character input
if event.unicode and event.unicode.isprintable():
char = event.unicode
# Insert character at cursor position
textBuffer.insert(cursorPos, char)
cursorPos += 1
# Speak the character name
if char == ' ':
speechEngine.speak("space")
elif char == '\\':
speechEngine.speak("backslash")
elif char == '/':
speechEngine.speak("slash")
elif char == '!':
speechEngine.speak("exclamation mark")
elif char == '"':
speechEngine.speak("quotation mark")
elif char == '#':
speechEngine.speak("hash")
elif char == '$':
speechEngine.speak("dollar sign")
elif char == '%':
speechEngine.speak("percent")
elif char == '&':
speechEngine.speak("ampersand")
elif char == "'":
speechEngine.speak("apostrophe")
elif char == '(':
speechEngine.speak("left parenthesis")
elif char == ')':
speechEngine.speak("right parenthesis")
elif char == '*':
speechEngine.speak("asterisk")
elif char == '+':
speechEngine.speak("plus")
elif char == ',':
speechEngine.speak("comma")
elif char == '-':
speechEngine.speak("minus")
elif char == '.':
speechEngine.speak("period")
elif char == ':':
speechEngine.speak("colon")
elif char == ';':
speechEngine.speak("semicolon")
elif char == '<':
speechEngine.speak("less than")
elif char == '=':
speechEngine.speak("equals")
elif char == '>':
speechEngine.speak("greater than")
elif char == '?':
speechEngine.speak("question mark")
elif char == '@':
speechEngine.speak("at sign")
elif char == '[':
speechEngine.speak("left bracket")
elif char == ']':
speechEngine.speak("right bracket")
elif char == '^':
speechEngine.speak("caret")
elif char == '_':
speechEngine.speak("underscore")
elif char == '`':
speechEngine.speak("grave accent")
elif char == '{':
speechEngine.speak("left brace")
elif char == '|':
speechEngine.speak("pipe")
elif char == '}':
speechEngine.speak("right brace")
elif char == '~':
speechEngine.speak("tilde")
else:
# For regular letters, numbers, and other characters
speechEngine.speak(char)
# Allow other events to be processed
pygame.event.pump()
pygame.event.clear()
time.sleep(0.001)