Initial commit.

This commit is contained in:
Storm Dragon
2025-10-04 02:55:01 -04:00
commit 1d19ed377c
16 changed files with 4401 additions and 0 deletions

1198
bookstorm.py Executable file

File diff suppressed because it is too large Load Diff

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
# BookStorm source modules

66
src/book.py Normal file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Book Data Structures
Common book and chapter classes used by all parsers.
"""
class Book:
"""Represents a parsed book with navigation and content"""
def __init__(self, title="Untitled", author="Unknown"):
"""
Initialize book
Args:
title: Book title
author: Book author
"""
self.title = title
self.author = author
self.chapters = [] # List of Chapter objects
def add_chapter(self, chapter):
"""Add a chapter to the book"""
self.chapters.append(chapter)
def get_chapter(self, index):
"""Get chapter by index"""
if 0 <= index < len(self.chapters):
return self.chapters[index]
return None
def get_total_chapters(self):
"""Get total number of chapters"""
return len(self.chapters)
class Chapter:
"""Represents a single chapter with paragraphs"""
def __init__(self, title="Untitled"):
"""
Initialize chapter
Args:
title: Chapter title
"""
self.title = title
self.paragraphs = [] # List of paragraph strings
def add_paragraph(self, text):
"""Add a paragraph to the chapter"""
if text and text.strip():
self.paragraphs.append(text.strip())
def get_paragraph(self, index):
"""Get paragraph by index"""
if 0 <= index < len(self.paragraphs):
return self.paragraphs[index]
return None
def get_total_paragraphs(self):
"""Get total number of paragraphs"""
return len(self.paragraphs)

319
src/book_selector.py Normal file
View File

@@ -0,0 +1,319 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Book Selector
Interactive file browser for selecting book files.
Supports navigation and filtering by supported formats.
"""
from pathlib import Path
import zipfile
class BookSelector:
"""Book file selection interface"""
def __init__(self, startDir=None, supportedFormats=None, speechEngine=None):
"""
Initialize book selector
Args:
startDir: Starting directory (default: home)
supportedFormats: List of supported file extensions (default: ['.zip', '.epub'])
speechEngine: SpeechEngine instance for accessibility
"""
if startDir is None:
startDir = Path.home()
if supportedFormats is None:
supportedFormats = ['.zip', '.epub']
self.currentDir = Path(startDir).resolve()
self.supportedFormats = supportedFormats
self.speechEngine = speechEngine
self.currentSelection = 0
self.inBrowser = False
self.items = []
def select_book_interactive(self):
"""
Interactive book selection with directory navigation
Returns:
Selected book path or None if cancelled
"""
while True:
print(f"\nCurrent directory: {self.currentDir}")
print("-" * 60)
# List directories and supported files
items = self._list_items()
if not items:
print("No books or directories found")
print("\nCommands:")
print(" .. - Go to parent directory")
print(" q - Cancel")
print()
choice = input("Select> ").strip()
if choice == 'q':
return None
elif choice == '..':
self._go_parent()
continue
# Display items
for idx, item in enumerate(items):
prefix = "[DIR]" if item['isDir'] else "[BOOK]"
print(f"{idx + 1}. {prefix} {item['name']}")
print("-" * 60)
print("\nCommands:")
print(" <number> - Select item")
print(" .. - Go to parent directory")
print(" q - Cancel")
print()
try:
choice = input("Select> ").strip()
if choice == 'q':
return None
elif choice == '..':
self._go_parent()
else:
# Select item by number
try:
itemNum = int(choice)
if 1 <= itemNum <= len(items):
selectedItem = items[itemNum - 1]
if selectedItem['isDir']:
# Navigate into directory
self.currentDir = selectedItem['path']
else:
# Return selected book
return str(selectedItem['path'])
else:
print(f"Invalid number. Choose 1-{len(items)}")
except ValueError:
print("Invalid input. Enter a number, '..' for parent, or 'q' to cancel")
except (EOFError, KeyboardInterrupt):
print("\nCancelled")
return None
def _list_items(self):
"""
List directories and supported book files in current directory
Returns:
List of item dictionaries
"""
items = []
try:
# Add directories (excluding hidden)
for item in sorted(self.currentDir.iterdir()):
if item.name.startswith('.'):
continue
if item.is_dir():
items.append({
'name': item.name,
'path': item,
'isDir': True
})
# Add supported book files
for item in sorted(self.currentDir.iterdir()):
if item.name.startswith('.'):
continue
if item.is_file() and item.suffix.lower() in self.supportedFormats:
# For zip files, validate that they're actually DAISY books
if item.suffix.lower() == '.zip':
if not self._is_daisy_zip(item):
continue # Skip non-DAISY zip files
items.append({
'name': item.name,
'path': item,
'isDir': False
})
except PermissionError:
print(f"Permission denied: {self.currentDir}")
return items
def _go_parent(self):
"""Navigate to parent directory"""
parent = self.currentDir.parent
if parent != self.currentDir: # Not at root
self.currentDir = parent
else:
print("Already at root directory")
def _is_daisy_zip(self, zipPath):
"""
Check if a zip file contains a DAISY book
Args:
zipPath: Path to zip file
Returns:
True if zip contains DAISY book markers, False otherwise
"""
try:
with zipfile.ZipFile(zipPath, 'r') as zf:
fileList = zf.namelist()
# Check for DAISY 2.02 marker (ncc.html)
if 'ncc.html' in fileList:
return True
# Check for DAISY 3 marker (.ncx file)
for filename in fileList:
if filename.lower().endswith('.ncx'):
return True
return False
except (zipfile.BadZipFile, PermissionError, OSError):
# If we can't read it, don't show it
return False
def reset_to_directory(self, directory):
"""
Reset browser to a specific directory
Args:
directory: Path to directory to reset to
"""
dirPath = Path(directory).resolve()
if dirPath.exists() and dirPath.is_dir():
self.currentDir = dirPath
def get_current_directory(self):
"""
Get current directory path
Returns:
Path object of current directory
"""
return self.currentDir
def enter_browser(self):
"""Enter the book browser"""
self.inBrowser = True
self.currentSelection = 0
self.items = self._list_items()
if self.speechEngine:
self.speechEngine.speak("Book browser. Use arrow keys to navigate, Enter to select, Backspace for parent directory, L to set library, Escape to cancel.")
# Speak current directory and first item
if self.speechEngine:
dirName = self.currentDir.name if self.currentDir.name else str(self.currentDir)
self.speechEngine.speak(f"Directory: {dirName}")
if self.items:
self._speak_current_item()
elif self.speechEngine:
self.speechEngine.speak("Empty directory")
def navigate_browser(self, direction):
"""Navigate browser up or down"""
if not self.items:
return
if direction == 'up':
self.currentSelection = (self.currentSelection - 1) % len(self.items)
elif direction == 'down':
self.currentSelection = (self.currentSelection + 1) % len(self.items)
self._speak_current_item()
def _speak_current_item(self):
"""Speak current item with name first, then type"""
if not self.items or not self.speechEngine:
return
item = self.items[self.currentSelection]
if item['isDir']:
text = f"{item['name']}, directory"
else:
text = f"{item['name']}, book"
self.speechEngine.speak(text)
def activate_current_item(self):
"""
Activate current item (enter directory or return book path)
Returns:
Book path if selected, None if navigating or empty
"""
if not self.items:
if self.speechEngine:
self.speechEngine.speak("No items")
return None
item = self.items[self.currentSelection]
if item['isDir']:
# Navigate into directory
self.currentDir = item['path']
self.currentSelection = 0
self.items = self._list_items()
if self.speechEngine:
dirName = self.currentDir.name if self.currentDir.name else str(self.currentDir)
self.speechEngine.speak(f"Entered directory: {dirName}")
if self.items:
self._speak_current_item()
elif self.speechEngine:
self.speechEngine.speak("Empty directory")
return None
else:
# Book selected
if self.speechEngine:
self.speechEngine.speak(f"Loading: {item['name']}")
return str(item['path'])
def go_parent_directory(self):
"""Go to parent directory"""
parent = self.currentDir.parent
if parent != self.currentDir:
self.currentDir = parent
self.currentSelection = 0
self.items = self._list_items()
if self.speechEngine:
dirName = self.currentDir.name if self.currentDir.name else str(self.currentDir)
self.speechEngine.speak(f"Parent directory: {dirName}")
if self.items:
self._speak_current_item()
elif self.speechEngine:
self.speechEngine.speak("Empty directory")
else:
if self.speechEngine:
self.speechEngine.speak("Already at root")
def is_in_browser(self):
"""Check if currently in browser"""
return self.inBrowser
def exit_browser(self):
"""Exit the browser"""
self.inBrowser = False
if self.speechEngine:
self.speechEngine.speak("Cancelled")

174
src/bookmark_manager.py Normal file
View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Bookmark Manager
Manages reading positions for books using SQLite for persistence.
Tracks chapter, paragraph, and sentence positions.
"""
import sqlite3
import hashlib
from pathlib import Path
from datetime import datetime
class BookmarkManager:
"""Manages bookmarks for books"""
def __init__(self, dbPath=None):
"""
Initialize bookmark manager
Args:
dbPath: Path to SQLite database (default: ~/.bookstorm/bookmarks.db)
"""
if dbPath is None:
homePath = Path.home()
bookstormDir = homePath / ".bookstorm"
bookstormDir.mkdir(exist_ok=True)
dbPath = bookstormDir / "bookmarks.db"
self.dbPath = dbPath
self._init_db()
def _init_db(self):
"""Initialize database schema"""
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS bookmarks (
book_id TEXT PRIMARY KEY,
book_path TEXT NOT NULL,
book_title TEXT,
chapter_index INTEGER NOT NULL DEFAULT 0,
paragraph_index INTEGER NOT NULL DEFAULT 0,
sentence_index INTEGER NOT NULL DEFAULT 0,
last_accessed TEXT,
created_at TEXT
)
''')
conn.commit()
conn.close()
def _get_book_id(self, bookPath):
"""Generate unique book ID from file path"""
bookPath = str(Path(bookPath).resolve())
return hashlib.sha256(bookPath.encode()).hexdigest()[:16]
def save_bookmark(self, bookPath, bookTitle, chapterIndex, paragraphIndex, sentenceIndex=0):
"""
Save bookmark for a book
Args:
bookPath: Path to book file
bookTitle: Title of the book
chapterIndex: Current chapter index
paragraphIndex: Current paragraph index
sentenceIndex: Current sentence index (default: 0)
"""
bookId = self._get_book_id(bookPath)
timestamp = datetime.now().isoformat()
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO bookmarks
(book_id, book_path, book_title, chapter_index, paragraph_index,
sentence_index, last_accessed, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?,
COALESCE((SELECT created_at FROM bookmarks WHERE book_id = ?), ?))
''', (bookId, str(bookPath), bookTitle, chapterIndex, paragraphIndex,
sentenceIndex, timestamp, bookId, timestamp))
conn.commit()
conn.close()
def get_bookmark(self, bookPath):
"""
Get bookmark for a book
Args:
bookPath: Path to book file
Returns:
Dictionary with bookmark data or None if not found
"""
bookId = self._get_book_id(bookPath)
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('''
SELECT chapter_index, paragraph_index, sentence_index,
book_title, last_accessed
FROM bookmarks
WHERE book_id = ?
''', (bookId,))
row = cursor.fetchone()
conn.close()
if row:
return {
'chapterIndex': row[0],
'paragraphIndex': row[1],
'sentenceIndex': row[2],
'bookTitle': row[3],
'lastAccessed': row[4]
}
return None
def delete_bookmark(self, bookPath):
"""
Delete bookmark for a book
Args:
bookPath: Path to book file
"""
bookId = self._get_book_id(bookPath)
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('DELETE FROM bookmarks WHERE book_id = ?', (bookId,))
conn.commit()
conn.close()
def list_bookmarks(self):
"""
List all bookmarks
Returns:
List of dictionaries with bookmark data
"""
conn = sqlite3.connect(self.dbPath)
cursor = conn.cursor()
cursor.execute('''
SELECT book_path, book_title, chapter_index, paragraph_index,
sentence_index, last_accessed
FROM bookmarks
ORDER BY last_accessed DESC
''')
rows = cursor.fetchall()
conn.close()
bookmarks = []
for row in rows:
bookmarks.append({
'bookPath': row[0],
'bookTitle': row[1],
'chapterIndex': row[2],
'paragraphIndex': row[3],
'sentenceIndex': row[4],
'lastAccessed': row[5]
})
return bookmarks

221
src/config_manager.py Normal file
View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Config Manager
Manages BookStorm settings using INI format.
Settings stored in ~/.config/stormux/bookstorm/settings.ini
"""
import configparser
from pathlib import Path
class ConfigManager:
"""Manages application configuration"""
def __init__(self, configPath=None):
"""
Initialize config manager
Args:
configPath: Path to config file (optional)
"""
if configPath is None:
homePath = Path.home()
configDir = homePath / ".config" / "stormux" / "bookstorm"
configDir.mkdir(parents=True, exist_ok=True)
configPath = configDir / "settings.ini"
self.configPath = Path(configPath)
self.config = configparser.ConfigParser()
# Load or create config
if self.configPath.exists():
self.config.read(self.configPath)
else:
self._create_default_config()
def _create_default_config(self):
"""Create default configuration"""
self.config['TTS'] = {
'voice_model': '/usr/share/piper-voices/en/en_US/hfc_male/medium/en_US-hfc_male-medium.onnx',
'voice_dir': '/usr/share/piper-voices/en/en_US',
'reader_engine': 'piper',
'speechd_voice': '',
'speechd_output_module': '',
'speech_rate': '0'
}
self.config['Reading'] = {
'auto_advance': 'true',
'auto_save_bookmark': 'true'
}
self.config['Display'] = {
'show_text': 'true'
}
self.config['Paths'] = {
'last_book': '',
'books_directory': str(Path.home()),
'library_directory': ''
}
self.save()
def get(self, section, key, fallback=None):
"""
Get configuration value
Args:
section: Config section
key: Config key
fallback: Default value if not found
Returns:
Configuration value
"""
try:
return self.config.get(section, key)
except (configparser.NoSectionError, configparser.NoOptionError):
return fallback
def get_bool(self, section, key, fallback=False):
"""
Get boolean configuration value
Args:
section: Config section
key: Config key
fallback: Default value if not found
Returns:
Boolean configuration value
"""
try:
return self.config.getboolean(section, key)
except (configparser.NoSectionError, configparser.NoOptionError):
return fallback
def set(self, section, key, value):
"""
Set configuration value
Args:
section: Config section
key: Config key
value: Value to set
"""
if not self.config.has_section(section):
self.config.add_section(section)
self.config.set(section, key, str(value))
def save(self):
"""Save configuration to file"""
with open(self.configPath, 'w') as configFile:
self.config.write(configFile)
def get_voice_model(self):
"""Get configured voice model path"""
return self.get('TTS', 'voice_model')
def set_voice_model(self, modelPath):
"""Set voice model path"""
self.set('TTS', 'voice_model', str(modelPath))
self.save()
def get_voice_dir(self):
"""Get voice models directory"""
return self.get('TTS', 'voice_dir', '/usr/share/piper-voices/en/en_US')
def set_voice_dir(self, voiceDir):
"""Set voice models directory"""
self.set('TTS', 'voice_dir', str(voiceDir))
self.save()
def get_last_book(self):
"""Get last opened book path"""
lastBook = self.get('Paths', 'last_book')
return lastBook if lastBook else None
def set_last_book(self, bookPath):
"""Set last opened book path"""
self.set('Paths', 'last_book', str(bookPath))
self.save()
def get_books_directory(self):
"""Get books directory for file browser"""
return self.get('Paths', 'books_directory', str(Path.home()))
def set_books_directory(self, booksDir):
"""Set books directory"""
self.set('Paths', 'books_directory', str(booksDir))
self.save()
def get_auto_advance(self):
"""Get auto-advance setting"""
return self.get_bool('Reading', 'auto_advance', True)
def get_auto_save(self):
"""Get auto-save bookmark setting"""
return self.get_bool('Reading', 'auto_save_bookmark', True)
def get_reader_engine(self):
"""Get reader engine (piper or speechd)"""
return self.get('TTS', 'reader_engine', 'piper')
def set_reader_engine(self, engine):
"""Set reader engine (piper or speechd)"""
if engine in ['piper', 'speechd']:
self.set('TTS', 'reader_engine', engine)
self.save()
def get_speechd_voice(self):
"""Get speech-dispatcher voice"""
return self.get('TTS', 'speechd_voice', '')
def set_speechd_voice(self, voice):
"""Set speech-dispatcher voice"""
self.set('TTS', 'speechd_voice', str(voice))
self.save()
def get_speechd_output_module(self):
"""Get speech-dispatcher output module"""
return self.get('TTS', 'speechd_output_module', '')
def set_speechd_output_module(self, module):
"""Set speech-dispatcher output module"""
self.set('TTS', 'speechd_output_module', str(module))
self.save()
def get_speech_rate(self):
"""Get speech rate"""
try:
return int(self.get('TTS', 'speech_rate', '0'))
except ValueError:
return 0
def set_speech_rate(self, rate):
"""Set speech rate"""
self.set('TTS', 'speech_rate', str(rate))
self.save()
def get_show_text(self):
"""Get show text display setting"""
return self.get_bool('Display', 'show_text', True)
def set_show_text(self, enabled):
"""Set show text display setting"""
self.set('Display', 'show_text', str(enabled).lower())
self.save()
def get_library_directory(self):
"""Get library directory (default starting point for book browser)"""
return self.get('Paths', 'library_directory', '')
def set_library_directory(self, libraryDir):
"""Set library directory"""
self.set('Paths', 'library_directory', str(libraryDir))
self.save()

324
src/daisy_parser.py Normal file
View File

@@ -0,0 +1,324 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DAISY Book Parser
Handles parsing of DAISY 2.02 and DAISY 3 book formats.
Extracts structure and content for text-to-speech playback.
"""
import zipfile
import tempfile
import shutil
from pathlib import Path
from bs4 import BeautifulSoup
import re
from src.book import Book, Chapter
class DaisyParser:
"""Parser for DAISY format books"""
def __init__(self):
self.tempDir = None
def parse(self, daisyPath):
"""
Parse a DAISY book (zip file)
Args:
daisyPath: Path to DAISY zip file
Returns:
Book object
"""
daisyPath = Path(daisyPath)
if not daisyPath.exists():
raise FileNotFoundError(f"DAISY file not found: {daisyPath}")
# Extract zip to temp directory
self.tempDir = tempfile.mkdtemp(prefix="daisy_")
tempPath = Path(self.tempDir)
try:
with zipfile.ZipFile(daisyPath, 'r') as zipRef:
zipRef.extractall(tempPath)
# Detect DAISY version and parse accordingly
if (tempPath / "ncc.html").exists():
return self._parse_daisy2(tempPath)
elif (tempPath / "navigation.ncx").exists() or list(tempPath.glob("*.ncx")):
return self._parse_daisy3(tempPath)
else:
raise ValueError("Unknown DAISY format: no ncc.html or navigation.ncx found")
except Exception as e:
self.cleanup()
raise e
def _parse_daisy2(self, basePath):
"""Parse DAISY 2.02 format (NCC.html based)"""
nccPath = basePath / "ncc.html"
with open(nccPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), 'html.parser')
# Get title
titleTag = soup.find('title')
bookTitle = titleTag.get_text().strip() if titleTag else "Unknown Title"
# Find all headings (h1-h6) which represent navigation points
headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
chapters = []
for heading in headings:
# Get chapter title
chapterTitle = heading.get_text().strip()
# Find linked content file
link = heading.find('a')
if not link or not link.get('href'):
continue
contentHref = link.get('href')
contentPath = basePath / contentHref.split('#')[0]
if contentPath.exists():
paragraphs = self._extract_paragraphs(contentPath)
if paragraphs:
chapter = Chapter(chapterTitle)
chapter.paragraphs = paragraphs
chapters.append(chapter)
book = Book(bookTitle)
for chapter in chapters:
book.add_chapter(chapter)
return book
def _parse_daisy3(self, basePath):
"""Parse DAISY 3 format (NCX based)"""
# Find NCX file for title
ncxFiles = list(basePath.glob("*.ncx"))
if not ncxFiles:
ncxFiles = [basePath / "navigation.ncx"]
ncxPath = ncxFiles[0]
with open(ncxPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), features='xml')
# Get title
titleTag = soup.find('docTitle')
if titleTag:
textTag = titleTag.find('text')
bookTitle = textTag.get_text().strip() if textTag else "Unknown Title"
else:
bookTitle = "Unknown Title"
# Find DTBook XML file (main content)
dtbookFiles = list(basePath.glob("*.xml"))
# Exclude navigation.ncx if it was named .xml
dtbookFiles = [f for f in dtbookFiles if not f.name.endswith('.ncx')]
if dtbookFiles:
# Try to parse DTBook using NCX navigation structure first
chapters = self._parse_dtbook_with_ncx(dtbookFiles[0], basePath, soup)
if not chapters:
# Fallback: Parse DTBook XML directly for content
chapters = self._parse_dtbook(dtbookFiles[0])
else:
# Fallback to old method for HTML-based DAISY
chapters = self._parse_daisy3_html(basePath, soup)
book = Book(bookTitle)
for chapter in chapters:
book.add_chapter(chapter)
return book
def _parse_daisy3_html(self, basePath, ncxSoup):
"""Parse DAISY 3 with HTML content files (fallback)"""
navPoints = ncxSoup.find_all('navPoint')
chapters = []
for navPoint in navPoints:
# Get chapter title
navLabel = navPoint.find('navLabel')
if navLabel:
textTag = navLabel.find('text')
chapterTitle = textTag.get_text().strip() if textTag else "Untitled Chapter"
else:
chapterTitle = "Untitled Chapter"
# Find content source
content = navPoint.find('content')
if not content or not content.get('src'):
continue
contentSrc = content.get('src')
contentPath = basePath / contentSrc.split('#')[0]
if contentPath.exists():
paragraphs = self._extract_paragraphs(contentPath)
if paragraphs:
chapter = Chapter(chapterTitle)
chapter.paragraphs = paragraphs
chapters.append(chapter)
return chapters
def _parse_dtbook_with_ncx(self, dtbookPath, basePath, ncxSoup):
"""
Parse DTBook using NCX navigation structure
Args:
dtbookPath: Path to DTBook XML file
basePath: Base directory path
ncxSoup: BeautifulSoup object of parsed NCX
Returns:
List of Chapter objects or None if parsing fails
"""
try:
# Load DTBook content
with open(dtbookPath, 'r', encoding='utf-8', errors='ignore') as f:
dtbookSoup = BeautifulSoup(f.read(), features='xml')
# Find all top-level navPoints (chapters)
navMap = ncxSoup.find('navMap')
if not navMap:
return None
chapters = []
for navPoint in navMap.find_all('navPoint', recursive=False):
# Get chapter title
navLabel = navPoint.find('navLabel')
if navLabel:
textTag = navLabel.find('text')
chapterTitle = textTag.get_text().strip() if textTag else "Untitled"
else:
chapterTitle = "Untitled"
# Get content source
content = navPoint.find('content')
if not content or not content.get('src'):
continue
contentSrc = content.get('src')
# Extract fragment identifier (anchor)
parts = contentSrc.split('#')
anchor = parts[1] if len(parts) > 1 else None
if not anchor:
continue
# Find the element in DTBook by ID
section = dtbookSoup.find(id=anchor)
if not section:
continue
# Extract paragraphs from this section
paragraphs = []
for p in section.find_all('p'):
text = p.get_text().strip()
text = re.sub(r'\s+', ' ', text)
if text:
paragraphs.append(text)
if paragraphs:
chapter = Chapter(chapterTitle)
chapter.paragraphs = paragraphs
chapters.append(chapter)
return chapters if chapters else None
except Exception as e:
print(f"Error parsing DTBook with NCX: {e}")
return None
def _parse_dtbook(self, dtbookPath):
"""Parse DTBook XML format"""
with open(dtbookPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), features='xml')
chapters = []
# Find all level1 elements (top-level sections)
level1Elements = soup.find_all('level1')
for level1 in level1Elements:
# Get chapter title from h1, h2, or id
chapterTitle = None
# Try to find heading
for hTag in ['h1', 'h2', 'h3']:
heading = level1.find(hTag)
if heading:
chapterTitle = heading.get_text().strip()
break
# Fallback to id
if not chapterTitle:
chapterTitle = level1.get('id', 'Untitled Chapter')
# Extract paragraphs from this level1
paragraphs = []
for p in level1.find_all('p'):
text = p.get_text().strip()
text = re.sub(r'\s+', ' ', text)
if text:
paragraphs.append(text)
if paragraphs:
chapter = Chapter(chapterTitle)
chapter.paragraphs = paragraphs
chapters.append(chapter)
return chapters
def _extract_paragraphs(self, htmlPath):
"""Extract paragraphs from HTML content file"""
with open(htmlPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), 'html.parser')
paragraphs = []
# Find all paragraph tags
for p in soup.find_all('p'):
text = p.get_text().strip()
# Clean up whitespace
text = re.sub(r'\s+', ' ', text)
if text:
paragraphs.append(text)
# If no <p> tags, try divs or just get all text
if not paragraphs:
# Try divs
for div in soup.find_all('div'):
text = div.get_text().strip()
text = re.sub(r'\s+', ' ', text)
if text and len(text) > 10: # Avoid tiny fragments
paragraphs.append(text)
# Last resort: split body text by double newlines
if not paragraphs:
body = soup.find('body')
if body:
text = body.get_text()
# Split on multiple newlines or periods followed by newline
chunks = re.split(r'\n\n+', text)
for chunk in chunks:
chunk = chunk.strip()
chunk = re.sub(r'\s+', ' ', chunk)
if chunk:
paragraphs.append(chunk)
return paragraphs
def cleanup(self):
"""Clean up temporary files"""
if self.tempDir and Path(self.tempDir).exists():
shutil.rmtree(self.tempDir)
self.tempDir = None

426
src/epub_parser.py Normal file
View File

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
EPUB Parser
Parses EPUB format ebooks and extracts text content.
EPUB files are ZIP archives containing XHTML/HTML content.
"""
import zipfile
import tempfile
import shutil
from pathlib import Path
from bs4 import BeautifulSoup
from src.book import Book, Chapter
class EpubParser:
"""Parser for EPUB format ebooks"""
def __init__(self):
"""Initialize EPUB parser"""
self.tempDir = None
def parse(self, epubPath):
"""
Parse EPUB file
Args:
epubPath: Path to EPUB file
Returns:
Book object
"""
epubPath = Path(epubPath)
# Create temp directory for extraction
self.tempDir = tempfile.mkdtemp(prefix='bookstorm_epub_')
tempPath = Path(self.tempDir)
try:
# Extract EPUB (it's a ZIP file)
with zipfile.ZipFile(epubPath, 'r') as zipRef:
zipRef.extractall(tempPath)
# Find content.opf file
opfPath = self._find_opf(tempPath)
if not opfPath:
raise ValueError("Could not find content.opf in EPUB")
# Parse OPF to get metadata and spine
metadata, spine, manifest = self._parse_opf(opfPath)
# Create book
book = Book()
book.title = metadata.get('title', epubPath.stem)
book.author = metadata.get('creator', 'Unknown')
# Try to use TOC structure first
opfDir = opfPath.parent
tocChapters = self._parse_toc_structure(tempPath, opfDir, manifest)
if tocChapters:
# Successfully parsed using TOC
for chapter in tocChapters:
book.add_chapter(chapter)
else:
# Fallback: Parse content files in spine order
for itemId in spine:
if itemId in manifest:
contentPath = opfDir / manifest[itemId]
if contentPath.exists():
chapters = self._parse_content_file(contentPath)
for chapter in chapters:
book.add_chapter(chapter)
return book
except Exception as e:
raise Exception(f"Error parsing EPUB: {e}")
def _find_opf(self, epubDir):
"""
Find content.opf file in EPUB
Args:
epubDir: Extracted EPUB directory
Returns:
Path to content.opf or None
"""
# Check container.xml first
containerPath = epubDir / 'META-INF' / 'container.xml'
if containerPath.exists():
try:
with open(containerPath, 'r', encoding='utf-8') as f:
soup = BeautifulSoup(f.read(), features='xml')
rootfile = soup.find('rootfile')
if rootfile and rootfile.get('full-path'):
return epubDir / rootfile['full-path']
except Exception:
pass
# Fallback: search for .opf files
for opfFile in epubDir.rglob('*.opf'):
return opfFile
return None
def _parse_opf(self, opfPath):
"""
Parse OPF file to get metadata, spine, and manifest
Args:
opfPath: Path to content.opf
Returns:
Tuple of (metadata dict, spine list, manifest dict)
"""
with open(opfPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), features='xml')
# Extract metadata
metadata = {}
metadataTag = soup.find('metadata')
if metadataTag:
title = metadataTag.find('dc:title')
if title:
metadata['title'] = title.get_text(strip=True)
creator = metadataTag.find('dc:creator')
if creator:
metadata['creator'] = creator.get_text(strip=True)
# Extract manifest (id -> href mapping)
manifest = {}
manifestTag = soup.find('manifest')
if manifestTag:
for item in manifestTag.find_all('item'):
itemId = item.get('id')
href = item.get('href')
if itemId and href:
manifest[itemId] = href
# Extract spine (reading order)
spine = []
spineTag = soup.find('spine')
if spineTag:
for itemref in spineTag.find_all('itemref'):
idref = itemref.get('idref')
if idref:
spine.append(idref)
return metadata, spine, manifest
def _parse_toc_structure(self, epubDir, opfDir, manifest):
"""
Parse TOC structure (NCX or nav.xhtml) to get chapters
Args:
epubDir: Root EPUB directory
opfDir: Directory containing OPF file
manifest: Manifest dict from OPF
Returns:
List of Chapter objects or None if TOC not found
"""
# Try EPUB 3 nav.xhtml first
navChapters = self._parse_nav_xhtml(epubDir, opfDir, manifest)
if navChapters:
return navChapters
# Try EPUB 2 NCX
ncxChapters = self._parse_ncx(epubDir, opfDir, manifest)
if ncxChapters:
return ncxChapters
return None
def _parse_nav_xhtml(self, epubDir, opfDir, manifest):
"""
Parse EPUB 3 nav.xhtml for TOC structure
Args:
epubDir: Root EPUB directory
opfDir: Directory containing OPF file
manifest: Manifest dict from OPF
Returns:
List of Chapter objects or None
"""
# Find nav document in manifest
navPath = None
for itemId, href in manifest.items():
if 'nav' in href.lower() and href.endswith(('.xhtml', '.html')):
navPath = opfDir / href
if navPath.exists():
break
if not navPath or not navPath.exists():
return None
try:
with open(navPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), 'html.parser')
# Find TOC nav element
tocNav = soup.find('nav', attrs={'epub:type': 'toc'})
if not tocNav:
tocNav = soup.find('nav', id='toc')
if not tocNav:
return None
# Extract chapters from nav list
chapters = []
for link in tocNav.find_all('a', href=True):
chapterTitle = link.get_text(strip=True)
href = link.get('href')
if not chapterTitle or not href:
continue
# Extract content from href location
paragraphs = self._extract_content_from_href(opfDir, href)
if paragraphs:
chapter = Chapter(chapterTitle)
chapter.paragraphs = paragraphs
chapters.append(chapter)
return chapters if chapters else None
except Exception as e:
print(f"Error parsing nav.xhtml: {e}")
return None
def _parse_ncx(self, epubDir, opfDir, manifest):
"""
Parse EPUB 2 NCX file for TOC structure
Args:
epubDir: Root EPUB directory
opfDir: Directory containing OPF file
manifest: Manifest dict from OPF
Returns:
List of Chapter objects or None
"""
# Find NCX file in manifest
ncxPath = None
for itemId, href in manifest.items():
if href.endswith('.ncx'):
ncxPath = opfDir / href
if ncxPath.exists():
break
# Fallback: search for any .ncx file
if not ncxPath or not ncxPath.exists():
ncxFiles = list(epubDir.rglob('*.ncx'))
if ncxFiles:
ncxPath = ncxFiles[0]
if not ncxPath or not ncxPath.exists():
return None
try:
with open(ncxPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), features='xml')
# Find all navPoints (top-level only)
navMap = soup.find('navMap')
if not navMap:
return None
chapters = []
for navPoint in navMap.find_all('navPoint', recursive=False):
# Get chapter title
navLabel = navPoint.find('navLabel')
if navLabel:
textTag = navLabel.find('text')
chapterTitle = textTag.get_text(strip=True) if textTag else "Untitled"
else:
chapterTitle = "Untitled"
# Get content source
content = navPoint.find('content')
if not content or not content.get('src'):
continue
href = content.get('src')
# Extract content from href location
paragraphs = self._extract_content_from_href(opfDir, href)
if paragraphs:
chapter = Chapter(chapterTitle)
chapter.paragraphs = paragraphs
chapters.append(chapter)
return chapters if chapters else None
except Exception as e:
print(f"Error parsing NCX: {e}")
return None
def _extract_content_from_href(self, opfDir, href):
"""
Extract paragraphs from a specific href location
Args:
opfDir: Directory containing OPF file
href: Content href (may include #anchor)
Returns:
List of paragraph strings
"""
# Split href into file and anchor
parts = href.split('#')
filePath = opfDir / parts[0]
anchor = parts[1] if len(parts) > 1 else None
if not filePath.exists():
return []
try:
with open(filePath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), 'html.parser')
# If anchor specified, find that element
if anchor:
section = soup.find(id=anchor)
if not section:
# Try to find by name attribute
section = soup.find(attrs={'name': anchor})
if not section:
# Fallback to entire body
section = soup.find('body') or soup
else:
section = soup.find('body') or soup
# Extract paragraphs from section
paragraphs = []
for p in section.find_all('p'):
text = p.get_text(strip=True)
if text:
paragraphs.append(text)
return paragraphs
except Exception as e:
print(f"Error extracting content from {href}: {e}")
return []
def _parse_content_file(self, contentPath):
"""
Parse XHTML/HTML content file
Args:
contentPath: Path to content file
Returns:
List of Chapter objects
"""
try:
with open(contentPath, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), 'html.parser')
except Exception as e:
print(f"Error reading content file {contentPath}: {e}")
return []
chapters = []
# Look for main content sections
# Try h1, h2, section elements
sections = soup.find_all(['section', 'div'], class_=lambda x: x and 'section' in x.lower() if x else False)
if not sections:
# Fallback: treat entire file as one chapter
sections = [soup.find('body') or soup]
for section in sections:
# Find chapter title
title = None
for hTag in ['h1', 'h2', 'h3']:
heading = section.find(hTag)
if heading:
title = heading.get_text(strip=True)
break
if not title:
title = contentPath.stem
# Extract paragraphs
paragraphs = []
for p in section.find_all('p'):
text = p.get_text(strip=True)
if text:
paragraphs.append(text)
# Only add chapter if it has content
if paragraphs:
chapter = Chapter(title)
chapter.paragraphs = paragraphs
chapters.append(chapter)
# If no chapters found, extract all paragraphs as one chapter
if not chapters:
title = contentPath.stem
paragraphs = []
for p in soup.find_all('p'):
text = p.get_text(strip=True)
if text:
paragraphs.append(text)
if paragraphs:
chapter = Chapter(title)
chapter.paragraphs = paragraphs
chapters.append(chapter)
return chapters
def cleanup(self):
"""Clean up temporary files"""
if self.tempDir and Path(self.tempDir).exists():
try:
shutil.rmtree(self.tempDir)
except Exception as e:
print(f"Warning: Could not remove temp directory: {e}")

490
src/options_menu.py Normal file
View File

@@ -0,0 +1,490 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Options Menu
Interactive menu system for BookStorm settings.
Inspired by soundstorm's hierarchical menu system.
"""
from pathlib import Path
from src.tts_engine import TtsEngine
class OptionsMenu:
"""Options menu for configuring BookStorm settings"""
def __init__(self, config, speechEngine, voiceSelector, audioPlayer, ttsReloadCallback=None):
"""
Initialize options menu
Args:
config: ConfigManager instance
speechEngine: SpeechEngine instance
voiceSelector: VoiceSelector instance
audioPlayer: PygamePlayer instance
ttsReloadCallback: Optional callback to reload TTS engine
"""
self.config = config
self.speechEngine = speechEngine
self.voiceSelector = voiceSelector
self.audioPlayer = audioPlayer
self.ttsReloadCallback = ttsReloadCallback
self.currentSelection = 0
self.inMenu = False
def show_main_menu(self):
"""
Show main options menu
Returns:
Menu items as list of dicts
"""
readerEngine = self.config.get_reader_engine()
readerEngineText = "Piper-TTS" if readerEngine == "piper" else "Speech-Dispatcher"
menuItems = [
{
'label': f"Reader Engine: {readerEngineText}",
'action': 'toggle_reader_engine'
},
{
'label': "Select Reader Voice",
'action': 'select_voice'
}
]
# Add output module selection only when using speech-dispatcher
if readerEngine == 'speechd':
menuItems.append({
'label': "Select Speech Engine",
'action': 'select_output_module'
})
# Add text display toggle
showText = self.config.get_show_text()
showTextLabel = "Show Text Display: On" if showText else "Show Text Display: Off"
menuItems.extend([
{
'label': showTextLabel,
'action': 'toggle_show_text'
},
{
'label': "Speech Rate Settings",
'action': 'speech_rate'
},
{
'label': "Back",
'action': 'back'
}
])
return menuItems
def navigate_menu(self, direction):
"""
Navigate menu up or down
Args:
direction: 'up' or 'down'
Returns:
Current menu item
"""
menuItems = self.show_main_menu()
if direction == 'up':
self.currentSelection = (self.currentSelection - 1) % len(menuItems)
elif direction == 'down':
self.currentSelection = (self.currentSelection + 1) % len(menuItems)
currentItem = menuItems[self.currentSelection]
self.speechEngine.speak(currentItem['label'])
return currentItem
def activate_current_item(self):
"""
Activate currently selected menu item
Returns:
True to stay in menu, False to exit
"""
menuItems = self.show_main_menu()
currentItem = menuItems[self.currentSelection]
action = currentItem['action']
if action == 'toggle_reader_engine':
return self._toggle_reader_engine()
elif action == 'select_voice':
return self._select_voice()
elif action == 'select_output_module':
return self._select_output_module()
elif action == 'toggle_show_text':
return self._toggle_show_text()
elif action == 'speech_rate':
return self._speech_rate_info()
elif action == 'back':
self.speechEngine.speak("Closing options menu")
return False
return True
def _toggle_reader_engine(self):
"""Toggle between piper-tts and speech-dispatcher"""
currentEngine = self.config.get_reader_engine()
if currentEngine == 'piper':
newEngine = 'speechd'
oldEngine = 'piper'
self.config.set_reader_engine('speechd')
message = "Reader engine: Speech-Dispatcher."
else:
newEngine = 'piper'
oldEngine = 'speechd'
self.config.set_reader_engine('piper')
message = "Reader engine: Piper-TTS."
# Reload TTS engine if callback available
needsRestart = False
if self.ttsReloadCallback:
try:
self.ttsReloadCallback()
except Exception as e:
print(f"Error reloading TTS engine: {e}")
needsRestart = True
else:
needsRestart = True
if needsRestart:
# Show restart confirmation dialog
self.previousEngine = oldEngine
self.inRestartMenu = True
self.restartSelection = 0
message = "Restart required. Restart now or cancel?"
self.speechEngine.speak(message)
print(message)
# Speak first option
self.speechEngine.speak("Restart now")
else:
self.speechEngine.speak(message)
print(message)
return True
def _select_voice(self):
"""Select voice based on current reader engine"""
readerEngine = self.config.get_reader_engine()
if readerEngine == 'piper':
return self._select_piper_voice()
else:
return self._select_speechd_voice()
def _select_piper_voice(self):
"""Select piper-tts voice"""
self.speechEngine.speak("Selecting piper voice. Use arrow keys to browse, enter to select, escape to cancel.")
voices = self.voiceSelector.get_voices()
if not voices:
self.speechEngine.speak("No piper voices found.")
return True
# Store current selection for voice browsing
self.voiceSelection = 0
self.voiceList = voices
self.inVoiceMenu = True
# Speak first voice using piper
try:
voice = voices[0]
tts = TtsEngine(voice['path'])
voiceName = voice['name'].split('(')[0].strip()
text = f"Hi, my name is {voiceName}"
wavData = tts.text_to_wav_data(text)
if wavData:
self.audioPlayer.play_wav_data(wavData)
except Exception as e:
# Fallback to speech-dispatcher
print(f"Error playing voice sample: {e}")
self.speechEngine.speak(voices[0]['name'])
return True
def _select_speechd_voice(self):
"""Select speech-dispatcher voice"""
voices = self.speechEngine.list_voices()
if not voices:
self.speechEngine.speak("No speech dispatcher voices available.")
return True
self.speechEngine.speak("Selecting speech dispatcher voice. Use arrow keys to browse, enter to select, escape to cancel.")
# Store current selection for voice browsing
self.voiceSelection = 0
self.voiceList = voices
self.inVoiceMenu = True
# Speak first voice
if len(voices) > 0:
voice = voices[0]
if isinstance(voice, tuple):
voiceName = f"{voice[0]} ({voice[1]})"
else:
voiceName = str(voice)
self.speechEngine.speak(voiceName)
return True
def navigate_voice_menu(self, direction):
"""Navigate voice selection menu"""
if not hasattr(self, 'voiceList') or not self.voiceList:
return
if direction == 'up':
self.voiceSelection = (self.voiceSelection - 1) % len(self.voiceList)
elif direction == 'down':
self.voiceSelection = (self.voiceSelection + 1) % len(self.voiceList)
# Speak current voice
voice = self.voiceList[self.voiceSelection]
readerEngine = self.config.get_reader_engine()
if readerEngine == 'piper':
# Use piper to speak the voice name with that voice
try:
tts = TtsEngine(voice['path'])
# Generate voice speaking its own name
voiceName = voice['name'].split('(')[0].strip()
text = f"Hi, my name is {voiceName}"
wavData = tts.text_to_wav_data(text)
if wavData:
self.audioPlayer.play_wav_data(wavData)
except Exception as e:
# Fallback to speech-dispatcher if error
print(f"Error playing voice sample: {e}")
self.speechEngine.speak(voice['name'])
else:
# Format speech-dispatcher voice tuple
if isinstance(voice, tuple):
voiceName = f"{voice[0]} ({voice[1]})"
else:
voiceName = str(voice)
self.speechEngine.speak(voiceName)
def select_current_voice(self):
"""Select the currently highlighted voice"""
if not hasattr(self, 'voiceList') or not self.voiceList:
return True
voice = self.voiceList[self.voiceSelection]
readerEngine = self.config.get_reader_engine()
if readerEngine == 'piper':
# Save piper voice
self.config.set_voice_model(voice['path'])
message = f"Voice selected: {voice['name']}."
# Reload TTS engine if callback available
if self.ttsReloadCallback:
try:
self.ttsReloadCallback()
except Exception as e:
print(f"Error reloading TTS engine: {e}")
message += " Restart required."
else:
message += " Restart required."
self.speechEngine.speak(message)
print(message)
else:
# Save speechd voice - extract name from tuple
if isinstance(voice, tuple):
voiceName = voice[0] # First element is the voice name
displayName = f"{voice[0]} ({voice[1]})"
else:
voiceName = str(voice)
displayName = voiceName
self.config.set_speechd_voice(voiceName)
self.speechEngine.set_voice(voiceName)
message = f"Voice selected: {displayName}"
self.speechEngine.speak(message)
print(message)
# Exit voice menu
self.inVoiceMenu = False
return True
def _select_output_module(self):
"""Select speech-dispatcher output module"""
modules = self.speechEngine.list_output_modules()
if not modules:
self.speechEngine.speak("No output modules available.")
return True
self.speechEngine.speak("Selecting speech engine. Use arrow keys to browse, enter to select, escape to cancel.")
# Store current selection for module browsing
self.moduleSelection = 0
self.moduleList = modules
self.inModuleMenu = True
# Speak first module
if len(modules) > 0:
self.speechEngine.speak(modules[0])
return True
def navigate_module_menu(self, direction):
"""Navigate output module selection menu"""
if not hasattr(self, 'moduleList') or not self.moduleList:
return
if direction == 'up':
self.moduleSelection = (self.moduleSelection - 1) % len(self.moduleList)
elif direction == 'down':
self.moduleSelection = (self.moduleSelection + 1) % len(self.moduleList)
# Speak current module
module = self.moduleList[self.moduleSelection]
self.speechEngine.speak(module)
def select_current_module(self):
"""Select the currently highlighted output module"""
if not hasattr(self, 'moduleList') or not self.moduleList:
return True
module = self.moduleList[self.moduleSelection]
# Save and set output module
self.config.set_speechd_output_module(module)
self.speechEngine.set_output_module(module)
message = f"Speech engine selected: {module}"
self.speechEngine.speak(message)
print(message)
# Exit module menu
self.inModuleMenu = False
return True
def exit_module_menu(self):
"""Exit output module selection menu"""
self.inModuleMenu = False
self.speechEngine.speak("Cancelled. Back to options menu.")
# Speak current main menu item
menuItems = self.show_main_menu()
if menuItems and self.currentSelection < len(menuItems):
self.speechEngine.speak(menuItems[self.currentSelection]['label'])
def is_in_module_menu(self):
"""Check if currently in output module selection submenu"""
return hasattr(self, 'inModuleMenu') and self.inModuleMenu
def is_in_restart_menu(self):
"""Check if currently in restart confirmation dialog"""
return hasattr(self, 'inRestartMenu') and self.inRestartMenu
def navigate_restart_menu(self, direction):
"""Navigate restart confirmation menu"""
# Toggle between "Restart now" (0) and "Cancel" (1)
if direction == 'up' or direction == 'down':
self.restartSelection = 1 - self.restartSelection
# Speak current option
if self.restartSelection == 0:
self.speechEngine.speak("Restart now")
else:
self.speechEngine.speak("Cancel")
def select_restart_option(self):
"""Handle restart menu selection"""
if self.restartSelection == 0:
# Restart now - exit cleanly
self.speechEngine.speak("Restarting. Please run bookstorm again.")
import sys
sys.exit(0)
else:
# Cancel - revert engine change
self.config.set_reader_engine(self.previousEngine)
self.inRestartMenu = False
self.speechEngine.speak("Cancelled. Engine change reverted.")
# Speak current main menu item
menuItems = self.show_main_menu()
if menuItems and self.currentSelection < len(menuItems):
self.speechEngine.speak(menuItems[self.currentSelection]['label'])
return True
def exit_restart_menu(self):
"""Exit restart confirmation menu (same as cancel)"""
self.config.set_reader_engine(self.previousEngine)
self.inRestartMenu = False
self.speechEngine.speak("Cancelled. Engine change reverted.")
# Speak current main menu item
menuItems = self.show_main_menu()
if menuItems and self.currentSelection < len(menuItems):
self.speechEngine.speak(menuItems[self.currentSelection]['label'])
def _toggle_show_text(self):
"""Toggle text display on/off"""
currentSetting = self.config.get_show_text()
newSetting = not currentSetting
self.config.set_show_text(newSetting)
if newSetting:
message = "Text display: On"
else:
message = "Text display: Off"
self.speechEngine.speak(message)
print(message)
return True
def _speech_rate_info(self):
"""Show speech rate information"""
currentRate = self.config.get_speech_rate()
message = f"Current speech rate: {currentRate}. Use Page Up and Page Down to adjust during reading."
self.speechEngine.speak(message)
print(message)
return True
def enter_menu(self):
"""Enter the options menu"""
self.inMenu = True
self.currentSelection = 0
self.inVoiceMenu = False
self.speechEngine.speak("Options menu. Use arrow keys to navigate, Enter to select, Escape to close.")
# Speak first item
menuItems = self.show_main_menu()
if menuItems:
self.speechEngine.speak(menuItems[0]['label'])
def is_in_menu(self):
"""Check if currently in menu"""
return self.inMenu
def is_in_voice_menu(self):
"""Check if currently in voice selection submenu"""
return hasattr(self, 'inVoiceMenu') and self.inVoiceMenu
def exit_voice_menu(self):
"""Exit voice selection menu"""
self.inVoiceMenu = False
self.speechEngine.speak("Cancelled. Back to options menu.")
# Speak current main menu item
menuItems = self.show_main_menu()
if menuItems and self.currentSelection < len(menuItems):
self.speechEngine.speak(menuItems[self.currentSelection]['label'])
def exit_menu(self):
"""Exit the menu"""
self.inMenu = False
self.inVoiceMenu = False
self.currentSelection = 0

94
src/pdf_parser.py Normal file
View File

@@ -0,0 +1,94 @@
"""PDF parser for BookStorm - extracts text from PDF files."""
import re
from pathlib import Path
from pypdf import PdfReader
from src.book import Book, Chapter
def clean_hyphenated_text(text):
"""Clean up hyphenated line breaks in text.
Converts 'word-\nword' to 'word' to fix PDF line wrapping.
"""
# Remove hyphens at end of lines followed by continuation
text = re.sub(r'-\s*\n\s*', '', text)
return text
def split_into_paragraphs(text):
"""Split text into paragraphs using double newlines.
Falls back to single newlines if no double newlines found.
"""
# Try splitting on double newlines first
paragraphs = re.split(r'\n\s*\n', text)
# Clean up whitespace
paragraphs = [p.strip() for p in paragraphs if p.strip()]
# If we got very few paragraphs, fall back to single newlines
if len(paragraphs) < 3:
paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
return paragraphs
class PdfParser:
"""Parser for PDF files"""
def __init__(self):
pass
def parse(self, pdfPath):
"""Parse a PDF file and return Book object.
Args:
pdfPath: Path to the PDF file
Returns:
Book object with each page as a chapter
"""
pdfPath = Path(pdfPath)
if not pdfPath.exists():
raise FileNotFoundError(f"PDF file not found: {pdfPath}")
reader = PdfReader(pdfPath)
# Use filename as book title (without extension)
bookTitle = pdfPath.stem
book = Book(title=bookTitle)
for pageNum, page in enumerate(reader.pages):
# Create chapter for this page
chapterTitle = f"Page {pageNum + 1}"
chapter = Chapter(title=chapterTitle)
# Extract text from page
pageText = page.extract_text()
if not pageText or not pageText.strip():
# Empty page, add placeholder
chapter.add_paragraph(f"Page {pageNum + 1} is empty or contains no extractable text.")
else:
# Clean up hyphenated line breaks
pageText = clean_hyphenated_text(pageText)
# Split into paragraphs
paragraphs = split_into_paragraphs(pageText)
if not paragraphs:
chapter.add_paragraph(f"Page {pageNum + 1} contains no readable text.")
else:
for paragraph in paragraphs:
chapter.add_paragraph(paragraph)
book.add_chapter(chapter)
return book
def cleanup(self):
"""Cleanup any resources (no-op for PDF parser)"""
pass

120
src/pygame_player.py Normal file
View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Pygame Audio Player
Audio playback using pygame.mixer with integrated event handling.
Simpler and more reliable than PyAudio approach.
"""
import io
import pygame
class PygamePlayer:
"""Audio player using pygame.mixer"""
def __init__(self):
"""Initialize pygame audio player"""
self.isInitialized = False
self.isPaused = False
self.currentSound = None # Track current sound for cleanup
try:
# Initialize pygame mixer only (not full pygame)
# Use only 1 channel to force immediate cleanup of old sounds
pygame.mixer.init(frequency=22050, size=-16, channels=1, buffer=512)
pygame.mixer.set_num_channels(1) # Limit to 1 channel for sequential playback
self.isInitialized = True
except Exception as e:
print(f"Warning: Could not initialize pygame mixer: {e}")
self.isInitialized = False
def play_wav_data(self, wavData):
"""
Play WAV audio data
Args:
wavData: Bytes containing WAV audio data
Returns:
True if playback started successfully
"""
if not self.isInitialized:
return False
try:
# Cleanup previous sound to prevent memory leak
if self.currentSound:
# Explicitly stop to release pygame's internal buffers
# This is safe since we call play_wav_data only when ready for next paragraph
self.currentSound.stop()
del self.currentSound
self.currentSound = None
# Load WAV data from bytes
# CRITICAL: Must close BytesIO after Sound is created to prevent memory leak
wavBuffer = io.BytesIO(wavData)
try:
sound = pygame.mixer.Sound(wavBuffer)
finally:
# Close BytesIO buffer immediately - pygame.mixer.Sound copies the data
wavBuffer.close()
del wavBuffer # Explicitly delete
# Play the sound and keep reference for cleanup
sound.play()
self.currentSound = sound
self.isPaused = False
return True
except Exception as e:
print(f"Error playing audio: {e}")
return False
def pause(self):
"""Pause playback"""
if self.isInitialized:
pygame.mixer.pause()
self.isPaused = True
def resume(self):
"""Resume playback"""
if self.isInitialized:
pygame.mixer.unpause()
self.isPaused = False
def stop(self):
"""Stop playback"""
if self.isInitialized:
pygame.mixer.stop()
self.isPaused = False
# Cleanup current sound reference
if self.currentSound:
del self.currentSound
self.currentSound = None
def is_playing(self):
"""Check if audio is currently playing"""
if not self.isInitialized:
return False
return pygame.mixer.get_busy()
def is_paused(self):
"""Check if audio is paused"""
return self.isPaused
def cleanup(self):
"""Cleanup resources"""
if self.isInitialized:
# Stop and cleanup current sound
if self.currentSound:
self.currentSound.stop()
del self.currentSound
self.currentSound = None
pygame.mixer.quit()
self.isInitialized = False
def is_available(self):
"""Check if pygame mixer is available"""
return self.isInitialized

145
src/sleep_timer_menu.py Normal file
View File

@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Sleep Timer Menu
Provides a menu for setting sleep timer or quitting immediately.
"""
import time
class SleepTimerMenu:
"""Sleep timer menu for quit/sleep options"""
def __init__(self, speechEngine):
"""
Initialize sleep timer menu
Args:
speechEngine: SpeechEngine instance for UI feedback
"""
self.speechEngine = speechEngine
self.inMenu = False
self.currentIndex = 0
# Menu options: (label, minutes)
# 0 means quit immediately
self.menuItems = [
("Quit now", 0),
("Sleep in 5 minutes", 5),
("Sleep in 10 minutes", 10),
("Sleep in 15 minutes", 15),
("Sleep in 20 minutes", 20),
("Sleep in 25 minutes", 25),
("Sleep in 30 minutes", 30),
("Sleep in 35 minutes", 35),
("Sleep in 40 minutes", 40),
("Sleep in 45 minutes", 45),
("Sleep in 50 minutes", 50),
("Sleep in 55 minutes", 55),
("Sleep in 60 minutes", 60),
]
# Timer state
self.timerActive = False
self.timerEndTime = None
self.timerMinutes = 0
def enter_menu(self):
"""Enter the sleep timer menu"""
self.inMenu = True
self.currentIndex = 0
self._speak_current_item()
def exit_menu(self):
"""Exit the sleep timer menu"""
self.inMenu = False
def is_in_menu(self):
"""Check if currently in menu"""
return self.inMenu
def navigate_menu(self, direction):
"""
Navigate menu up or down
Args:
direction: 'up' or 'down'
"""
if direction == 'up':
self.currentIndex = (self.currentIndex - 1) % len(self.menuItems)
elif direction == 'down':
self.currentIndex = (self.currentIndex + 1) % len(self.menuItems)
self._speak_current_item()
def activate_current_item(self):
"""
Activate current menu item
Returns:
Tuple: (shouldQuitNow, shouldContinue)
shouldQuitNow: True if user selected "Quit now"
shouldContinue: True if timer was set (continue reading)
"""
label, minutes = self.menuItems[self.currentIndex]
if minutes == 0:
# Quit now
self.speechEngine.speak("Quitting now")
self.inMenu = False
return (True, False)
else:
# Set sleep timer
self.timerActive = True
self.timerMinutes = minutes
self.timerEndTime = time.time() + (minutes * 60)
self.speechEngine.speak(f"Sleep timer set for {minutes} minutes")
self.inMenu = False
return (False, True)
def check_timer(self):
"""
Check if sleep timer has expired
Returns:
True if timer expired and should quit
"""
if self.timerActive and time.time() >= self.timerEndTime:
return True
return False
def get_time_remaining(self):
"""
Get time remaining on sleep timer
Returns:
Tuple: (minutes, seconds) or None if no timer active
"""
if not self.timerActive:
return None
remaining = self.timerEndTime - time.time()
if remaining <= 0:
return (0, 0)
minutes = int(remaining // 60)
seconds = int(remaining % 60)
return (minutes, seconds)
def cancel_timer(self):
"""Cancel active sleep timer"""
if self.timerActive:
self.timerActive = False
self.timerEndTime = None
self.speechEngine.speak("Sleep timer cancelled")
def is_timer_active(self):
"""Check if sleep timer is active"""
return self.timerActive
def _speak_current_item(self):
"""Speak the current menu item"""
label, _ = self.menuItems[self.currentIndex]
self.speechEngine.speak(label)

290
src/speech_engine.py Normal file
View File

@@ -0,0 +1,290 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Speech Engine
Handles text-to-speech for UI feedback using speech-dispatcher.
Based on soundstorm's speech.py implementation.
"""
import threading
try:
import speechd
HAS_SPEECHD = True
except ImportError:
HAS_SPEECHD = False
class SpeechEngine:
"""Text-to-speech engine for UI accessibility using speech-dispatcher"""
def __init__(self):
"""Initialize speech engine"""
self.client = None
self.speechLock = threading.Lock()
self.isAvailable = False
# Reading state tracking with callback support
self.isReading = False
self.isPausedReading = False
self.readingCallback = None # Callback for when reading finishes
# Track UI speech thread to prevent accumulation
self.uiSpeechThread = None
if HAS_SPEECHD:
try:
self.client = speechd.SSIPClient('bookstorm')
self.isAvailable = True
except Exception as e:
print(f"Warning: Could not initialize speech-dispatcher: {e}")
self.isAvailable = False
else:
print("Warning: python3-speechd not installed. UI will not be accessible.")
def speak(self, text, interrupt=True):
"""
Speak text using speech-dispatcher
Args:
text: Text to speak
interrupt: If True, stop current speech first (default: True)
"""
if not self.isAvailable or not text:
return
# Safety: Wait for previous UI speech thread to finish if still running
# This prevents thread accumulation on rapid UI feedback calls
if self.uiSpeechThread and self.uiSpeechThread.is_alive():
# Don't wait forever - if thread is stuck, let it die (daemon thread)
self.uiSpeechThread.join(timeout=0.1)
def speak_thread():
with self.speechLock:
try:
if interrupt:
self.client.stop()
self.client.speak(str(text))
except Exception as e:
print(f"Speech error: {e}")
self.uiSpeechThread = threading.Thread(target=speak_thread)
self.uiSpeechThread.daemon = True
self.uiSpeechThread.start()
def stop(self):
"""Stop current speech"""
if self.isAvailable:
try:
self.client.stop()
except Exception:
pass
def speak_reading(self, text, callback=None):
"""
Speak text for book reading with callback support
Args:
text: Text to speak
callback: Optional callback function to call when speech finishes
Callback receives one argument: finish_type (COMPLETED or INTERRUPTED)
"""
if not self.isAvailable:
print("ERROR: Speech-dispatcher not available")
return
if not text:
print("ERROR: No text to speak")
return
try:
# Only cancel if we're already reading
if self.isReading:
try:
self.client.cancel()
except Exception:
pass
# Normalize text - replace newlines with spaces
# (speech-dispatcher may stop at newlines)
textStr = str(text).replace('\n', ' ').replace('\r', ' ')
# Clean up multiple spaces
import re
textStr = re.sub(r'\s+', ' ', textStr).strip()
print(f"Speech-dispatcher: Speaking {len(textStr)} characters")
self.isReading = True
self.isPausedReading = False
self.readingCallback = callback
# Define callback function for speech-dispatcher
def speech_callback(callbackType, indexMark=None):
"""Callback from speech-dispatcher when speech events occur"""
if callbackType == speechd.CallbackType.END:
# Speech completed normally
self.isReading = False
self.isPausedReading = False
if self.readingCallback:
self.readingCallback('COMPLETED')
elif callbackType == speechd.CallbackType.CANCEL:
# Speech was interrupted/cancelled
self.isReading = False
self.isPausedReading = False
if self.readingCallback:
self.readingCallback('INTERRUPTED')
# Speak with callback (event_types is speechd API parameter)
self.client.speak(
textStr,
callback=speech_callback,
event_types=[speechd.CallbackType.END, speechd.CallbackType.CANCEL]
)
except Exception as e:
print(f"Speech error: {e}")
import traceback
traceback.print_exc()
self.isReading = False
def pause_reading(self):
"""Pause current reading"""
if not self.isAvailable:
return
if self.isReading and not self.isPausedReading:
try:
self.client.pause()
self.isPausedReading = True
print("Speech-dispatcher: Paused")
except Exception as e:
print(f"Pause error: {e}")
# Reset state on error
self.isReading = False
self.isPausedReading = False
def resume_reading(self):
"""Resume paused reading"""
if not self.isAvailable:
return
if self.isPausedReading:
try:
self.client.resume()
self.isPausedReading = False
print("Speech-dispatcher: Resumed")
except Exception as e:
print(f"Resume error: {e}")
# Reset state on error
self.isReading = False
self.isPausedReading = False
def cancel_reading(self):
"""Cancel current reading"""
if not self.isAvailable:
return
try:
self.client.cancel()
# Note: Canceling will trigger the CANCEL callback
except Exception as e:
print(f"Cancel error: {e}")
finally:
# Reset state
self.isReading = False
self.isPausedReading = False
self.readingCallback = None
def is_reading_active(self):
"""Check if currently reading (not paused)"""
return self.isReading and not self.isPausedReading
def is_reading_paused(self):
"""Check if reading is paused"""
return self.isPausedReading
def set_rate(self, rate):
"""
Set speech rate
Args:
rate: Speech rate (-100 to 100, 0 is normal)
"""
if self.isAvailable:
try:
rate = max(-100, min(100, rate))
self.client.set_rate(rate)
except Exception as e:
print(f"Error setting speech rate: {e}")
def set_voice(self, voiceName):
"""
Set speech voice
Args:
voiceName: Voice name (e.g., 'Lyubov')
"""
if self.isAvailable:
try:
self.client.set_synthesis_voice(voiceName)
except Exception as e:
print(f"Error setting voice: {e}")
def list_voices(self):
"""
List available voices
Returns:
List of voice tuples (name, language, variant)
"""
if self.isAvailable:
try:
voices = self.client.list_synthesis_voices()
# Return list of tuples: (name, language, variant)
return list(voices) if voices else []
except Exception as e:
print(f"Error listing voices: {e}")
return []
return []
def list_output_modules(self):
"""
List available output modules (speech synthesizers)
Returns:
List of output module names (e.g., 'espeak-ng', 'festival', 'flite')
"""
if self.isAvailable:
try:
modules = self.client.list_output_modules()
return list(modules) if modules else []
except Exception as e:
print(f"Error listing output modules: {e}")
return []
return []
def set_output_module(self, moduleName):
"""
Set the output module (speech synthesizer)
Args:
moduleName: Name of the output module (e.g., 'espeak-ng')
"""
if self.isAvailable:
try:
self.client.set_output_module(moduleName)
except Exception as e:
print(f"Error setting output module: {e}")
def is_available(self):
"""Check if speech-dispatcher is available"""
return self.isAvailable
def cleanup(self):
"""Close speech-dispatcher connection"""
if self.isAvailable and self.client:
try:
self.client.close()
except Exception:
pass

166
src/tts_engine.py Normal file
View File

@@ -0,0 +1,166 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TTS Engine Wrapper
Wrapper for piper-tts to generate speech from text.
Handles streaming audio generation for real-time playback.
"""
import subprocess
import wave
import io
import struct
class TtsEngine:
"""Text-to-speech engine using piper-tts"""
def __init__(self, modelPath="/usr/share/piper-voices/en/en_US/hfc_male/medium/en_US-hfc_male-medium.onnx"):
"""
Initialize TTS engine
Args:
modelPath: Path to piper-tts voice model
"""
self.modelPath = modelPath
self.sampleRate = 22050
self.sampleWidth = 2 # 16-bit
self.channels = 1
def text_to_wav_data(self, text):
"""
Convert text to WAV audio data
Args:
text: Text to convert to speech
Returns:
Bytes containing WAV audio data
Raises:
RuntimeError: If piper-tts fails
"""
if not text.strip():
return None
# Safety: Limit text size to prevent excessive memory usage
# ~10,000 chars = ~10-15 minutes of audio at normal reading speed
MAX_TEXT_LENGTH = 10000
if len(text) > MAX_TEXT_LENGTH:
print(f"Warning: Paragraph too long ({len(text)} chars), truncating to {MAX_TEXT_LENGTH}")
text = text[:MAX_TEXT_LENGTH] + "..."
process = None
try:
# Run piper-tts with raw output
process = subprocess.Popen(
[
'piper-tts',
'--model', self.modelPath,
'--output-raw'
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Send text and get raw audio with timeout (60 seconds max)
# This prevents hanging if piper-tts gets stuck
rawAudio = None
stderr = None
try:
rawAudio, stderr = process.communicate(
input=text.encode('utf-8'),
timeout=60
)
except subprocess.TimeoutExpired:
process.kill()
process.communicate() # Clean up
raise RuntimeError("piper-tts timed out (>60s)")
if process.returncode != 0:
errorMsg = stderr.decode('utf-8', errors='ignore')
del stderr # Free stderr buffer immediately
stderr = None
raise RuntimeError(f"piper-tts failed: {errorMsg}")
# Free stderr buffer immediately (can be large if piper-tts is verbose)
if stderr:
del stderr
stderr = None
# Convert raw PCM to WAV format
wavData = self._raw_to_wav(rawAudio)
# CRITICAL: Delete rawAudio immediately after conversion
# This is a huge uncompressed buffer (~1-2MB per paragraph)
if rawAudio:
del rawAudio
rawAudio = None
# Ensure process is fully terminated and cleaned up
if process:
try:
process.wait(timeout=0.1)
except subprocess.TimeoutExpired:
pass
return wavData
except FileNotFoundError:
raise RuntimeError("piper-tts not found. Please install piper-tts.")
except Exception as e:
# Clean up buffers on error
if rawAudio:
del rawAudio
if stderr:
del stderr
# Ensure subprocess is terminated if something goes wrong
if process and process.poll() is None:
process.kill()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
pass # Process is truly stuck, nothing we can do
raise RuntimeError(f"TTS generation failed: {str(e)}")
def _raw_to_wav(self, rawData):
"""
Convert raw PCM data to WAV format
Args:
rawData: Raw PCM audio bytes
Returns:
WAV formatted bytes
"""
wavBuffer = io.BytesIO()
try:
with wave.open(wavBuffer, 'wb') as wavFile:
wavFile.setnchannels(self.channels)
wavFile.setsampwidth(self.sampleWidth)
wavFile.setframerate(self.sampleRate)
wavFile.writeframes(rawData)
wavBuffer.seek(0)
result = wavBuffer.read()
finally:
# Explicitly close BytesIO to free memory
wavBuffer.close()
return result
def get_audio_params(self):
"""
Get audio parameters for playback
Returns:
Dictionary with sampleRate, sampleWidth, channels
"""
return {
'sampleRate': self.sampleRate,
'sampleWidth': self.sampleWidth,
'channels': self.channels
}

169
src/txt_parser.py Normal file
View File

@@ -0,0 +1,169 @@
"""TXT parser for BookStorm - extracts text from plain text files."""
import re
from pathlib import Path
from src.book import Book, Chapter
def detect_chapter_breaks(text):
"""Detect chapter breaks in text using various heuristics.
Returns:
list of tuples: [(chapterTitle, chapterText), ...]
"""
chapters = []
# Pattern 1: Common chapter headings (case insensitive)
# Matches: "Chapter 1", "Chapter One", "CHAPTER 1:", etc.
chapterPattern = re.compile(
r'^(Chapter\s+[IVXLCDM\d]+[:\-\s]*.*)$',
re.MULTILINE | re.IGNORECASE
)
# Pattern 2: Markdown-style headings
# Matches: "# Chapter Title", "## Section Title"
markdownPattern = re.compile(
r'^(#{1,3}\s+.+)$',
re.MULTILINE
)
# Pattern 3: Page breaks (form feed character)
pageBreakPattern = re.compile(r'\f+')
# Pattern 4: Multiple blank lines (3+)
multiBlankPattern = re.compile(r'\n\s*\n\s*\n\s*\n+')
# Try chapter headings first
chapterMatches = list(chapterPattern.finditer(text))
if len(chapterMatches) >= 2:
# Found multiple chapter markers
for i, match in enumerate(chapterMatches):
title = match.group(1).strip()
start = match.start()
end = chapterMatches[i + 1].start() if i + 1 < len(chapterMatches) else len(text)
chapterText = text[start:end].strip()
# Remove title from chapter text
chapterText = chapterText[len(match.group(0)):].strip()
chapters.append((title, chapterText))
return chapters
# Try markdown headings
markdownMatches = list(markdownPattern.finditer(text))
if len(markdownMatches) >= 2:
for i, match in enumerate(markdownMatches):
title = match.group(1).strip()
# Remove leading # symbols for title
title = re.sub(r'^#+\s*', '', title)
start = match.start()
end = markdownMatches[i + 1].start() if i + 1 < len(markdownMatches) else len(text)
chapterText = text[start:end].strip()
chapterText = chapterText[len(match.group(0)):].strip()
chapters.append((title, chapterText))
return chapters
# Try page breaks
pageBreakParts = pageBreakPattern.split(text)
if len(pageBreakParts) >= 2:
for i, part in enumerate(pageBreakParts):
if part.strip():
title = f"Section {i + 1}"
chapters.append((title, part.strip()))
if chapters:
return chapters
# Try multiple blank lines as separators
multiBlankParts = multiBlankPattern.split(text)
if len(multiBlankParts) >= 3: # At least 3 sections
for i, part in enumerate(multiBlankParts):
if part.strip():
# Try to extract a title from first line
lines = part.strip().split('\n', 1)
if len(lines) > 1 and len(lines[0]) < 100:
# First line might be a title
title = lines[0].strip()
content = lines[1].strip() if len(lines) > 1 else ""
else:
title = f"Section {i + 1}"
content = part.strip()
chapters.append((title, content))
if chapters:
return chapters
# No clear chapter breaks found, treat as single chapter
return [("Full Text", text.strip())]
def split_into_paragraphs(text):
"""Split text into paragraphs using double newlines.
Falls back to single newlines if no double newlines found.
"""
# Try splitting on double newlines first
paragraphs = re.split(r'\n\s*\n', text)
# Clean up whitespace
paragraphs = [p.strip() for p in paragraphs if p.strip()]
# If we got very few paragraphs, fall back to single newlines
if len(paragraphs) < 3:
paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
return paragraphs
class TxtParser:
"""Parser for plain text files"""
def __init__(self):
pass
def parse(self, txtPath):
"""Parse a TXT file and return Book object.
Args:
txtPath: Path to the TXT file
Returns:
Book object with detected chapters or single chapter
"""
txtPath = Path(txtPath)
if not txtPath.exists():
raise FileNotFoundError(f"TXT file not found: {txtPath}")
# Read file with encoding detection
try:
with open(txtPath, 'r', encoding='utf-8') as f:
text = f.read()
except UnicodeDecodeError:
# Try with latin-1 as fallback
with open(txtPath, 'r', encoding='latin-1') as f:
text = f.read()
# Use filename as book title
bookTitle = txtPath.stem
book = Book(title=bookTitle)
# Detect chapters
detectedChapters = detect_chapter_breaks(text)
# Process each chapter
for chapterTitle, chapterText in detectedChapters:
chapter = Chapter(title=chapterTitle)
# Split into paragraphs
paragraphs = split_into_paragraphs(chapterText)
if not paragraphs:
chapter.add_paragraph("(Empty chapter)")
else:
for paragraph in paragraphs:
chapter.add_paragraph(paragraph)
book.add_chapter(chapter)
return book
def cleanup(self):
"""Cleanup any resources (no-op for TXT parser)"""
pass

198
src/voice_selector.py Normal file
View File

@@ -0,0 +1,198 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Voice Selector
Interactive voice selection interface for piper-tts voices.
Allows browsing, testing, and selecting voice models.
"""
from pathlib import Path
from src.tts_engine import TtsEngine
from src.pygame_player import PygamePlayer
class VoiceSelector:
"""Voice selection interface"""
def __init__(self, voiceDir=None):
"""
Initialize voice selector
Args:
voiceDir: Directory containing voice models
"""
if voiceDir is None:
voiceDir = "/usr/share/piper-voices/en/en_US"
self.voiceDir = Path(voiceDir)
self.voices = []
self._scan_voices()
def _scan_voices(self):
"""Scan for available voice models"""
self.voices = []
if not self.voiceDir.exists():
return
# Find all .onnx files recursively
for onnxFile in self.voiceDir.rglob("*.onnx"):
voiceName = self._extract_voice_name(onnxFile)
self.voices.append({
'name': voiceName,
'path': str(onnxFile),
'relPath': str(onnxFile.relative_to(self.voiceDir))
})
# Sort by name
self.voices.sort(key=lambda v: v['name'])
def _extract_voice_name(self, voicePath):
"""
Extract readable voice name from path
Args:
voicePath: Path to voice model file
Returns:
Human-readable voice name
"""
voicePath = Path(voicePath)
# Get parts of the path
parts = voicePath.parts
# Try to extract from filename pattern: en_US-voicename-quality.onnx
filename = voicePath.stem # Remove .onnx
nameParts = filename.split('-')
if len(nameParts) >= 2:
# nameParts[1] is usually the voice name
voiceName = nameParts[1].replace('_', ' ').title()
quality = nameParts[2] if len(nameParts) > 2 else ''
if quality:
return f"{voiceName} ({quality})"
return voiceName
# Fallback to filename
return voicePath.stem
def get_voices(self):
"""
Get list of available voices
Returns:
List of voice dictionaries
"""
return self.voices
def select_voice_interactive(self):
"""
Interactive voice selection
Returns:
Selected voice path or None if cancelled
"""
if not self.voices:
print("No voices found in", self.voiceDir)
return None
print("\nAvailable Voices:")
print("-" * 60)
for idx, voice in enumerate(self.voices):
print(f"{idx + 1}. {voice['name']}")
print("-" * 60)
print("\nCommands:")
print(" <number> - Select voice")
print(" t <number> - Test voice")
print(" q - Cancel")
print()
while True:
try:
choice = input("Select voice> ").strip().lower()
if choice == 'q':
return None
# Test voice
if choice.startswith('t '):
try:
voiceNum = int(choice[2:])
if 1 <= voiceNum <= len(self.voices):
self._test_voice(self.voices[voiceNum - 1])
else:
print(f"Invalid voice number. Choose 1-{len(self.voices)}")
except ValueError:
print("Invalid input. Use: t <number>")
continue
# Select voice
try:
voiceNum = int(choice)
if 1 <= voiceNum <= len(self.voices):
selectedVoice = self.voices[voiceNum - 1]
print(f"Selected: {selectedVoice['name']}")
return selectedVoice['path']
else:
print(f"Invalid voice number. Choose 1-{len(self.voices)}")
except ValueError:
print("Invalid input. Enter a number, 't <number>' to test, or 'q' to cancel")
except (EOFError, KeyboardInterrupt):
print("\nCancelled")
return None
def _test_voice(self, voice):
"""
Test a voice by playing sample text
Args:
voice: Voice dictionary
"""
print(f"\nTesting voice: {voice['name']}")
voiceName = voice['name'].split('(')[0].strip() # Remove quality suffix
testText = f"Hi, my name is {voiceName}, and I am a piper text to speech voice. Do you like the way I sound?"
try:
tts = TtsEngine(voice['path'])
player = PygamePlayer()
print("Generating speech...")
wavData = tts.text_to_wav_data(testText)
if wavData:
print("Playing...")
player.play_wav_data(wavData)
# Wait for playback to finish
import time
while player.is_playing():
time.sleep(0.1)
player.cleanup()
except Exception as e:
print(f"Error testing voice: {e}")
def find_voice_by_name(self, name):
"""
Find voice by name (case-insensitive partial match)
Args:
name: Voice name to search for
Returns:
Voice path or None if not found
"""
name = name.lower()
for voice in self.voices:
if name in voice['name'].lower():
return voice['path']
return None