Add comprehensive soundpack manager with security-first design

- **Soundpack Manager**: Full-featured package discovery, download, and installation system
  - Repository management with HTTPS enforcement and validation
  - Directory listing support (auto-discovers .zip files) + soundpacknames.txt fallback
  - Secure download with size limits, timeout protection, and path sanitization
  - Zip bomb protection: file count, individual size, and total extraction limits
  - Audio file validation using magic numbers (not just extensions)
  - Accessible UI with keyboard navigation and screen reader optimization

- **Auto-refresh system**: Smart timeline updates respecting user activity
  - 300-second default interval + 10-second idle buffer
  - Keyboard activity tracking prevents interrupting active users
  - True new content detection using post IDs instead of counts
  - Preserves cursor position during background refreshes

- **Enhanced notifications**: Fixed spam issues and improved targeting
  - Timeline switching now silent (no notifications for actively viewed content)
  - Initial app load notifications disabled for 2 seconds
  - Generic "New content in timeline" messages instead of misleading post counts
  - Separate handling for auto-refresh vs manual refresh scenarios

- **Load more posts improvements**: Better positioning and user experience
  - New posts load below cursor position instead of above
  - Cursor automatically focuses on first new post for natural reading flow
  - Fixed widget hierarchy issues preventing activation

- **Accessibility enhancements**: Workarounds for screen reader quirks
  - Added list headers to fix Orca single-item list reading issues
  - Improved accessible names and descriptions throughout
  - Non-selectable header items with dynamic counts
  - Better error messages and status updates

- **Settings integration**: Corrected soundpack configuration management
  - Fixed inconsistent config keys (now uses [audio] sound_pack consistently)
  - Added soundpack manager to File menu (Ctrl+Shift+P)
  - Repository persistence and validation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Storm Dragon
2025-07-20 15:18:50 -04:00
parent 8661fa67ce
commit f2c4ad1bd6
7 changed files with 1485 additions and 25 deletions

View File

@ -46,8 +46,10 @@ class AccessibleTreeWidget(QTreeWidget):
if key == Qt.Key_Return or key == Qt.Key_Enter: if key == Qt.Key_Return or key == Qt.Key_Enter:
special_data = current.data(0, Qt.UserRole) special_data = current.data(0, Qt.UserRole)
if special_data == "load_more": if special_data == "load_more":
# Emit signal for load more action # Call load_more_posts method - try self first, then parent
if hasattr(self.parent(), 'load_more_posts'): if hasattr(self, 'load_more_posts'):
self.load_more_posts()
elif hasattr(self.parent(), 'load_more_posts'):
self.parent().load_more_posts() self.parent().load_more_posts()
return return

View File

@ -0,0 +1,544 @@
"""
Soundpack Manager - Secure soundpack discovery, download, and installation
"""
import os
import re
import json
import tempfile
import zipfile
import shutil
import requests
import hashlib
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from urllib.parse import urlparse, urljoin
from dataclasses import dataclass
from config.settings import SettingsManager
@dataclass
class SoundpackRepository:
"""Represents a soundpack repository"""
url: str
description: str
enabled: bool = True
@dataclass
class SoundpackInfo:
"""Information about a soundpack"""
name: str
description: str
repository_url: str
download_url: str
installed: bool = False
version: Optional[str] = None
class SoundpackManager:
"""Manages soundpack repositories, discovery, and installation"""
# Security limits
MAX_DOWNLOAD_SIZE = 50 * 1024 * 1024 # 50MB max download
MAX_EXTRACTED_SIZE = 100 * 1024 * 1024 # 100MB max extracted
MAX_FILES_IN_ZIP = 50 # Max files in a soundpack
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB max per file
NETWORK_TIMEOUT = 30 # 30 second timeout
# Allowed audio file signatures (magic numbers)
AUDIO_SIGNATURES = {
b'OggS': 'ogg', # Ogg Vorbis
b'RIFF': 'wav', # WAV (check for WAVE later)
}
def __init__(self, settings: SettingsManager):
self.settings = settings
self.soundpacks_dir = settings.get_sounds_dir()
self.repositories = self._load_repositories()
self.temp_dir = None
def _load_repositories(self) -> List[SoundpackRepository]:
"""Load repository list from settings"""
repos_json = self.settings.get('soundpacks', 'repositories', '[]')
try:
repos_data = json.loads(repos_json)
repositories = []
for repo_data in repos_data:
repositories.append(SoundpackRepository(
url=repo_data['url'],
description=repo_data['description'],
enabled=repo_data.get('enabled', True)
))
# Add default repository if none exist
if not repositories:
repositories.append(SoundpackRepository(
url="https://stormux.org/bifrost/soundpacks",
description="The default Bifrost sound pack repository"
))
self._save_repositories(repositories)
return repositories
except (json.JSONDecodeError, KeyError):
# Return default repository on error
default_repo = SoundpackRepository(
url="https://stormux.org/bifrost/soundpacks",
description="The default Bifrost sound pack repository"
)
self._save_repositories([default_repo])
return [default_repo]
def _save_repositories(self, repositories: List[SoundpackRepository]):
"""Save repository list to settings"""
repos_data = []
for repo in repositories:
repos_data.append({
'url': repo.url,
'description': repo.description,
'enabled': repo.enabled
})
self.settings.set('soundpacks', 'repositories', json.dumps(repos_data))
self.settings.save_settings()
self.repositories = repositories
def validate_url(self, url: str) -> Tuple[bool, str]:
"""Validate repository URL format and security"""
try:
parsed = urlparse(url.strip())
# Must be HTTPS for security
if parsed.scheme != 'https':
return False, "Repository URLs must use HTTPS for security"
# Must have hostname
if not parsed.netloc:
return False, "Invalid URL format - missing hostname"
# Basic format validation
if not re.match(r'^https://[a-zA-Z0-9.-]+[a-zA-Z0-9]/.*$', url):
return False, "Invalid URL format"
return True, "Valid URL format"
except Exception:
return False, "Invalid URL format"
def validate_repository(self, url: str) -> Tuple[bool, str]:
"""Validate that repository contains required files"""
try:
session = requests.Session()
session.headers.update({'User-Agent': 'Bifrost-Soundpack-Manager/1.0'})
# Try directory listing first (simpler approach)
response = session.get(url.rstrip('/') + '/', timeout=self.NETWORK_TIMEOUT)
if response.status_code == 200:
# Check if we can see .zip files in the directory listing
if '.zip' in response.text:
return True, "Valid repository (directory listing)"
# Fallback: check for soundpacknames.txt
list_url = urljoin(url.rstrip('/') + '/', 'soundpacknames.txt')
response = session.head(list_url, timeout=self.NETWORK_TIMEOUT)
if response.status_code == 200:
return True, "Valid repository (soundpacknames.txt)"
return False, "Repository does not contain soundpacks or soundpacknames.txt"
except requests.RequestException as e:
return False, f"Cannot access repository: {str(e)}"
except Exception as e:
return False, f"Repository validation failed: {str(e)}"
def add_repository(self, url: str, description: str) -> Tuple[bool, str]:
"""Add a new repository after validation"""
# Validate URL format
valid_format, format_msg = self.validate_url(url)
if not valid_format:
return False, format_msg
# Check if already exists
for repo in self.repositories:
if repo.url == url:
return False, "Repository already exists"
# Validate repository content
valid_repo, repo_msg = self.validate_repository(url)
if not valid_repo:
return False, repo_msg
# Add repository
new_repo = SoundpackRepository(url=url, description=description)
self.repositories.append(new_repo)
self._save_repositories(self.repositories)
return True, "Repository added successfully"
def remove_repository(self, url: str) -> bool:
"""Remove a repository"""
self.repositories = [repo for repo in self.repositories if repo.url != url]
self._save_repositories(self.repositories)
return True
def get_repositories(self) -> List[SoundpackRepository]:
"""Get all repositories"""
return self.repositories.copy()
def discover_soundpacks(self) -> List[SoundpackInfo]:
"""Discover all available soundpacks from enabled repositories"""
all_soundpacks = []
installed_packs = self._get_installed_soundpacks()
for repo in self.repositories:
if not repo.enabled:
continue
try:
soundpacks = self._discover_from_repository(repo.url)
for pack in soundpacks:
pack.installed = pack.name in installed_packs
all_soundpacks.append(pack)
except Exception as e:
print(f"Failed to discover soundpacks from {repo.url}: {e}")
continue
return all_soundpacks
def _discover_from_repository(self, repo_url: str) -> List[SoundpackInfo]:
"""Discover soundpacks from a single repository"""
soundpacks = []
try:
session = requests.Session()
session.headers.update({'User-Agent': 'Bifrost-Soundpack-Manager/1.0'})
# Try directory listing first
response = session.get(repo_url.rstrip('/') + '/', timeout=self.NETWORK_TIMEOUT)
if response.status_code == 200 and '.zip' in response.text:
soundpacks = self._parse_directory_listing(repo_url, response.text)
if soundpacks:
return soundpacks
# Fallback to soundpacknames.txt
list_url = urljoin(repo_url.rstrip('/') + '/', 'soundpacknames.txt')
response = session.get(list_url, timeout=self.NETWORK_TIMEOUT)
response.raise_for_status()
# Parse soundpack names from text file
lines = response.text.strip().split('\n')
for line in lines:
line = line.strip()
if not line or line.startswith('#'):
continue
# Try to get description
description = self._get_soundpack_description(repo_url, line)
soundpack = SoundpackInfo(
name=line,
description=description,
repository_url=repo_url,
download_url=urljoin(repo_url.rstrip('/') + '/', f'{line}.zip')
)
soundpacks.append(soundpack)
except Exception as e:
raise Exception(f"Failed to discover soundpacks: {e}")
return soundpacks
def _parse_directory_listing(self, repo_url: str, html_content: str) -> List[SoundpackInfo]:
"""Parse directory listing HTML to find soundpacks"""
import re
soundpacks = []
# Find all .zip files in the directory listing
# This regex looks for href="filename.zip" patterns
zip_pattern = r'href=["\']([^"\']*\.zip)["\']'
zip_matches = re.findall(zip_pattern, html_content, re.IGNORECASE)
for zip_filename in zip_matches:
# Extract soundpack name (remove .zip extension)
soundpack_name = zip_filename.replace('.zip', '')
# Skip any file that looks like metadata or has suspicious characters
if (soundpack_name.lower() in ['soundpacknames', 'index', 'readme'] or
not re.match(r'^[a-zA-Z0-9._-]+$', soundpack_name)):
continue
# Try to get description
description = self._get_soundpack_description(repo_url, soundpack_name)
soundpack = SoundpackInfo(
name=soundpack_name,
description=description,
repository_url=repo_url,
download_url=urljoin(repo_url.rstrip('/') + '/', zip_filename)
)
soundpacks.append(soundpack)
return soundpacks
def _get_soundpack_description(self, repo_url: str, soundpack_name: str) -> str:
"""Get description for a soundpack"""
try:
desc_url = urljoin(repo_url.rstrip('/') + '/', f'{soundpack_name}.txt')
session = requests.Session()
session.headers.update({'User-Agent': 'Bifrost-Soundpack-Manager/1.0'})
response = session.get(desc_url, timeout=self.NETWORK_TIMEOUT)
if response.status_code == 200:
return response.text.strip()
except:
pass
return "No description available"
def _get_installed_soundpacks(self) -> List[str]:
"""Get list of installed soundpack names"""
installed = []
if self.soundpacks_dir.exists():
for item in self.soundpacks_dir.iterdir():
if item.is_dir() and (item / 'pack.json').exists():
installed.append(item.name)
return installed
def _validate_audio_file(self, file_path: Path) -> bool:
"""Validate that a file is actually an audio file"""
try:
# Check file size
if file_path.stat().st_size > self.MAX_FILE_SIZE:
return False
# Check file signature
with open(file_path, 'rb') as f:
header = f.read(8)
# Check for known audio signatures
for signature, file_type in self.AUDIO_SIGNATURES.items():
if header.startswith(signature):
if file_type == 'wav':
# For WAV, also check for WAVE marker
if len(header) >= 8 and header[8:12] == b'WAVE':
return True
else:
return True
return False
except Exception:
return False
def _sanitize_path(self, path: str) -> Optional[str]:
"""Sanitize and validate file path to prevent directory traversal"""
# Remove any path traversal attempts
path = os.path.normpath(path)
# Check for suspicious patterns
if '..' in path or path.startswith('/') or ':' in path:
return None
# Only allow alphanumeric, dash, underscore, and dot
if not re.match(r'^[a-zA-Z0-9._-]+$', path):
return None
return path
def install_soundpack(self, soundpack: SoundpackInfo) -> Tuple[bool, str]:
"""Securely download and install a soundpack"""
if soundpack.installed:
return False, "Soundpack is already installed"
self.temp_dir = tempfile.mkdtemp(prefix='bifrost_soundpack_')
try:
# Download soundpack
success, message = self._download_soundpack(soundpack)
if not success:
return False, message
# Extract and validate
success, message = self._extract_and_validate(soundpack)
if not success:
return False, message
# Install to final location
success, message = self._install_to_final_location(soundpack)
if not success:
return False, message
return True, "Soundpack installed successfully"
except Exception as e:
return False, f"Installation failed: {str(e)}"
finally:
# Clean up temporary directory
if self.temp_dir and Path(self.temp_dir).exists():
shutil.rmtree(self.temp_dir, ignore_errors=True)
self.temp_dir = None
def _download_soundpack(self, soundpack: SoundpackInfo) -> Tuple[bool, str]:
"""Download soundpack with security checks"""
try:
session = requests.Session()
session.headers.update({'User-Agent': 'Bifrost-Soundpack-Manager/1.0'})
response = session.get(
soundpack.download_url,
timeout=self.NETWORK_TIMEOUT,
stream=True
)
response.raise_for_status()
# Check content length
content_length = response.headers.get('content-length')
if content_length and int(content_length) > self.MAX_DOWNLOAD_SIZE:
return False, f"Soundpack too large (max {self.MAX_DOWNLOAD_SIZE // 1024 // 1024}MB)"
# Download to temporary file
zip_path = Path(self.temp_dir) / f"{soundpack.name}.zip"
downloaded = 0
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
downloaded += len(chunk)
if downloaded > self.MAX_DOWNLOAD_SIZE:
return False, f"Download exceeded size limit"
f.write(chunk)
return True, "Download successful"
except requests.RequestException as e:
return False, f"Download failed: {str(e)}"
def _extract_and_validate(self, soundpack: SoundpackInfo) -> Tuple[bool, str]:
"""Extract zip and validate contents"""
zip_path = Path(self.temp_dir) / f"{soundpack.name}.zip"
extract_dir = Path(self.temp_dir) / "extracted"
extract_dir.mkdir()
try:
with zipfile.ZipFile(zip_path, 'r') as zip_file:
# Check number of files
if len(zip_file.filelist) > self.MAX_FILES_IN_ZIP:
return False, f"Too many files in soundpack (max {self.MAX_FILES_IN_ZIP})"
total_size = 0
audio_files = 0
for file_info in zip_file.filelist:
# Sanitize filename
safe_name = self._sanitize_path(file_info.filename)
if not safe_name:
return False, f"Invalid filename in archive: {file_info.filename}"
# Check individual file size
if file_info.file_size > self.MAX_FILE_SIZE:
return False, f"File too large: {file_info.filename}"
total_size += file_info.file_size
# Check total extracted size
if total_size > self.MAX_EXTRACTED_SIZE:
return False, "Soundpack too large when extracted"
# Extract file
extracted_path = extract_dir / safe_name
with zip_file.open(file_info) as source, open(extracted_path, 'wb') as target:
shutil.copyfileobj(source, target)
# Validate audio files
if safe_name.endswith(('.ogg', '.wav')):
if not self._validate_audio_file(extracted_path):
return False, f"Invalid audio file: {safe_name}"
audio_files += 1
elif safe_name == 'pack.json':
# Validate pack.json
try:
with open(extracted_path, 'r') as f:
pack_data = json.load(f)
if 'name' not in pack_data or 'sounds' not in pack_data:
return False, "Invalid pack.json format"
except json.JSONDecodeError:
return False, "Invalid pack.json file"
else:
return False, f"Unauthorized file type: {safe_name}"
if audio_files == 0:
return False, "No valid audio files found in soundpack"
return True, "Validation successful"
except zipfile.BadZipFile:
return False, "Invalid or corrupted zip file"
except Exception as e:
return False, f"Extraction failed: {str(e)}"
def _install_to_final_location(self, soundpack: SoundpackInfo) -> Tuple[bool, str]:
"""Move validated soundpack to final installation directory"""
try:
extract_dir = Path(self.temp_dir) / "extracted"
final_dir = self.soundpacks_dir / soundpack.name
# Create soundpacks directory if it doesn't exist
self.soundpacks_dir.mkdir(parents=True, exist_ok=True)
# Remove existing installation if present
if final_dir.exists():
shutil.rmtree(final_dir)
# Move extracted files to final location
shutil.move(str(extract_dir), str(final_dir))
return True, "Installation completed"
except Exception as e:
return False, f"Failed to install: {str(e)}"
def uninstall_soundpack(self, soundpack_name: str) -> Tuple[bool, str]:
"""Uninstall a soundpack"""
try:
soundpack_dir = self.soundpacks_dir / soundpack_name
if not soundpack_dir.exists():
return False, "Soundpack not installed"
# Don't allow uninstalling default soundpack
if soundpack_name == 'default':
return False, "Cannot uninstall default soundpack"
# Check if currently active
current_pack = self.settings.get('audio', 'sound_pack', 'default')
if current_pack == soundpack_name:
# Switch to default before uninstalling
self.settings.set('audio', 'sound_pack', 'default')
self.settings.save_settings()
# Remove directory
shutil.rmtree(soundpack_dir)
return True, "Soundpack uninstalled successfully"
except Exception as e:
return False, f"Uninstall failed: {str(e)}"
def switch_soundpack(self, soundpack_name: str) -> Tuple[bool, str]:
"""Switch to a different soundpack"""
try:
soundpack_dir = self.soundpacks_dir / soundpack_name
if not soundpack_dir.exists():
return False, "Soundpack not installed"
if not (soundpack_dir / 'pack.json').exists():
return False, "Invalid soundpack installation"
# Update settings
self.settings.set('audio', 'sound_pack', soundpack_name)
self.settings.save_settings()
return True, f"Switched to {soundpack_name} soundpack"
except Exception as e:
return False, f"Failed to switch soundpack: {str(e)}"

View File

@ -60,7 +60,7 @@ class SettingsManager:
self.config.set('general', 'instance_url', '') self.config.set('general', 'instance_url', '')
self.config.set('general', 'username', '') self.config.set('general', 'username', '')
self.config.set('general', 'current_sound_pack', 'default') self.config.set('general', 'current_sound_pack', 'default')
self.config.set('general', 'timeline_refresh_interval', '60') self.config.set('general', 'timeline_refresh_interval', '300')
self.config.set('general', 'auto_refresh_enabled', 'true') self.config.set('general', 'auto_refresh_enabled', 'true')
# Sound settings # Sound settings
@ -141,5 +141,5 @@ class SettingsManager:
def get_current_sound_pack_dir(self) -> Path: def get_current_sound_pack_dir(self) -> Path:
"""Get the current sound pack directory""" """Get the current sound pack directory"""
pack_name = self.get('general', 'current_sound_pack', 'default') pack_name = self.get('audio', 'sound_pack', 'default')
return self.get_sounds_dir() / pack_name return self.get_sounds_dir() / pack_name

View File

@ -6,8 +6,9 @@ from PySide6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QMenuBar, QStatusBar, QPushButton, QTabWidget QLabel, QMenuBar, QStatusBar, QPushButton, QTabWidget
) )
from PySide6.QtCore import Qt, Signal from PySide6.QtCore import Qt, Signal, QTimer
from PySide6.QtGui import QKeySequence, QAction, QTextCursor from PySide6.QtGui import QKeySequence, QAction, QTextCursor
import time
from config.settings import SettingsManager from config.settings import SettingsManager
from config.accounts import AccountManager from config.accounts import AccountManager
@ -16,6 +17,7 @@ from widgets.compose_dialog import ComposeDialog
from widgets.login_dialog import LoginDialog from widgets.login_dialog import LoginDialog
from widgets.account_selector import AccountSelector from widgets.account_selector import AccountSelector
from widgets.settings_dialog import SettingsDialog from widgets.settings_dialog import SettingsDialog
from widgets.soundpack_manager_dialog import SoundpackManagerDialog
from activitypub.client import ActivityPubClient from activitypub.client import ActivityPubClient
@ -26,9 +28,15 @@ class MainWindow(QMainWindow):
super().__init__() super().__init__()
self.settings = SettingsManager() self.settings = SettingsManager()
self.account_manager = AccountManager(self.settings) self.account_manager = AccountManager(self.settings)
# Auto-refresh tracking
self.last_activity_time = time.time()
self.is_initial_load = True # Flag to skip notifications on first load
self.setup_ui() self.setup_ui()
self.setup_menus() self.setup_menus()
self.setup_shortcuts() self.setup_shortcuts()
self.setup_auto_refresh()
# Check if we need to show login dialog # Check if we need to show login dialog
if not self.account_manager.has_accounts(): if not self.account_manager.has_accounts():
@ -38,6 +46,9 @@ class MainWindow(QMainWindow):
if hasattr(self.timeline, 'sound_manager'): if hasattr(self.timeline, 'sound_manager'):
self.timeline.sound_manager.play_startup() self.timeline.sound_manager.play_startup()
# Mark initial load as complete after startup
QTimer.singleShot(2000, self.mark_initial_load_complete)
def setup_ui(self): def setup_ui(self):
"""Initialize the user interface""" """Initialize the user interface"""
self.setWindowTitle("Bifrost - Fediverse Client") self.setWindowTitle("Bifrost - Fediverse Client")
@ -122,6 +133,12 @@ class MainWindow(QMainWindow):
settings_action.triggered.connect(self.show_settings) settings_action.triggered.connect(self.show_settings)
file_menu.addAction(settings_action) file_menu.addAction(settings_action)
# Soundpack Manager action
soundpack_action = QAction("Sound&pack Manager", self)
soundpack_action.setShortcut(QKeySequence("Ctrl+Shift+P"))
soundpack_action.triggered.connect(self.show_soundpack_manager)
file_menu.addAction(soundpack_action)
file_menu.addSeparator() file_menu.addSeparator()
# Quit action # Quit action
@ -206,6 +223,89 @@ class MainWindow(QMainWindow):
# Additional shortcuts that don't need menu items # Additional shortcuts that don't need menu items
pass pass
def setup_auto_refresh(self):
"""Set up auto-refresh timer"""
# Create auto-refresh timer
self.auto_refresh_timer = QTimer()
self.auto_refresh_timer.timeout.connect(self.check_auto_refresh)
# Check every 30 seconds if we should refresh
self.auto_refresh_timer.start(30000) # 30 seconds
def mark_initial_load_complete(self):
"""Mark that initial loading is complete"""
self.is_initial_load = False
# Enable notifications on the timeline
if hasattr(self.timeline, 'enable_notifications'):
self.timeline.enable_notifications()
def keyPressEvent(self, event):
"""Track keyboard activity for auto-refresh"""
self.last_activity_time = time.time()
super().keyPressEvent(event)
def check_auto_refresh(self):
"""Check if we should auto-refresh the timeline"""
# Skip if auto-refresh is disabled
if not self.settings.get_bool('general', 'auto_refresh_enabled', True):
return
# Skip if no account is active
if not self.account_manager.get_active_account():
return
# Get refresh interval from settings
refresh_interval = self.settings.get_int('general', 'timeline_refresh_interval', 300)
# Check if enough time has passed since last activity
time_since_activity = time.time() - self.last_activity_time
required_idle_time = refresh_interval + 10 # refresh_rate + 10 seconds
if time_since_activity >= required_idle_time:
self.auto_refresh_timeline()
def auto_refresh_timeline(self):
"""Automatically refresh the timeline"""
# Store the current scroll position and selected item
current_item = self.timeline.currentItem()
# Store the current newest post ID to detect new content
old_newest_post_id = self.timeline.newest_post_id
# Temporarily disable notifications to prevent double notifications
old_skip_notifications = self.timeline.skip_notifications
self.timeline.skip_notifications = True
# Refresh the timeline
self.timeline.refresh()
# Restore notification setting
self.timeline.skip_notifications = old_skip_notifications
# Check for new content by comparing newest post ID
if (self.timeline.newest_post_id and
old_newest_post_id and
self.timeline.newest_post_id != old_newest_post_id and
not self.is_initial_load):
timeline_name = {
'home': 'home timeline',
'local': 'local timeline',
'federated': 'federated timeline',
'notifications': 'notifications'
}.get(self.timeline.timeline_type, 'timeline')
# Show desktop notification for new content
if hasattr(self.timeline, 'notification_manager'):
self.timeline.notification_manager.notify_new_content(timeline_name)
# Try to restore focus to the previous item
if current_item:
self.timeline.setCurrentItem(current_item)
# Reset activity timer to prevent immediate re-refresh
self.last_activity_time = time.time()
def show_compose_dialog(self): def show_compose_dialog(self):
"""Show the compose post dialog""" """Show the compose post dialog"""
dialog = ComposeDialog(self.account_manager, self) dialog = ComposeDialog(self.account_manager, self)
@ -288,6 +388,11 @@ class MainWindow(QMainWindow):
self.timeline.sound_manager.reload_settings() self.timeline.sound_manager.reload_settings()
self.status_bar.showMessage("Settings saved successfully", 2000) self.status_bar.showMessage("Settings saved successfully", 2000)
def show_soundpack_manager(self):
"""Show the soundpack manager dialog"""
dialog = SoundpackManagerDialog(self.settings, self)
dialog.exec()
def refresh_timeline(self): def refresh_timeline(self):
"""Refresh the current timeline""" """Refresh the current timeline"""
self.timeline.refresh() self.timeline.refresh()

View File

@ -107,3 +107,14 @@ class NotificationManager:
message=message, message=message,
notification_type='timeline_updates' notification_type='timeline_updates'
) )
def notify_new_content(self, timeline_type: str = "timeline"):
"""Show notification for new content without counting posts"""
if not self.is_enabled('timeline_updates'):
return
self.show_notification(
title="Timeline updated",
message=f"New content in your {timeline_type}",
notification_type='timeline_updates'
)

View File

@ -0,0 +1,674 @@
"""
Soundpack Manager Dialog - UI for managing soundpack repositories and installations
"""
from PySide6.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QTabWidget, QWidget,
QListWidget, QListWidgetItem, QPushButton, QLabel,
QLineEdit, QTextEdit, QGroupBox, QMessageBox,
QProgressDialog, QDialogButtonBox, QFormLayout,
QCheckBox, QSplitter
)
from PySide6.QtCore import Qt, QThread, Signal, QTimer
from PySide6.QtGui import QFont
from typing import List, Optional
from config.settings import SettingsManager
from audio.soundpack_manager import SoundpackManager, SoundpackRepository, SoundpackInfo
class SoundpackOperationThread(QThread):
"""Thread for soundpack operations to prevent UI blocking"""
operation_complete = Signal(bool, str) # success, message
progress_update = Signal(str) # status message
def __init__(self, operation, *args):
super().__init__()
self.operation = operation
self.args = args
def run(self):
try:
if self.operation == 'discover':
self.discover_soundpacks()
elif self.operation == 'install':
self.install_soundpack()
elif self.operation == 'validate_repo':
self.validate_repository()
except Exception as e:
self.operation_complete.emit(False, f"Operation failed: {str(e)}")
def discover_soundpacks(self):
manager = self.args[0]
self.progress_update.emit("Discovering soundpacks...")
soundpacks = manager.discover_soundpacks()
self.operation_complete.emit(True, f"Found {len(soundpacks)} soundpacks")
def install_soundpack(self):
manager, soundpack = self.args
self.progress_update.emit(f"Installing {soundpack.name}...")
success, message = manager.install_soundpack(soundpack)
self.operation_complete.emit(success, message)
def validate_repository(self):
manager, url = self.args
self.progress_update.emit("Validating repository...")
success, message = manager.validate_repository(url)
self.operation_complete.emit(success, message)
class SoundpackManagerDialog(QDialog):
"""Dialog for managing soundpacks and repositories"""
def __init__(self, settings: SettingsManager, parent=None):
super().__init__(parent)
self.settings = settings
self.manager = SoundpackManager(settings)
self.soundpacks = []
self.current_thread = None
self.setWindowTitle("Soundpack Manager")
self.setMinimumSize(800, 600)
self.setModal(True)
self.setup_ui()
self.load_data()
def setup_ui(self):
"""Initialize the user interface"""
layout = QVBoxLayout(self)
# Create tab widget
self.tab_widget = QTabWidget()
layout.addWidget(self.tab_widget)
# Soundpacks tab
self.setup_soundpacks_tab()
# Repositories tab
self.setup_repositories_tab()
# Installed tab
self.setup_installed_tab()
# Button box
button_box = QDialogButtonBox(QDialogButtonBox.Close)
button_box.rejected.connect(self.close)
layout.addWidget(button_box)
def setup_soundpacks_tab(self):
"""Setup the available soundpacks tab"""
tab = QWidget()
layout = QVBoxLayout(tab)
# Header
header_label = QLabel("Available Soundpacks")
header_label.setFont(QFont("", 12, QFont.Bold))
header_label.setAccessibleName("Available Soundpacks")
layout.addWidget(header_label)
# Refresh button
refresh_layout = QHBoxLayout()
self.refresh_button = QPushButton("&Refresh Soundpack List")
self.refresh_button.setAccessibleName("Refresh soundpack list from repositories")
self.refresh_button.clicked.connect(self.refresh_soundpacks)
refresh_layout.addWidget(self.refresh_button)
refresh_layout.addStretch()
layout.addLayout(refresh_layout)
# Splitter for list and details
splitter = QSplitter(Qt.Horizontal)
layout.addWidget(splitter)
# Soundpack list
self.soundpack_list = QListWidget()
self.soundpack_list.setAccessibleName("Available soundpacks")
self.soundpack_list.itemSelectionChanged.connect(self.on_soundpack_selected)
splitter.addWidget(self.soundpack_list)
# Details panel
details_widget = QWidget()
details_layout = QVBoxLayout(details_widget)
self.details_label = QLabel("Select a soundpack to view details")
self.details_label.setAccessibleName("Soundpack details")
self.details_label.setWordWrap(True)
details_layout.addWidget(self.details_label)
self.description_text = QTextEdit()
self.description_text.setAccessibleName("Soundpack description")
self.description_text.setReadOnly(True)
self.description_text.setMaximumHeight(150)
details_layout.addWidget(self.description_text)
# Action buttons
action_layout = QHBoxLayout()
self.install_button = QPushButton("&Install")
self.install_button.setAccessibleName("Install selected soundpack")
self.install_button.clicked.connect(self.install_selected)
self.install_button.setEnabled(False)
action_layout.addWidget(self.install_button)
self.switch_button = QPushButton("&Switch To")
self.switch_button.setAccessibleName("Switch to selected soundpack")
self.switch_button.clicked.connect(self.switch_to_selected)
self.switch_button.setEnabled(False)
action_layout.addWidget(self.switch_button)
action_layout.addStretch()
details_layout.addLayout(action_layout)
details_layout.addStretch()
splitter.addWidget(details_widget)
# Set splitter proportions
splitter.setSizes([400, 400])
self.tab_widget.addTab(tab, "&Available")
def setup_repositories_tab(self):
"""Setup the repositories management tab"""
tab = QWidget()
layout = QVBoxLayout(tab)
# Header
header_label = QLabel("Soundpack Repositories")
header_label.setFont(QFont("", 12, QFont.Bold))
header_label.setAccessibleName("Soundpack repositories")
layout.addWidget(header_label)
# Repository list
self.repo_list = QListWidget()
self.repo_list.setAccessibleName("Repository list")
self.repo_list.itemSelectionChanged.connect(self.on_repository_selected)
layout.addWidget(self.repo_list)
# Repository management buttons
repo_button_layout = QHBoxLayout()
self.add_repo_button = QPushButton("&Add Repository")
self.add_repo_button.setAccessibleName("Add new repository")
self.add_repo_button.clicked.connect(self.add_repository)
repo_button_layout.addWidget(self.add_repo_button)
self.remove_repo_button = QPushButton("&Remove Repository")
self.remove_repo_button.setAccessibleName("Remove selected repository")
self.remove_repo_button.clicked.connect(self.remove_repository)
self.remove_repo_button.setEnabled(False)
repo_button_layout.addWidget(self.remove_repo_button)
repo_button_layout.addStretch()
layout.addLayout(repo_button_layout)
self.tab_widget.addTab(tab, "&Repositories")
def setup_installed_tab(self):
"""Setup the installed soundpacks tab"""
tab = QWidget()
layout = QVBoxLayout(tab)
# Header
header_label = QLabel("Installed Soundpacks")
header_label.setFont(QFont("", 12, QFont.Bold))
header_label.setAccessibleName("Installed soundpacks")
layout.addWidget(header_label)
# Current soundpack info
current_group = QGroupBox("Current Soundpack")
current_layout = QVBoxLayout(current_group)
self.current_pack_label = QLabel("Loading...")
self.current_pack_label.setAccessibleName("Current soundpack")
current_layout.addWidget(self.current_pack_label)
layout.addWidget(current_group)
# Installed soundpacks list
self.installed_list = QListWidget()
self.installed_list.setAccessibleName("Installed soundpacks")
self.installed_list.itemSelectionChanged.connect(self.on_installed_selected)
layout.addWidget(self.installed_list)
# Management buttons
installed_button_layout = QHBoxLayout()
self.switch_installed_button = QPushButton("&Switch To")
self.switch_installed_button.setAccessibleName("Switch to selected installed soundpack")
self.switch_installed_button.clicked.connect(self.switch_to_installed)
self.switch_installed_button.setEnabled(False)
installed_button_layout.addWidget(self.switch_installed_button)
self.uninstall_button = QPushButton("&Uninstall")
self.uninstall_button.setAccessibleName("Uninstall selected soundpack")
self.uninstall_button.clicked.connect(self.uninstall_selected)
self.uninstall_button.setEnabled(False)
installed_button_layout.addWidget(self.uninstall_button)
installed_button_layout.addStretch()
layout.addLayout(installed_button_layout)
self.tab_widget.addTab(tab, "&Installed")
def load_data(self):
"""Load initial data"""
self.load_repositories()
self.load_installed_soundpacks()
self.update_current_soundpack()
self.refresh_soundpacks()
def load_repositories(self):
"""Load repository list"""
self.repo_list.clear()
repositories = self.manager.get_repositories()
# Add header item to help Orca read single-item lists
header_item = QListWidgetItem(f"Repositories ({len(repositories)} configured):")
header_item.setFlags(header_item.flags() & ~Qt.ItemIsSelectable) # Make it non-selectable
header_item.setData(Qt.AccessibleDescriptionRole, "Repository list header")
self.repo_list.addItem(header_item)
for repo in repositories:
# Format for screen readers - use single line with clear separators
item_text = f"{repo.description} - {repo.url}"
if not repo.enabled:
item_text += " (Disabled)"
item = QListWidgetItem(item_text)
item.setData(Qt.UserRole, repo)
# Set accessible description with more detail
item.setData(Qt.AccessibleDescriptionRole, f"Repository: {repo.description}, URL: {repo.url}")
self.repo_list.addItem(item)
def load_installed_soundpacks(self):
"""Load installed soundpacks list"""
self.installed_list.clear()
installed = self.manager._get_installed_soundpacks()
current_pack = self.settings.get('audio', 'sound_pack', 'default')
for pack_name in installed:
item_text = pack_name
if pack_name == current_pack:
item_text += " (Current)"
item = QListWidgetItem(item_text)
item.setData(Qt.UserRole, pack_name)
self.installed_list.addItem(item)
def update_current_soundpack(self):
"""Update current soundpack display"""
# Check both possible keys - audio.sound_pack is the real one being used
current_pack = self.settings.get('audio', 'sound_pack', None)
if not current_pack:
current_pack = self.settings.get('audio', 'sound_pack', 'default')
print(f"Debug: Found soundpack setting = '{current_pack}'")
self.current_pack_label.setText(f"Current soundpack: {current_pack}")
def refresh_soundpacks(self):
"""Refresh available soundpacks from repositories"""
if self.current_thread and self.current_thread.isRunning():
return
self.refresh_button.setEnabled(False)
self.refresh_button.setText("Refreshing...")
# Start discovery in background thread
self.current_thread = SoundpackOperationThread('discover', self.manager)
self.current_thread.operation_complete.connect(self.on_discover_complete)
self.current_thread.start()
def on_discover_complete(self, success: bool, message: str):
"""Handle soundpack discovery completion"""
self.refresh_button.setEnabled(True)
self.refresh_button.setText("&Refresh Soundpack List")
if success:
self.soundpacks = self.manager.discover_soundpacks()
self.update_soundpack_list()
else:
QMessageBox.warning(self, "Discovery Failed", f"Failed to discover soundpacks: {message}")
def update_soundpack_list(self):
"""Update the soundpack list display"""
self.soundpack_list.clear()
# Add header item to help Orca read single-item lists
header_item = QListWidgetItem(f"Available Soundpacks ({len(self.soundpacks)} found):")
header_item.setFlags(header_item.flags() & ~Qt.ItemIsSelectable) # Make it non-selectable
header_item.setData(Qt.AccessibleDescriptionRole, "Available soundpacks list header")
self.soundpack_list.addItem(header_item)
for soundpack in self.soundpacks:
item_text = soundpack.name
if soundpack.installed:
item_text += " (Installed)"
item_text += f" - {soundpack.description}"
item = QListWidgetItem(item_text)
item.setData(Qt.UserRole, soundpack)
self.soundpack_list.addItem(item)
def on_soundpack_selected(self):
"""Handle soundpack selection"""
current = self.soundpack_list.currentItem()
if not current:
self.details_label.setText("Select a soundpack to view details")
self.description_text.clear()
self.install_button.setEnabled(False)
self.switch_button.setEnabled(False)
return
soundpack = current.data(Qt.UserRole)
# Skip header items (they don't have soundpack data)
if not soundpack:
self.details_label.setText("Select a soundpack to view details")
self.description_text.clear()
self.install_button.setEnabled(False)
self.switch_button.setEnabled(False)
return
# Update details
status = "Installed" if soundpack.installed else "Not installed"
self.details_label.setText(f"Name: {soundpack.name}\nStatus: {status}\nRepository: {soundpack.repository_url}")
self.description_text.setText(soundpack.description)
# Update buttons
self.install_button.setEnabled(not soundpack.installed)
self.switch_button.setEnabled(soundpack.installed)
def install_selected(self):
"""Install the selected soundpack"""
current = self.soundpack_list.currentItem()
if not current:
return
soundpack = current.data(Qt.UserRole)
# Confirm installation
reply = QMessageBox.question(
self,
"Install Soundpack",
f"Install soundpack '{soundpack.name}'?\n\nThis will download and extract the soundpack to your soundpacks directory.",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes
)
if reply != QMessageBox.Yes:
return
# Start installation
self.install_button.setEnabled(False)
self.install_button.setText("Installing...")
self.current_thread = SoundpackOperationThread('install', self.manager, soundpack)
self.current_thread.operation_complete.connect(self.on_install_complete)
self.current_thread.progress_update.connect(self.on_progress_update)
self.current_thread.start()
# Show progress dialog
self.progress_dialog = QProgressDialog("Installing soundpack...", "Cancel", 0, 0, self)
self.progress_dialog.setWindowModality(Qt.WindowModal)
self.progress_dialog.show()
def on_install_complete(self, success: bool, message: str):
"""Handle installation completion"""
self.install_button.setEnabled(True)
self.install_button.setText("&Install")
if hasattr(self, 'progress_dialog'):
self.progress_dialog.close()
if success:
QMessageBox.information(self, "Installation Successful", message)
# Ask if user wants to switch to the new soundpack
reply = QMessageBox.question(
self,
"Switch Soundpack",
"Soundpack installed successfully!\n\nWould you like to switch to this soundpack now?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes
)
if reply == QMessageBox.Yes:
current = self.soundpack_list.currentItem()
if current:
soundpack = current.data(Qt.UserRole)
self.manager.switch_soundpack(soundpack.name)
self.update_current_soundpack()
# Refresh lists
self.refresh_soundpacks()
self.load_installed_soundpacks()
else:
QMessageBox.critical(self, "Installation Failed", f"Failed to install soundpack: {message}")
def on_progress_update(self, message: str):
"""Handle progress updates"""
if hasattr(self, 'progress_dialog'):
self.progress_dialog.setLabelText(message)
def switch_to_selected(self):
"""Switch to the selected soundpack"""
current = self.soundpack_list.currentItem()
if not current:
return
soundpack = current.data(Qt.UserRole)
success, message = self.manager.switch_soundpack(soundpack.name)
if success:
QMessageBox.information(self, "Soundpack Switched", message)
self.update_current_soundpack()
self.load_installed_soundpacks()
else:
QMessageBox.critical(self, "Switch Failed", f"Failed to switch soundpack: {message}")
def add_repository(self):
"""Add a new repository"""
dialog = AddRepositoryDialog(self.manager, self)
if dialog.exec() == QDialog.Accepted:
self.load_repositories()
def remove_repository(self):
"""Remove selected repository"""
current = self.repo_list.currentItem()
if not current:
return
repo = current.data(Qt.UserRole)
if not repo: # Skip header items
return
reply = QMessageBox.question(
self,
"Remove Repository",
f"Remove repository '{repo.description}'?\n\nThis will not affect installed soundpacks.",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.Yes:
self.manager.remove_repository(repo.url)
self.load_repositories()
def on_repository_selected(self):
"""Handle repository selection"""
current = self.repo_list.currentItem()
# Enable remove button only if it's not the header item and has repo data
has_repo_data = current and current.data(Qt.UserRole) is not None
self.remove_repo_button.setEnabled(has_repo_data)
def on_installed_selected(self):
"""Handle installed soundpack selection"""
current = self.installed_list.currentItem()
if current:
pack_name = current.data(Qt.UserRole)
current_pack = self.settings.get('audio', 'sound_pack', 'default')
self.switch_installed_button.setEnabled(pack_name != current_pack)
self.uninstall_button.setEnabled(pack_name != 'default') # Can't uninstall default
else:
self.switch_installed_button.setEnabled(False)
self.uninstall_button.setEnabled(False)
def switch_to_installed(self):
"""Switch to selected installed soundpack"""
current = self.installed_list.currentItem()
if not current:
return
pack_name = current.data(Qt.UserRole)
success, message = self.manager.switch_soundpack(pack_name)
if success:
QMessageBox.information(self, "Soundpack Switched", message)
self.update_current_soundpack()
self.load_installed_soundpacks()
else:
QMessageBox.critical(self, "Switch Failed", f"Failed to switch soundpack: {message}")
def uninstall_selected(self):
"""Uninstall selected soundpack"""
current = self.installed_list.currentItem()
if not current:
return
pack_name = current.data(Qt.UserRole)
reply = QMessageBox.question(
self,
"Uninstall Soundpack",
f"Uninstall soundpack '{pack_name}'?\n\nThis will permanently remove the soundpack files.",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.Yes:
success, message = self.manager.uninstall_soundpack(pack_name)
if success:
QMessageBox.information(self, "Uninstall Successful", message)
self.load_installed_soundpacks()
self.update_current_soundpack()
self.refresh_soundpacks() # Update available list
else:
QMessageBox.critical(self, "Uninstall Failed", f"Failed to uninstall soundpack: {message}")
class AddRepositoryDialog(QDialog):
"""Dialog for adding a new repository"""
def __init__(self, manager: SoundpackManager, parent=None):
super().__init__(parent)
self.manager = manager
self.setWindowTitle("Add Repository")
self.setModal(True)
self.resize(500, 200)
self.setup_ui()
def setup_ui(self):
"""Setup the UI"""
layout = QVBoxLayout(self)
# Form layout
form_layout = QFormLayout()
self.url_edit = QLineEdit()
self.url_edit.setAccessibleName("Repository URL")
self.url_edit.setPlaceholderText("https://example.com/soundpacks")
self.url_edit.textChanged.connect(self.validate_form)
form_layout.addRow("&URL:", self.url_edit)
self.description_edit = QLineEdit()
self.description_edit.setAccessibleName("Repository description")
self.description_edit.setPlaceholderText("Description of this repository")
self.description_edit.textChanged.connect(self.validate_form)
form_layout.addRow("&Description:", self.description_edit)
layout.addLayout(form_layout)
# Validation button
self.validate_button = QPushButton("&Validate Repository")
self.validate_button.setAccessibleName("Validate repository URL")
self.validate_button.clicked.connect(self.validate_repository)
self.validate_button.setEnabled(False)
layout.addWidget(self.validate_button)
# Status label
self.status_label = QLabel("")
self.status_label.setAccessibleName("Validation status")
self.status_label.setWordWrap(True)
layout.addWidget(self.status_label)
# Button box
self.button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
self.button_box.accepted.connect(self.accept_repository)
self.button_box.rejected.connect(self.reject)
self.button_box.button(QDialogButtonBox.Ok).setEnabled(False)
layout.addWidget(self.button_box)
def validate_form(self):
"""Validate form inputs"""
url = self.url_edit.text().strip()
description = self.description_edit.text().strip()
has_content = bool(url and description)
self.validate_button.setEnabled(has_content)
if has_content:
# Basic URL validation
valid_format, message = self.manager.validate_url(url)
if not valid_format:
self.status_label.setText(f"URL Error: {message}")
self.status_label.setStyleSheet("color: red;")
else:
self.status_label.setText("Click 'Validate Repository' to check if this URL contains soundpacks")
self.status_label.setStyleSheet("color: blue;")
else:
self.status_label.clear()
def validate_repository(self):
"""Validate the repository URL"""
url = self.url_edit.text().strip()
self.validate_button.setEnabled(False)
self.validate_button.setText("Validating...")
self.status_label.setText("Checking repository...")
self.status_label.setStyleSheet("color: blue;")
# Start validation in background
self.validation_thread = SoundpackOperationThread('validate_repo', self.manager, url)
self.validation_thread.operation_complete.connect(self.on_validation_complete)
self.validation_thread.start()
def on_validation_complete(self, success: bool, message: str):
"""Handle validation completion"""
self.validate_button.setEnabled(True)
self.validate_button.setText("&Validate Repository")
if success:
self.status_label.setText(f"{message}")
self.status_label.setStyleSheet("color: green;")
self.button_box.button(QDialogButtonBox.Ok).setEnabled(True)
else:
self.status_label.setText(f"{message}")
self.status_label.setStyleSheet("color: red;")
self.button_box.button(QDialogButtonBox.Ok).setEnabled(False)
def accept_repository(self):
"""Add the repository"""
url = self.url_edit.text().strip()
description = self.description_edit.text().strip()
success, message = self.manager.add_repository(url, description)
if success:
self.accept()
else:
QMessageBox.critical(self, "Add Repository Failed", f"Failed to add repository: {message}")

View File

@ -37,6 +37,8 @@ class TimelineView(AccessibleTreeWidget):
self.activitypub_client = None self.activitypub_client = None
self.posts = [] # Store loaded posts self.posts = [] # Store loaded posts
self.oldest_post_id = None # Track for pagination self.oldest_post_id = None # Track for pagination
self.newest_post_id = None # Track newest post seen for new content detection
self.skip_notifications = True # Skip notifications on initial loads
self.setup_ui() self.setup_ui()
self.refresh() self.refresh()
@ -70,7 +72,16 @@ class TimelineView(AccessibleTreeWidget):
def set_timeline_type(self, timeline_type: str): def set_timeline_type(self, timeline_type: str):
"""Set the timeline type (home, local, federated)""" """Set the timeline type (home, local, federated)"""
self.timeline_type = timeline_type self.timeline_type = timeline_type
# Disable notifications when switching timelines since user is actively viewing new content
self.skip_notifications = True
self.refresh() self.refresh()
# Re-enable notifications after a brief delay (user has had time to see the new timeline)
from PySide6.QtCore import QTimer
QTimer.singleShot(3000, self.enable_notifications) # 3 seconds
def enable_notifications(self):
"""Enable desktop notifications for timeline updates"""
self.skip_notifications = False
def refresh(self): def refresh(self):
"""Refresh the timeline content""" """Refresh the timeline content"""
@ -99,9 +110,13 @@ class TimelineView(AccessibleTreeWidget):
timeline_data = self.activitypub_client.get_timeline(self.timeline_type, limit=posts_per_page) timeline_data = self.activitypub_client.get_timeline(self.timeline_type, limit=posts_per_page)
self.load_timeline_data(timeline_data) self.load_timeline_data(timeline_data)
# Track oldest post for pagination # Track oldest and newest post IDs
if timeline_data: if timeline_data:
self.oldest_post_id = timeline_data[-1]['id'] self.oldest_post_id = timeline_data[-1]['id']
# Track newest post ID for new content detection
if not self.newest_post_id:
# First load - set newest to first post
self.newest_post_id = timeline_data[0]['id']
except Exception as e: except Exception as e:
print(f"Failed to fetch timeline: {e}") print(f"Failed to fetch timeline: {e}")
# Show error message instead of sample data # Show error message instead of sample data
@ -109,6 +124,15 @@ class TimelineView(AccessibleTreeWidget):
def load_timeline_data(self, timeline_data): def load_timeline_data(self, timeline_data):
"""Load real timeline data from ActivityPub API""" """Load real timeline data from ActivityPub API"""
# Check for new content by comparing newest post ID
has_new_content = False
if timeline_data and self.newest_post_id:
# Check if the first post (newest) is different from what we had
current_newest_id = timeline_data[0]['id']
if current_newest_id != self.newest_post_id:
has_new_content = True
self.newest_post_id = current_newest_id
self.posts = [] self.posts = []
if self.timeline_type == "notifications": if self.timeline_type == "notifications":
@ -126,18 +150,20 @@ class TimelineView(AccessibleTreeWidget):
post.notification_account = notification_data['account']['acct'] post.notification_account = notification_data['account']['acct']
self.posts.append(post) self.posts.append(post)
# Show desktop notification # Show desktop notification (skip if this is initial load)
content_preview = post.get_content_text()[:100] + "..." if len(post.get_content_text()) > 100 else post.get_content_text() if not self.skip_notifications:
content_preview = post.get_content_text()[:100] + "..." if len(post.get_content_text()) > 100 else post.get_content_text()
if notification_type == 'mention': if notification_type == 'mention':
self.notification_manager.notify_mention(sender, content_preview) self.notification_manager.notify_mention(sender, content_preview)
elif notification_type == 'reblog': elif notification_type == 'reblog':
self.notification_manager.notify_boost(sender, content_preview) self.notification_manager.notify_boost(sender, content_preview)
elif notification_type == 'favourite': elif notification_type == 'favourite':
self.notification_manager.notify_favorite(sender, content_preview) self.notification_manager.notify_favorite(sender, content_preview)
elif notification_type == 'follow': elif notification_type == 'follow':
# Handle follow notifications without status # Handle follow notifications without status (skip if initial load)
self.notification_manager.notify_follow(sender) if not self.skip_notifications:
self.notification_manager.notify_follow(sender)
except Exception as e: except Exception as e:
print(f"Error parsing notification: {e}") print(f"Error parsing notification: {e}")
continue continue
@ -153,15 +179,16 @@ class TimelineView(AccessibleTreeWidget):
print(f"Error parsing post: {e}") print(f"Error parsing post: {e}")
continue continue
# Show timeline update notification if new posts were loaded # Show timeline update notification if new content detected (skip if initial load)
if new_posts and len(new_posts) > 0: if has_new_content and not self.skip_notifications:
timeline_name = { timeline_name = {
'home': 'home timeline', 'home': 'home timeline',
'local': 'local timeline', 'local': 'local timeline',
'federated': 'federated timeline' 'federated': 'federated timeline'
}.get(self.timeline_type, 'timeline') }.get(self.timeline_type, 'timeline')
self.notification_manager.notify_timeline_update(len(new_posts), timeline_name) # Use generic "new content" message instead of counting posts
self.notification_manager.notify_new_content(timeline_name)
# Build thread structure # Build thread structure
self.build_threaded_timeline() self.build_threaded_timeline()
@ -400,6 +427,9 @@ class TimelineView(AccessibleTreeWidget):
) )
if more_data: if more_data:
# Remember current "Load more" item position
load_more_index = self.get_load_more_item_index()
# Remove current "Load more" item # Remove current "Load more" item
self.remove_load_more_item() self.remove_load_more_item()
@ -409,16 +439,37 @@ class TimelineView(AccessibleTreeWidget):
# Update oldest post ID # Update oldest post ID
self.oldest_post_id = more_data[-1]['id'] self.oldest_post_id = more_data[-1]['id']
# Rebuild timeline with all posts # Add new posts to the tree without rebuilding everything
self.build_threaded_timeline() self.add_new_posts_to_tree(more_data, load_more_index)
self.status_bar.showMessage(f"Loaded {len(more_data)} more posts", 2000) # Add new "Load more" item at the end
self.add_load_more_item()
# Focus on the first newly added post so user can arrow down to read them
if load_more_index is not None and load_more_index < self.topLevelItemCount():
first_new_item = self.topLevelItem(load_more_index)
if first_new_item:
self.setCurrentItem(first_new_item)
self.scrollToItem(first_new_item)
if hasattr(self, 'parent') and hasattr(self.parent(), 'status_bar'):
self.parent().status_bar.showMessage(f"Loaded {len(more_data)} more posts", 2000)
else: else:
self.status_bar.showMessage("No more posts to load", 2000) if hasattr(self, 'parent') and hasattr(self.parent(), 'status_bar'):
self.parent().status_bar.showMessage("No more posts to load", 2000)
except Exception as e: except Exception as e:
print(f"Failed to load more posts: {e}") print(f"Failed to load more posts: {e}")
self.status_bar.showMessage(f"Failed to load more posts: {str(e)}", 3000) if hasattr(self, 'parent') and hasattr(self.parent(), 'status_bar'):
self.parent().status_bar.showMessage(f"Failed to load more posts: {str(e)}", 3000)
def get_load_more_item_index(self):
"""Get the index of the 'Load more posts' item"""
for i in range(self.topLevelItemCount()):
item = self.topLevelItem(i)
if item.data(0, Qt.UserRole) == "load_more":
return i
return None
def remove_load_more_item(self): def remove_load_more_item(self):
"""Remove the 'Load more posts' item""" """Remove the 'Load more posts' item"""
@ -459,6 +510,79 @@ class TimelineView(AccessibleTreeWidget):
print(f"Error parsing post: {e}") print(f"Error parsing post: {e}")
continue continue
def add_new_posts_to_tree(self, timeline_data, insert_index):
"""Add new posts to the tree at the specified index without rebuilding everything"""
if insert_index is None:
insert_index = self.topLevelItemCount()
new_posts = []
# Parse the new posts
if self.timeline_type == "notifications":
for notification_data in timeline_data:
try:
notification_type = notification_data['type']
if 'status' in notification_data:
post = Post.from_api_dict(notification_data['status'])
post.notification_type = notification_type
post.notification_account = notification_data['account']['acct']
new_posts.append(post)
except Exception as e:
print(f"Error parsing notification: {e}")
continue
else:
for status_data in timeline_data:
try:
post = Post.from_api_dict(status_data)
new_posts.append(post)
except Exception as e:
print(f"Error parsing post: {e}")
continue
# Group new posts by thread and insert them
thread_roots = {}
orphaned_posts = []
# Find thread roots among new posts
for post in new_posts:
if not post.in_reply_to_id:
thread_roots[post.id] = [post]
# Assign replies to thread roots
for post in new_posts:
if post.in_reply_to_id:
root_id = self.find_thread_root(post, new_posts)
if root_id and root_id in thread_roots:
thread_roots[root_id].append(post)
else:
orphaned_posts.append(post)
# Insert thread root posts with their replies
current_insert_index = insert_index
for root_id, thread_posts in thread_roots.items():
root_post = thread_posts[0]
root_item = self.create_post_item(root_post)
self.insertTopLevelItem(current_insert_index, root_item)
# Add replies as children
for post in thread_posts[1:]:
reply_item = self.create_post_item(post)
reply_item.setData(0, Qt.UserRole + 1, post.in_reply_to_id)
root_item.addChild(reply_item)
# Collapse the thread initially
root_item.setExpanded(False)
if root_item.childCount() > 0:
self.update_child_accessibility(root_item, False)
current_insert_index += 1
# Insert orphaned posts
for post in orphaned_posts:
orphaned_item = self.create_post_item(post)
self.insertTopLevelItem(current_insert_index, orphaned_item)
current_insert_index += 1
def show_context_menu(self, position): def show_context_menu(self, position):
"""Show context menu for the selected post""" """Show context menu for the selected post"""
item = self.itemAt(position) item = self.itemAt(position)