Files
bifrost/src/main_window.py
Storm Dragon 0a217c62ba Add comprehensive feature updates based on user feedback
- Implement complete list management system with CRUD operations
- Add dynamic character count display with instance limits
- Include post visibility information in timeline display
- Show user interaction status (favorited/boosted/bookmarked)
- Extend sound system with social action events
- Add list timeline integration and management interface
- Update documentation with all new features

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-17 18:14:17 -04:00

1618 lines
64 KiB
Python

"""
Main application window for Bifrost
"""
from PySide6.QtWidgets import (
QMainWindow,
QWidget,
QVBoxLayout,
QHBoxLayout,
QLabel,
QMenuBar,
QStatusBar,
QPushButton,
QTabWidget,
)
from PySide6.QtCore import Qt, Signal, QTimer
from PySide6.QtGui import QKeySequence, QAction, QTextCursor
import time
import logging
from config.settings import SettingsManager
from config.accounts import AccountManager
from widgets.timeline_view import TimelineView
from widgets.compose_dialog import ComposeDialog
from widgets.login_dialog import LoginDialog
from widgets.account_selector import AccountSelector
from widgets.settings_dialog import SettingsDialog
from widgets.soundpack_manager_dialog import SoundpackManagerDialog
from widgets.profile_dialog import ProfileDialog
from widgets.profile_edit_dialog import ProfileEditDialog
from widgets.accessible_text_dialog import AccessibleTextDialog
from widgets.search_dialog import SearchDialog
from widgets.timeline_filter_dialog import TimelineFilterDialog
from widgets.list_manager_dialog import ListManagerDialog
from managers.post_manager import PostManager
from managers.post_actions_manager import PostActionsManager
from managers.sound_coordinator import SoundCoordinator
from managers.error_manager import ErrorManager
class MainWindow(QMainWindow):
"""Main Bifrost application window"""
def __init__(self):
super().__init__()
self.settings = SettingsManager()
self.account_manager = AccountManager(self.settings)
self.logger = logging.getLogger("bifrost.main")
# List timeline tracking
self.current_list_id = None
self.current_list_title = None
# Auto-refresh tracking
self.last_activity_time = time.time()
self.is_initial_load = True # Flag to skip notifications on first load
# Refresh mode logging state tracking
self._last_logged_refresh_interval = None
self._last_logged_stream_mode = None
self.setup_ui()
# Initialize centralized managers after timeline is created
timeline_sound_manager = getattr(self.timeline, "sound_manager", None)
# Sound coordination - single point of truth for all audio events
self.sound_coordinator = (
SoundCoordinator(timeline_sound_manager) if timeline_sound_manager else None
)
# Error management - single point of truth for error handling
self.error_manager = ErrorManager(self, self.sound_coordinator)
# Post management using sound coordinator
self.post_manager = PostManager(self.account_manager, timeline_sound_manager)
self.post_manager.post_success.connect(self.on_post_success)
self.post_manager.post_failed.connect(self.on_post_failed)
# Post actions management - single point of truth for boost/favorite/etc
self.post_actions_manager = PostActionsManager(self.account_manager, timeline_sound_manager)
self.post_actions_manager.action_success.connect(self.on_action_success)
self.post_actions_manager.action_failed.connect(self.on_action_failed)
self.post_actions_manager.refresh_requested.connect(self.on_action_refresh_requested)
self.setup_menus()
self.setup_shortcuts()
self.setup_auto_refresh()
# Connect status bar to error manager after both are created
self.error_manager.set_status_bar(self.status_bar)
# Check if we need to show login dialog
if not self.account_manager.has_accounts():
self.show_first_time_setup()
# Play startup sound through coordinator to prevent duplicates
if self.sound_coordinator:
self.sound_coordinator.play_startup("application_startup")
# Mark initial load as complete after startup
QTimer.singleShot(2000, self.mark_initial_load_complete)
def setup_ui(self):
"""Initialize the user interface"""
self.setWindowTitle("Bifrost - Fediverse Client")
self.setMinimumSize(800, 600)
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# Account selector
self.account_selector = AccountSelector(self.account_manager)
self.account_selector.account_changed.connect(self.on_account_changed)
self.account_selector.add_account_requested.connect(self.show_login_dialog)
main_layout.addWidget(self.account_selector)
# Timeline tabs
self.timeline_tabs = QTabWidget()
self.timeline_tabs.setAccessibleName("Timeline Selection")
self.timeline_tabs.addTab(QWidget(), "Home")
self.timeline_tabs.addTab(QWidget(), "Messages")
self.timeline_tabs.addTab(QWidget(), "Favorites")
self.timeline_tabs.addTab(QWidget(), "Notifications")
self.timeline_tabs.addTab(QWidget(), "Local")
self.timeline_tabs.addTab(QWidget(), "Federated")
self.timeline_tabs.addTab(QWidget(), "Bookmarks")
self.timeline_tabs.addTab(QWidget(), "Followers")
self.timeline_tabs.addTab(QWidget(), "Following")
self.timeline_tabs.addTab(QWidget(), "Blocked")
self.timeline_tabs.addTab(QWidget(), "Muted")
self.timeline_tabs.currentChanged.connect(self.on_timeline_tab_changed)
main_layout.addWidget(self.timeline_tabs)
# Status label for connection info
self.status_label = QLabel()
self.status_label.setAccessibleName("Connection Status")
main_layout.addWidget(self.status_label)
self.update_status_label()
# Timeline view (main content area)
self.timeline = TimelineView(self.account_manager)
self.timeline.setAccessibleName("Timeline")
self.timeline.reply_requested.connect(self.reply_to_post)
self.timeline.boost_requested.connect(self.boost_post)
self.timeline.favorite_requested.connect(self.favorite_post)
self.timeline.profile_requested.connect(self.view_profile)
self.timeline.delete_requested.connect(self.delete_post)
self.timeline.edit_requested.connect(self.edit_post)
self.timeline.follow_requested.connect(self.follow_user)
self.timeline.unfollow_requested.connect(self.unfollow_user)
main_layout.addWidget(self.timeline)
# Compose button
compose_layout = QHBoxLayout()
self.compose_button = QPushButton("&Compose Post")
self.compose_button.setAccessibleName("Compose New Post")
self.compose_button.clicked.connect(self.show_compose_dialog)
compose_layout.addWidget(self.compose_button)
compose_layout.addStretch()
main_layout.addLayout(compose_layout)
# Status bar
self.status_bar = QStatusBar()
self.setStatusBar(self.status_bar)
self.status_bar.showMessage("Ready")
def setup_menus(self):
"""Create application menus"""
menubar = self.menuBar()
# File menu
file_menu = menubar.addMenu("&File")
# New post action
new_post_action = QAction("&New Post", self)
new_post_action.setShortcut(QKeySequence.New)
new_post_action.triggered.connect(self.show_compose_dialog)
file_menu.addAction(new_post_action)
file_menu.addSeparator()
# Account management
add_account_action = QAction("&Add Account", self)
add_account_action.setShortcut(QKeySequence("Ctrl+Shift+A"))
add_account_action.triggered.connect(self.show_login_dialog)
file_menu.addAction(add_account_action)
# Edit Profile action
edit_profile_action = QAction("&Edit Profile", self)
edit_profile_action.setShortcut(QKeySequence("Ctrl+Alt+E"))
edit_profile_action.triggered.connect(self.show_edit_profile)
file_menu.addAction(edit_profile_action)
file_menu.addSeparator()
# Settings action
settings_action = QAction("&Settings", self)
settings_action.setShortcut(QKeySequence.Preferences)
settings_action.triggered.connect(self.show_settings)
file_menu.addAction(settings_action)
# Soundpack Manager action
soundpack_action = QAction("Sound&pack Manager", self)
soundpack_action.setShortcut(QKeySequence("Ctrl+Shift+P"))
soundpack_action.triggered.connect(self.show_soundpack_manager)
file_menu.addAction(soundpack_action)
file_menu.addSeparator()
# Quit action
quit_action = QAction("&Quit", self)
quit_action.setShortcut(QKeySequence.Quit)
quit_action.triggered.connect(self.quit_application)
file_menu.addAction(quit_action)
# View menu
view_menu = menubar.addMenu("&View")
# Search action
search_action = QAction("&Search", self)
search_action.setShortcut(QKeySequence("Ctrl+S"))
search_action.triggered.connect(self.open_search_dialog)
view_menu.addAction(search_action)
view_menu.addSeparator()
# Timeline filters action
filter_action = QAction("Timeline &Filters", self)
filter_action.setShortcut(QKeySequence("Ctrl+Shift+F"))
filter_action.triggered.connect(self.open_timeline_filter_dialog)
view_menu.addAction(filter_action)
view_menu.addSeparator()
# Refresh timeline action
refresh_action = QAction("&Refresh Timeline", self)
refresh_action.setShortcut(QKeySequence.Refresh)
refresh_action.triggered.connect(self.refresh_timeline)
view_menu.addAction(refresh_action)
# Timeline menu
timeline_menu = menubar.addMenu("&Timeline")
# Home timeline action
home_action = QAction("&Home", self)
home_action.setShortcut(QKeySequence("Ctrl+1"))
home_action.triggered.connect(lambda: self.switch_timeline(0))
timeline_menu.addAction(home_action)
# Messages timeline action
messages_action = QAction("&Messages", self)
messages_action.setShortcut(QKeySequence("Ctrl+2"))
messages_action.triggered.connect(lambda: self.switch_timeline(1))
timeline_menu.addAction(messages_action)
# Notifications timeline action
notifications_action = QAction("&Notifications", self)
notifications_action.setShortcut(QKeySequence("Ctrl+3"))
notifications_action.triggered.connect(lambda: self.switch_timeline(2))
timeline_menu.addAction(notifications_action)
# Local timeline action
local_action = QAction("&Local", self)
local_action.setShortcut(QKeySequence("Ctrl+4"))
local_action.triggered.connect(lambda: self.switch_timeline(3))
timeline_menu.addAction(local_action)
# Federated timeline action
federated_action = QAction("&Federated", self)
federated_action.setShortcut(QKeySequence("Ctrl+5"))
federated_action.triggered.connect(lambda: self.switch_timeline(4))
timeline_menu.addAction(federated_action)
# Bookmarks timeline action
bookmarks_action = QAction("&Bookmarks", self)
bookmarks_action.setShortcut(QKeySequence("Ctrl+6"))
bookmarks_action.triggered.connect(lambda: self.switch_timeline(5))
timeline_menu.addAction(bookmarks_action)
# Followers timeline action
followers_action = QAction("Follo&wers", self)
followers_action.setShortcut(QKeySequence("Ctrl+7"))
followers_action.triggered.connect(lambda: self.switch_timeline(6))
timeline_menu.addAction(followers_action)
# Following timeline action
following_action = QAction("Follo&wing", self)
following_action.setShortcut(QKeySequence("Ctrl+8"))
following_action.triggered.connect(lambda: self.switch_timeline(7))
timeline_menu.addAction(following_action)
# Blocked users timeline action
blocked_action = QAction("Bloc&ked Users", self)
blocked_action.setShortcut(QKeySequence("Ctrl+9"))
blocked_action.triggered.connect(lambda: self.switch_timeline(8))
timeline_menu.addAction(blocked_action)
# Muted users timeline action
muted_action = QAction("M&uted Users", self)
muted_action.setShortcut(QKeySequence("Ctrl+0"))
muted_action.triggered.connect(lambda: self.switch_timeline(9))
timeline_menu.addAction(muted_action)
timeline_menu.addSeparator()
# Lists submenu
self.lists_menu = timeline_menu.addMenu("&Lists")
self.lists_menu.aboutToShow.connect(self.refresh_lists_menu)
# Post menu
post_menu = menubar.addMenu("&Post")
# Reply action
reply_action = QAction("&Reply", self)
reply_action.setShortcut(QKeySequence("Ctrl+R"))
reply_action.triggered.connect(self.reply_to_current_post)
post_menu.addAction(reply_action)
# Boost action
boost_action = QAction("&Boost", self)
boost_action.setShortcut(QKeySequence("Ctrl+B"))
boost_action.triggered.connect(self.boost_current_post)
post_menu.addAction(boost_action)
# Favorite action
favorite_action = QAction("&Favorite", self)
favorite_action.setShortcut(QKeySequence("Ctrl+F"))
favorite_action.triggered.connect(self.favorite_current_post)
post_menu.addAction(favorite_action)
post_menu.addSeparator()
# Copy action
copy_action = QAction("&Copy to Clipboard", self)
copy_action.setShortcut(QKeySequence("Ctrl+C"))
copy_action.triggered.connect(self.copy_current_post)
post_menu.addAction(copy_action)
# Open URLs action
urls_action = QAction("Open &URLs in Browser", self)
urls_action.setShortcut(QKeySequence("Ctrl+U"))
urls_action.triggered.connect(self.open_current_post_urls)
post_menu.addAction(urls_action)
post_menu.addSeparator()
# Edit action (for owned posts)
edit_action = QAction("&Edit Post", self)
edit_action.setShortcut(QKeySequence("Ctrl+Shift+E"))
edit_action.triggered.connect(self.edit_current_post)
post_menu.addAction(edit_action)
# Delete action (for owned posts)
delete_action = QAction("&Delete Post", self)
delete_action.setShortcut(QKeySequence("Shift+Delete"))
delete_action.triggered.connect(self.delete_current_post)
post_menu.addAction(delete_action)
# Social menu
social_menu = menubar.addMenu("&Social")
# Follow action
follow_action = QAction("&Follow User", self)
follow_action.setShortcut(QKeySequence("Ctrl+Shift+F"))
follow_action.triggered.connect(self.follow_current_user)
social_menu.addAction(follow_action)
# Unfollow action
unfollow_action = QAction("&Unfollow User", self)
unfollow_action.setShortcut(QKeySequence("Ctrl+Shift+U"))
unfollow_action.triggered.connect(self.unfollow_current_user)
social_menu.addAction(unfollow_action)
social_menu.addSeparator()
# Block action
block_action = QAction("&Block User", self)
block_action.setShortcut(QKeySequence("Ctrl+Shift+B"))
block_action.triggered.connect(self.block_current_user)
social_menu.addAction(block_action)
# Mute action
mute_action = QAction("&Mute User", self)
mute_action.setShortcut(QKeySequence("Ctrl+Shift+M"))
mute_action.triggered.connect(self.mute_current_user)
social_menu.addAction(mute_action)
social_menu.addSeparator()
# Lists management action
lists_action = QAction("Manage &Lists...", self)
lists_action.setShortcut(QKeySequence("Ctrl+L"))
lists_action.triggered.connect(self.open_list_manager)
social_menu.addAction(lists_action)
# Manual follow action
manual_follow_action = QAction("Follow &Specific User...", self)
manual_follow_action.setShortcut(QKeySequence("Ctrl+Shift+M"))
manual_follow_action.triggered.connect(self.show_manual_follow_dialog)
social_menu.addAction(manual_follow_action)
def setup_shortcuts(self):
"""Set up keyboard shortcuts"""
# Additional shortcuts that don't need menu items
pass
def setup_auto_refresh(self):
"""Set up auto-refresh timer and streaming"""
# Create auto-refresh timer
self.auto_refresh_timer = QTimer()
self.auto_refresh_timer.timeout.connect(self.check_auto_refresh)
# Check every 30 seconds if we should refresh
self.auto_refresh_timer.start(30000) # 30 seconds
# Initialize streaming mode state
self.streaming_mode = False
self.streaming_client = None
# Check if we should start in streaming mode
self.logger.debug("Initializing refresh mode")
self.update_refresh_mode()
self.logger.debug("Refresh mode initialization complete")
def mark_initial_load_complete(self):
"""Mark that initial loading is complete"""
self.is_initial_load = False
# Enable notifications on the timeline
if hasattr(self.timeline, "enable_notifications"):
self.timeline.enable_notifications()
def keyPressEvent(self, event):
"""Track keyboard activity for auto-refresh"""
self.last_activity_time = time.time()
super().keyPressEvent(event)
def check_auto_refresh(self):
"""Check if we should auto-refresh the timeline or manage streaming"""
# Check if refresh mode has changed (settings updated)
self.update_refresh_mode()
# Skip if auto-refresh is disabled
if not self.settings.get_bool("general", "auto_refresh_enabled", True):
return
# Skip if no account is active
if not self.account_manager.get_active_account():
return
# If we're in streaming mode, no periodic refresh needed
if self.streaming_mode:
return
# Get refresh interval from settings
refresh_interval = self.settings.get_int(
"general", "timeline_refresh_interval", 300
)
# Skip if streaming mode (interval = 0)
if refresh_interval == 0:
return
# Check if enough time has passed since last activity
time_since_activity = time.time() - self.last_activity_time
required_idle_time = refresh_interval + 10 # refresh_rate + 10 seconds
self.logger.debug(
f"Auto-refresh check: {time_since_activity:.1f}s since activity, need {required_idle_time}s idle"
)
if time_since_activity >= required_idle_time:
self.logger.debug("Auto-refresh condition met, triggering refresh")
self.auto_refresh_timeline()
else:
self.logger.debug(
f"Auto-refresh skipped: need {required_idle_time - time_since_activity:.1f}s more idle time"
)
def auto_refresh_timeline(self):
"""Automatically refresh the timeline - DELEGATED TO TIMELINE"""
self.logger.debug("auto_refresh_timeline() called")
# Check if timeline can safely auto-refresh
if not self.timeline.can_auto_refresh():
self.logger.debug("Timeline cannot auto-refresh, skipping")
return
self.logger.debug("Timeline can auto-refresh, calling request_auto_refresh()")
# Use centralized refresh method
success = self.timeline.request_auto_refresh()
self.logger.debug(f"request_auto_refresh() returned: {success}")
if success:
# Reset activity timer to prevent immediate re-refresh
self.last_activity_time = time.time()
self.logger.debug("Auto-refresh completed, activity timer reset")
def update_refresh_mode(self):
"""Update refresh mode based on settings (0 = streaming, >0 = polling)"""
# Prevent infinite recursion
if hasattr(self, "_updating_refresh_mode") and self._updating_refresh_mode:
return
self._updating_refresh_mode = True
try:
refresh_interval = self.settings.get_int(
"general", "timeline_refresh_interval", 300
)
should_stream = refresh_interval == 0
# Check if server supports streaming
if should_stream:
active_account = self.account_manager.get_active_account()
if active_account:
# Disable streaming for known non-supporting servers
server_supports_streaming = self.check_server_streaming_support(
active_account.instance_url
)
if not server_supports_streaming:
self.logger.info(
"Server does not support streaming, switching to polling"
)
should_stream = False
# Set a reasonable polling interval instead
if refresh_interval == 0:
self.logger.debug(
"Using 2-minute polling instead of streaming"
)
# Don't save this change to settings, just use it temporarily
refresh_interval = 120 # 2 minutes
# Only log refresh interval when it changes
if (
refresh_interval != self._last_logged_refresh_interval
or should_stream != self._last_logged_stream_mode
):
self.logger.debug(
f"Refresh interval = {refresh_interval} seconds, should_stream = {should_stream}"
)
self._last_logged_refresh_interval = refresh_interval
self._last_logged_stream_mode = should_stream
# Check if mode changed
if should_stream != self.streaming_mode:
if should_stream:
self.start_streaming_mode()
else:
self.stop_streaming_mode()
finally:
self._updating_refresh_mode = False
def check_server_streaming_support(self, instance_url: str) -> bool:
"""Check if the server supports real-time streaming APIs"""
try:
# Quick URL-based checks for known non-streaming servers
url_lower = instance_url.lower()
if "gotosocial" in url_lower:
self.logger.debug("GoToSocial detected in URL - no streaming support")
return False
# Check if we've already determined this server doesn't support streaming
active_account = self.account_manager.get_active_account()
if active_account:
client = self.account_manager.get_client_for_active_account()
if (
client
and hasattr(client, "streaming_supported")
and not client.streaming_supported
):
self.logger.debug("Server previously failed streaming attempts")
return False
# Check instance info via API to detect server software
if client:
try:
instance_info = client.get_instance_info()
version = instance_info.get("version", "").lower()
if "gotosocial" in version:
self.logger.debug(f"GoToSocial detected via API: {version}")
return False
except Exception as e:
self.logger.warning(f"Could not fetch instance info: {e}")
# Default: assume streaming is supported (Mastodon, Pleroma, etc.)
return True
except Exception as e:
self.logger.warning(f"Could not detect server streaming support: {e}")
# Default to no streaming if we can't determine
return False
def start_streaming_mode(self):
"""Start real-time streaming mode"""
self.logger.debug("start_streaming_mode() called")
active_account = self.account_manager.get_active_account()
if not active_account:
self.logger.warning("No active account, cannot start streaming")
return
self.logger.debug(
f"Active account: {active_account.username}@{active_account.instance_url}"
)
try:
# Stop any existing streaming
self.stop_streaming_mode()
# Create streaming client if needed
if (
not self.streaming_client
or self.streaming_client.instance_url != active_account.instance_url
):
self.streaming_client = (
self.account_manager.get_client_for_active_account()
)
if not self.streaming_client:
return
# Start streaming for current timeline type
timeline_type = self.timeline.timeline_type
if timeline_type in ["home", "local", "federated", "notifications"]:
self.streaming_client.start_streaming(
timeline_type, callback=self.handle_streaming_event
)
self.streaming_mode = True
self.logger.info(f"Started streaming for {timeline_type} timeline")
except Exception as e:
self.logger.error(f"Failed to start streaming: {e}")
# Fall back to polling mode
self.streaming_mode = False
def stop_streaming_mode(self):
"""Stop streaming mode and fall back to polling"""
if self.streaming_client:
try:
self.streaming_client.stop_streaming()
except Exception as e:
self.logger.error(f"Error stopping streaming: {e}")
self.streaming_mode = False
self.logger.info("Stopped streaming mode")
def handle_streaming_event(self, event_type: str, data):
"""Handle real-time streaming events"""
try:
if event_type == "new_post":
# New post received via streaming
self.add_streaming_post(data)
elif event_type == "new_notification":
# New notification received
self.handle_streaming_notification(data)
elif event_type == "delete_post":
# Post deleted
self.remove_streaming_post(data)
except Exception as e:
self.logger.error(f"Error handling streaming event: {e}")
def add_streaming_post(self, post_data):
"""Add a new post received via streaming to the timeline"""
# Only add to timeline if we're on the right timeline type
current_timeline = self.timeline.timeline_type
# Trigger a refresh to show new content
# In future, could add the post directly to avoid full refresh
if not self.is_initial_load:
self.logger.debug(
f"New streaming post received for {current_timeline} timeline"
)
# Show notification of new content
timeline_name = {
"home": "home timeline",
"local": "local timeline",
"federated": "federated timeline",
}.get(current_timeline, "timeline")
if (
hasattr(self.timeline, "notification_manager")
and not self.timeline.should_suppress_notifications()
):
self.timeline.notification_manager.notify_new_content(timeline_name)
# Play sound for new content
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.play_timeline_update()
# Try to add streaming post directly instead of full refresh
self.logger.debug("Adding streaming post directly to timeline")
try:
from models.post import Post
streaming_post = Post.from_api_dict(post_data)
self.timeline.add_streaming_post_to_timeline(streaming_post)
except Exception as e:
self.logger.warning(
f"Failed to add streaming post directly, falling back to refresh: {e}"
)
self.timeline.refresh(preserve_position=True)
def handle_streaming_notification(self, notification_data):
"""Handle new notifications received via streaming"""
self.logger.debug(
f"New streaming notification received: {notification_data.get('type', 'unknown')}"
)
# If we're on the notifications timeline, refresh to show the new notification
if self.timeline.timeline_type == "notifications":
self.logger.debug("Refreshing notifications timeline for new notification")
self.timeline.refresh(preserve_position=True)
# Play appropriate notification sound based on type
if (
hasattr(self.timeline, "sound_manager")
and not self.timeline.should_suppress_notifications()
):
notification_type = notification_data.get("type", "notification")
if notification_type == "mention":
self.timeline.sound_manager.play_mention()
elif notification_type == "reblog":
self.timeline.sound_manager.play_boost()
elif notification_type == "favourite":
self.timeline.sound_manager.play_favorite()
elif notification_type == "follow":
self.timeline.sound_manager.play_follow()
else:
self.timeline.sound_manager.play_notification()
def remove_streaming_post(self, status_id):
"""Remove a deleted post from the timeline"""
# For now, this is a no-op - could implement post removal in future
pass
def show_compose_dialog(self):
"""Show the compose post dialog"""
dialog = ComposeDialog(self.account_manager, self)
dialog.post_sent.connect(self.on_post_sent)
dialog.exec()
def open_search_dialog(self):
"""Open the search dialog"""
if not self.account_manager.get_active_account():
self.logger.warning("No account available for search")
return
client = self.account_manager.get_client_for_active_account()
if not client:
self.logger.warning("No client available for search")
return
self.logger.debug("Opening search dialog")
dialog = SearchDialog(client, self.sound_coordinator.sound_manager, self)
dialog.exec()
def open_timeline_filter_dialog(self):
"""Open the timeline filter dialog"""
current_timeline = self.timeline
if current_timeline:
self.logger.debug("Opening timeline filter dialog")
dialog = TimelineFilterDialog(current_timeline, self)
dialog.exec()
else:
self.logger.warning("No timeline available for filtering")
def open_list_manager(self):
"""Open the list manager dialog"""
if not self.account_manager.get_active_account():
self.logger.warning("No account available for list management")
AccessibleTextDialog.show_warning(
"No Account",
"Please log in to an account before managing lists.",
self
)
return
self.logger.debug("Opening list manager dialog")
dialog = ListManagerDialog(self.account_manager, self)
dialog.list_updated.connect(self.on_lists_updated)
dialog.exec()
def on_lists_updated(self):
"""Handle when lists are updated"""
# Refresh timeline if currently viewing a list
if hasattr(self, 'current_list_id') and self.current_list_id:
self.timeline.refresh()
self.logger.info("Lists updated")
def refresh_lists_menu(self):
"""Refresh the lists submenu with current user lists"""
self.lists_menu.clear()
if not self.account_manager.get_active_account():
no_account_action = QAction("(No account logged in)", self)
no_account_action.setEnabled(False)
self.lists_menu.addAction(no_account_action)
return
try:
client = self.account_manager.get_client_for_active_account()
if not client:
no_client_action = QAction("(No client available)", self)
no_client_action.setEnabled(False)
self.lists_menu.addAction(no_client_action)
return
# Load user's lists
lists = client.get_lists()
if not lists:
no_lists_action = QAction("(No lists created)", self)
no_lists_action.setEnabled(False)
self.lists_menu.addAction(no_lists_action)
else:
for list_data in lists:
list_title = list_data['title']
list_id = list_data['id']
list_action = QAction(list_title, self)
list_action.triggered.connect(
lambda checked, lid=list_id, title=list_title: self.switch_to_list_timeline(lid, title)
)
self.lists_menu.addAction(list_action)
self.lists_menu.addSeparator()
# Add management option
manage_action = QAction("&Manage Lists...", self)
manage_action.triggered.connect(self.open_list_manager)
self.lists_menu.addAction(manage_action)
except Exception as e:
self.logger.error(f"Failed to load lists menu: {e}")
error_action = QAction("(Error loading lists)", self)
error_action.setEnabled(False)
self.lists_menu.addAction(error_action)
def switch_to_list_timeline(self, list_id: str, list_title: str):
"""Switch to a specific list timeline"""
self.logger.info(f"Switching to list timeline: {list_title} (ID: {list_id})")
# Store current list info
self.current_list_id = list_id
self.current_list_title = list_title
# Update timeline to show list
if self.timeline:
self.timeline.set_timeline_type("list", list_id)
# Update window title to show list
account = self.account_manager.get_active_account()
if account:
account_display = account.get_display_text()
self.setWindowTitle(f"Bifrost - {account_display} - List: {list_title}")
else:
self.setWindowTitle(f"Bifrost - List: {list_title}")
# Update status
self.status_bar.showMessage(f"Viewing list: {list_title}", 3000)
def on_post_sent(self, post_data):
"""Handle post data from compose dialog - USING CENTRALIZED POSTMANAGER"""
self.status_bar.showMessage("Sending post...", 2000)
# Use centralized PostManager instead of duplicate logic
success = self.post_manager.create_post(
content=post_data.get("content", ""),
visibility=post_data.get("visibility", "public"),
content_type=post_data.get("content_type", "text/plain"),
content_warning=post_data.get("content_warning"),
in_reply_to_id=post_data.get("in_reply_to_id"),
poll=post_data.get("poll"),
media_ids=post_data.get("media_ids"),
)
if not success:
self.error_manager.handle_validation_error(
"Failed to start post submission", context="post_creation"
)
def on_post_success(self, result_data):
"""Handle successful post submission - CENTRALIZED VIA POSTMANAGER"""
# Note: Sound is handled by PostManager to avoid duplication
self.error_manager.show_success_message(
"Post sent successfully!",
context="post_creation",
play_sound=False, # PostManager already plays sound
)
# Refresh timeline to show the new post
self.timeline.request_post_action_refresh("post_sent")
def on_post_failed(self, error_message: str):
"""Handle failed post submission - CENTRALIZED VIA POSTMANAGER"""
# Note: Error sound is handled by PostManager to avoid duplication
self.error_manager.handle_api_error(
f"Post failed: {error_message}", context="post_creation"
)
def on_action_success(self, action_type: str, message: str):
"""Handle successful post actions from centralized manager"""
self.status_bar.showMessage(message, 2000)
def on_action_failed(self, action_type: str, error_message: str):
"""Handle failed post actions from centralized manager"""
self.status_bar.showMessage(error_message, 3000)
def on_action_refresh_requested(self, action_type: str):
"""Handle timeline refresh requests from post actions"""
self.timeline.request_post_action_refresh(action_type)
def show_settings(self):
"""Show the settings dialog"""
dialog = SettingsDialog(self)
dialog.settings_changed.connect(self.on_settings_changed)
dialog.exec()
def on_settings_changed(self):
"""Handle settings changes"""
# Reload sound manager with new settings
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.reload_settings()
# Check if refresh mode changed
self.update_refresh_mode()
self.status_bar.showMessage("Settings saved successfully", 2000)
def show_edit_profile(self):
"""Show the profile editing dialog"""
try:
# Check if we have an active account
if not self.account_manager.has_accounts():
AccessibleTextDialog.show_error(
"No Account",
"You need to be logged in to edit your profile.",
"",
self
)
return
active_account = self.account_manager.get_active_account()
if not active_account:
AccessibleTextDialog.show_error(
"No Active Account",
"Please select an active account to edit your profile.",
"",
self
)
return
# Create and show dialog
dialog = ProfileEditDialog(
self.account_manager,
self.timeline.sound_manager if hasattr(self.timeline, 'sound_manager') else None,
self
)
dialog.profile_updated.connect(self.on_profile_updated)
dialog.exec()
except Exception as e:
self.logger.error(f"Failed to show profile edit dialog: {e}")
AccessibleTextDialog.show_error(
"Profile Edit Error",
"Failed to open profile editor.",
str(e),
self
)
if hasattr(self.timeline, 'sound_manager'):
self.timeline.sound_manager.play_error()
def on_profile_updated(self, updated_user):
"""Handle successful profile update"""
self.logger.info(f"Profile updated for user: {updated_user.username}")
self.status_bar.showMessage("Profile updated successfully", 3000)
# Refresh timeline to show any changes (display name, etc.)
if hasattr(self.timeline, 'refresh_timeline'):
self.timeline.refresh_timeline()
def show_soundpack_manager(self):
"""Show the soundpack manager dialog"""
dialog = SoundpackManagerDialog(self.settings, self)
dialog.exec()
def refresh_timeline(self):
"""Refresh the current timeline - DELEGATED TO TIMELINE"""
self.timeline.request_manual_refresh()
self.status_bar.showMessage("Timeline refreshed", 2000)
def on_timeline_tab_changed(self, index):
"""Handle timeline tab change"""
self.switch_timeline(index, from_tab_change=True)
def switch_timeline(self, index, from_tab_change=False):
"""Switch to timeline by index with loading feedback"""
timeline_names = [
"Home",
"Messages",
"Favorites",
"Notifications",
"Local",
"Federated",
"Bookmarks",
"Followers",
"Following",
"Blocked",
"Muted",
]
timeline_types = [
"home",
"conversations",
"favorites",
"notifications",
"local",
"federated",
"bookmarks",
"followers",
"following",
"blocked",
"muted",
]
if 0 <= index < len(timeline_names):
timeline_name = timeline_names[index]
timeline_type = timeline_types[index]
# Prevent duplicate calls for the same timeline only
if (
hasattr(self, "_current_timeline_switching_index")
and self._current_timeline_switching_index == index
):
return
self._current_timeline_switching_index = index
# Set tab to match if called from keyboard shortcut (but not if already from tab change)
if not from_tab_change and self.timeline_tabs.currentIndex() != index:
self.timeline_tabs.setCurrentIndex(index)
# Announce loading
self.status_bar.showMessage(f"Loading {timeline_name} timeline...")
# Switch timeline type
try:
self.timeline.set_timeline_type(timeline_type)
# Restart streaming if in streaming mode
if self.streaming_mode:
self.start_streaming_mode()
# Success feedback through coordinator
if self.sound_coordinator:
self.sound_coordinator.play_success("timeline_switch")
self.status_bar.showMessage(f"Loaded {timeline_name} timeline", 2000)
except Exception as e:
# Error feedback
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.play_error()
self.status_bar.showMessage(
f"Failed to load {timeline_name} timeline: {str(e)}", 3000
)
finally:
# Reset the index after a brief delay to allow the operation to complete
from PySide6.QtCore import QTimer
QTimer.singleShot(
100, lambda: setattr(self, "_current_timeline_switching_index", None)
)
def get_selected_post(self):
"""Get the currently selected post from timeline"""
current_item = self.timeline.currentItem()
if current_item:
return current_item.data(0, Qt.UserRole)
return None
def reply_to_current_post(self):
"""Reply to the currently selected post"""
post = self.get_selected_post()
if post:
self.reply_to_post(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def boost_current_post(self):
"""Boost the currently selected post"""
post = self.get_selected_post()
if post:
self.boost_post(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def favorite_current_post(self):
"""Favorite the currently selected post"""
post = self.get_selected_post()
if post:
self.favorite_post(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def copy_current_post(self):
"""Copy the currently selected post to clipboard"""
post = self.get_selected_post()
if post:
self.timeline.copy_post_to_clipboard(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def open_current_post_urls(self):
"""Open URLs from the currently selected post"""
post = self.get_selected_post()
if post:
self.timeline.open_urls_in_browser(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def show_first_time_setup(self):
"""Show first-time setup dialog"""
from PySide6.QtWidgets import QMessageBox
result = QMessageBox.question(
self,
"Welcome to Bifrost",
"Welcome to Bifrost! You need to add a fediverse account to get started.\n\n"
"Would you like to add an account now?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes,
)
if result == QMessageBox.Yes:
self.show_login_dialog()
def show_login_dialog(self):
"""Show the login dialog"""
dialog = LoginDialog(self)
dialog.account_added.connect(self.on_account_added)
dialog.exec()
def on_account_added(self, account_data):
"""Handle new account being added"""
self.account_selector.add_account(account_data)
self.update_status_label()
self.status_bar.showMessage(f"Added account: {account_data['username']}", 3000)
# Refresh timeline with new account
self.timeline.request_post_action_refresh("account_action")
def on_account_changed(self, account_id):
"""Handle account switching"""
account = self.account_manager.get_account_by_id(account_id)
if account:
self.update_status_label()
self.status_bar.showMessage(
f"Switched to {account.get_display_text()}", 2000
)
# Handle account switch with proper notification suppression
self.timeline.handle_account_switch()
def reply_to_post(self, post):
"""Reply to a specific post or conversation"""
dialog = ComposeDialog(self.account_manager, self)
# Use the new setup_reply method to handle visibility and text
dialog.setup_reply(post)
# Handle different types of replies
if hasattr(post, "conversation") and post.conversation:
# This is a conversation - send as direct message to participants
dialog.post_sent.connect(
lambda data: self.on_conversation_reply_sent(post, data)
)
else:
# This is a regular post - reply normally
dialog.post_sent.connect(
lambda data: self.on_post_sent({**data, "in_reply_to_id": post.id})
)
dialog.exec()
def on_conversation_reply_sent(self, conversation_post, data):
"""Handle sending a reply to a conversation"""
try:
active_account = self.account_manager.get_active_account()
if not active_account:
return
client = self.account_manager.get_client_for_active_account()
if not client:
return
# Get conversation participants
participants = []
if (
hasattr(conversation_post, "conversation")
and conversation_post.conversation
):
for account in conversation_post.conversation.accounts:
# Don't include ourselves in the mention
if account.acct != active_account.username:
participants.append(f"@{account.acct}")
# Add participants to the message content if not already mentioned
content = data.get("content", "")
for participant in participants:
if participant not in content:
content = f"{participant} {content}"
# Check if this is a Pleroma chat conversation
if (
hasattr(conversation_post.conversation, "chat_id")
and conversation_post.conversation.chat_id
):
# Use Pleroma chat API
try:
result = client.send_pleroma_chat_message(
conversation_post.conversation.chat_id, content.strip()
)
self.logger.info(
f"Sent Pleroma chat message to conversation {conversation_post.conversation.chat_id}"
)
self.post_manager._handle_post_success(result)
# Refresh timeline to show the new message
self.timeline.request_post_action_refresh("conversation_reply")
except Exception as e:
self.logger.error(f"Failed to send Pleroma chat message: {e}")
# Fall back to regular direct message
self.send_direct_message_reply(client, content, data)
else:
# Send as regular direct message
self.send_direct_message_reply(client, content, data)
except Exception as e:
self.logger.error(f"Failed to send conversation reply: {e}")
self.post_manager._handle_post_failed(str(e))
def send_direct_message_reply(self, client, content, data):
"""Send a direct message reply to conversation participants"""
try:
# Create a direct message post
result = client.post_status(
content=content,
visibility="direct",
content_warning=data.get("content_warning"),
media_ids=data.get("media_ids", []),
)
self.logger.info("Sent direct message reply to conversation")
self.post_manager._handle_post_success(result)
# Refresh timeline to show the new message
self.timeline.request_post_action_refresh("conversation_reply")
except Exception as e:
self.logger.error(f"Failed to send direct message reply: {e}")
self.post_manager._handle_post_failed(str(e))
def boost_post(self, post):
"""Boost/unboost a post - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.boost_post(post)
def favorite_post(self, post):
"""Favorite/unfavorite a post - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.favorite_post(post)
def view_profile(self, post):
"""View user profile"""
try:
# Convert Post.account to User-compatible data and open profile dialog
from models.user import User
# Create User object from Account data
account = post.account
account_data = {
"id": account.id,
"username": account.username,
"acct": account.acct,
"display_name": account.display_name,
"note": account.note,
"url": account.url,
"avatar": account.avatar,
"avatar_static": account.avatar_static,
"header": account.header,
"header_static": account.header_static,
"locked": account.locked,
"bot": account.bot,
"discoverable": account.discoverable,
"group": account.group,
"created_at": (
account.created_at.isoformat() if account.created_at else None
),
"followers_count": account.followers_count,
"following_count": account.following_count,
"statuses_count": account.statuses_count,
"fields": [], # Will be loaded from API
"emojis": [], # Will be loaded from API
}
user = User.from_api_dict(account_data)
dialog = ProfileDialog(
user_id=user.id,
account_manager=self.account_manager,
sound_manager=self.timeline.sound_manager,
initial_user=user,
parent=self,
)
dialog.exec()
except Exception as e:
self.status_bar.showMessage(f"Error opening profile: {str(e)}", 3000)
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.play_error()
def update_status_label(self):
"""Update the status label with current account info"""
active_account = self.account_manager.get_active_account()
if active_account:
self.status_label.setText(
f"Connected as {active_account.get_display_text()}"
)
else:
self.status_label.setText("No account connected")
def quit_application(self):
"""Quit the application with shutdown sound"""
self._shutdown_sound_played = (
True # Mark that we're handling the shutdown sound
)
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.play_shutdown()
# Wait briefly for sound to start playing
from PySide6.QtCore import QTimer
QTimer.singleShot(500, self.close)
else:
self.close()
def delete_current_post(self):
"""Delete the currently selected post"""
post = self.get_selected_post()
if post:
self.delete_post(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def edit_current_post(self):
"""Edit the currently selected post"""
post = self.get_selected_post()
if post:
self.edit_post(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def follow_current_user(self):
"""Follow the user of the currently selected post"""
post = self.get_selected_post()
if post:
self.follow_user(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def unfollow_current_user(self):
"""Unfollow the user of the currently selected post"""
post = self.get_selected_post()
if post:
self.unfollow_user(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def delete_post(self, post):
"""Delete a post with confirmation dialog - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.delete_post(post, parent_widget=self)
def edit_post(self, post):
"""Edit a post - USING CENTRALIZED POST ACTIONS MANAGER"""
# Check permissions using centralized manager
if not self.post_actions_manager.edit_post(post, parent_widget=self):
return
# Open compose dialog with current post content
dialog = ComposeDialog(self.account_manager, self)
dialog.text_edit.setPlainText(post.get_content_text())
# Move cursor to end
cursor = dialog.text_edit.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.End)
dialog.text_edit.setTextCursor(cursor)
def handle_edit_sent(data):
# Use centralized edit operation
self.post_actions_manager.perform_edit(
post,
new_content=data["content"],
visibility=data["visibility"],
content_type=data.get("content_type", "text/plain"),
content_warning=data["content_warning"]
)
dialog.post_sent.connect(handle_edit_sent)
dialog.exec()
def follow_user(self, post):
"""Follow a user - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.follow_user(post)
def unfollow_user(self, post):
"""Unfollow a user - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.unfollow_user(post)
def show_manual_follow_dialog(self):
"""Show dialog to manually follow a user by @username@instance"""
from PySide6.QtWidgets import (
QDialog,
QVBoxLayout,
QLineEdit,
QLabel,
QDialogButtonBox,
QPushButton,
)
dialog = QDialog(self)
dialog.setWindowTitle("Follow User")
dialog.setMinimumSize(400, 150)
dialog.setModal(True)
layout = QVBoxLayout(dialog)
# Label with clear example
label = QLabel("Enter the user to follow.\nExample: @stormux@social.stormux.org")
label.setAccessibleName("Follow User Instructions")
layout.addWidget(label)
# Input field
self.follow_input = QLineEdit()
self.follow_input.setAccessibleName("Username to Follow")
self.follow_input.setPlaceholderText("@stormux@social.stormux.org")
layout.addWidget(self.follow_input)
# Buttons
button_box = QDialogButtonBox()
follow_button = QPushButton("Follow")
follow_button.setDefault(True)
cancel_button = QPushButton("Cancel")
button_box.addButton(follow_button, QDialogButtonBox.AcceptRole)
button_box.addButton(cancel_button, QDialogButtonBox.RejectRole)
button_box.accepted.connect(dialog.accept)
button_box.rejected.connect(dialog.reject)
layout.addWidget(button_box)
# Show dialog
if dialog.exec() == QDialog.Accepted:
username = self.follow_input.text().strip()
if username:
self.manual_follow_user(username)
else:
self.status_bar.showMessage("Please enter a username", 2000)
def manual_follow_user(self, username):
"""Follow a user by username"""
self.logger.debug(f"manual_follow_user() called with username: {username}")
active_account = self.account_manager.get_active_account()
if not active_account:
self.logger.warning("No active account for manual follow")
self.status_bar.showMessage("Cannot follow: No active account", 2000)
return
# Remove @ prefix if present
if username.startswith("@"):
username = username[1:]
self.logger.debug(f"Processing username after @ removal: {username}")
try:
client = self.account_manager.get_client_for_active_account()
if not client:
self.logger.error("No client available for manual follow")
self.status_bar.showMessage("Cannot follow: No client connection", 2000)
return
self.logger.debug(f"Searching for accounts matching: {username}")
# Search for the account first
accounts = client.search_accounts(username)
self.logger.debug(f"Search returned {len(accounts) if accounts else 0} accounts")
if not accounts:
self.logger.warning(f"No accounts found for search: {username}")
self.status_bar.showMessage(f"User not found: @{username}", 3000)
return
# Find exact match
target_account = None
for account in accounts:
self.logger.debug(f"Checking account: {account.get('acct', 'unknown')} / {account.get('username', 'unknown')}")
if (
account.get("acct") == username
or account.get("username") == username.split("@")[0]
):
target_account = account
self.logger.debug(f"Found matching account: {account.get('acct')}")
break
if not target_account:
self.logger.warning(f"No exact match found for: {username}")
self.status_bar.showMessage(f"Exact user not found: @{username}", 3000)
return
self.logger.debug(f"Attempting to follow account ID: {target_account.get('id')}")
# Follow the account
follow_result = client.follow_account(target_account["id"])
self.logger.info(f"Follow API call completed for {target_account.get('acct')}")
display_name = (
target_account.get("display_name") or target_account["username"]
)
# Check if the follow was successful or if already following
if follow_result and hasattr(follow_result, 'get'):
following_status = follow_result.get('following', False)
requested_status = follow_result.get('requested', False)
if following_status:
# Already following or now following
self.show_detailed_success_dialog(
"Follow Successful",
f"Successfully followed {display_name}",
f"You are now following @{target_account['acct']}"
)
elif requested_status:
# Follow request sent (locked account)
self.show_detailed_success_dialog(
"Follow Request Sent",
f"Follow request sent to {display_name}",
f"Your follow request has been sent to @{target_account['acct']}.\nThey will need to approve it since their account is locked."
)
else:
# Unknown state - show basic success
self.show_detailed_success_dialog(
"Follow Action Completed",
f"Follow action completed for {display_name}",
f"Follow action completed for @{target_account['acct']}"
)
else:
# Fallback for servers that don't return detailed response
self.show_detailed_success_dialog(
"Follow Successful",
f"Successfully followed {display_name}",
f"You are now following @{target_account['acct']}"
)
self.status_bar.showMessage(f"Followed {display_name}", 2000)
# Play follow sound for successful follow
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.play_follow()
except Exception as e:
self.logger.error(f"Manual follow failed for {username}: {e}")
# Play error sound BEFORE showing dialog
if hasattr(self.timeline, "sound_manager"):
self.timeline.sound_manager.play_error()
# Show detailed error dialog
self.show_detailed_error_dialog(
"Follow Failed",
f"Could not follow @{username}",
str(e)
)
def show_detailed_error_dialog(self, title, message, details):
"""Show a detailed error dialog using reusable AccessibleTextDialog"""
AccessibleTextDialog.show_error(title, message, details, self)
def show_detailed_success_dialog(self, title, message, details):
"""Show a detailed success dialog using reusable AccessibleTextDialog"""
AccessibleTextDialog.show_success(title, message, details, self)
def block_current_user(self):
"""Block the user of the currently selected post"""
post = self.get_selected_post()
if post:
self.block_user(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def mute_current_user(self):
"""Mute the user of the currently selected post"""
post = self.get_selected_post()
if post:
self.mute_user(post)
else:
self.status_bar.showMessage("No post selected", 2000)
def block_user(self, post):
"""Block a user with confirmation dialog - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.block_user(post, parent_widget=self)
def mute_user(self, post):
"""Mute a user - USING CENTRALIZED POST ACTIONS MANAGER"""
self.post_actions_manager.mute_user(post)
def closeEvent(self, event):
"""Handle window close event"""
# Stop streaming before closing
self.stop_streaming_mode()
# Only play shutdown sound if not already played through quit_application
if not hasattr(self, "_shutdown_sound_played") and hasattr(
self.timeline, "sound_manager"
):
self.timeline.sound_manager.play_shutdown()
# Wait briefly for sound to complete
from PySide6.QtCore import QTimer, QEventLoop
loop = QEventLoop()
QTimer.singleShot(500, loop.quit)
loop.exec()
event.accept()