Initial commit.

This commit is contained in:
Storm Dragon
2025-11-11 01:02:38 -05:00
commit 87ac31fafe
36 changed files with 5118 additions and 0 deletions

72
README.md Normal file
View File

@@ -0,0 +1,72 @@
# StormIRC
A completely accessible GUI IRC client designed for users who value both accessibility and good design.
## Features
- **Full Accessibility**: Built with GTK4 for excellent screen reader support
- **Keyboard Navigation**: Complete keyboard control with logical tab order
- **Smart Notifications**: System notifications with customizable highlight patterns
- **IRC Protocol Support**: Full IRC client with join/part, messaging, nick changes
- **Configurable**: Comprehensive settings for servers, accessibility, and UI preferences
- **Chat History Navigation**: Enhanced navigation for screen reader users
## Keyboard Shortcuts
- `F6`: Cycle through main areas (channel list, chat, input)
- `Ctrl+1`: Focus channel list
- `Ctrl+2`: Focus chat area
- `Ctrl+3`: Focus message input
- `Ctrl+J`: Quick join channel
- `Ctrl+W`: Leave current channel
- `Ctrl+N`: Next channel
- `Ctrl+P`: Previous channel
- `Ctrl+Home`: Jump to chat beginning
- `Ctrl+End`: Jump to chat end
- `Alt+Up/Down`: Scroll chat
- `Ctrl+R`: Read last messages
## Installation
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Run the application:
```bash
python run_stormirc.py
```
## IRC Commands
- `/join #channel` - Join a channel
- `/part` - Leave current channel
- `/nick newnick` - Change nickname
- `/quit` - Disconnect and quit
## Configuration
StormIRC stores its configuration in `~/.config/stormirc/config.json`. You can:
- Add multiple IRC servers
- Configure accessibility preferences
- Set custom highlight patterns
- Adjust UI settings
Access settings through the gear icon in the header bar.
## Accessibility Features
- Screen reader announcements for important events
- Logical keyboard navigation
- Proper widget labeling and descriptions
- High contrast support ready
- Customizable notification preferences
- Unread message indicators
## Building for those panzies who need GUI
This IRC client proves that accessibility and good UX aren't mutually exclusive. While terminal purists have irssi, this gives everyone else a proper IRC experience that works beautifully with assistive technologies.
Built with Python, GTK4, and lots of care for the user experience.

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
PyGObject>=3.44.0
pycairo>=1.20.0
python-speechd>=0.11.0

BIN
sounds/highlight.wav Executable file

Binary file not shown.

BIN
sounds/private_message.wav Normal file

Binary file not shown.

BIN
sounds/public_message.wav Normal file

Binary file not shown.

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""StormIRC - Accessible IRC Client."""

Binary file not shown.

1
src/config/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Configuration management modules."""

Binary file not shown.

Binary file not shown.

334
src/config/settings.py Normal file
View File

@@ -0,0 +1,334 @@
"""Configuration management for StormIRC."""
import json
import logging
import os
from pathlib import Path
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, asdict, field
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
"""Server configuration."""
name: str = ""
host: str = ""
port: int = 6667
use_ssl: bool = False
nickname: str = ""
username: str = ""
realname: str = ""
password: str = ""
auto_connect: bool = False
auto_join_channels: List[str] = field(default_factory=list)
# SASL authentication
use_sasl: bool = False
sasl_username: str = ""
sasl_password: str = ""
@dataclass
class ChannelSpeechSettings:
"""Per-channel or per-PM speech settings."""
voice: str = "" # Voice name (empty = use default)
rate: int = 0 # Speech rate offset (-100 to 100, 0 = use default)
output_module: str = "" # Output module (empty = use default)
@dataclass
class AccessibilityConfig:
"""Accessibility configuration."""
enable_notifications: bool = True
enable_sounds: bool = True # Changed to True by default
announce_joins_parts: bool = True
announce_nick_changes: bool = True
mention_highlights: bool = True
chat_history_lines: int = 1000
font_size: int = 12
high_contrast: bool = False
screen_reader_announcements: bool = True
keyboard_shortcuts_enabled: bool = True
# Sound settings (individual control)
sound_public_message: bool = True
sound_private_message: bool = True
sound_highlight: bool = True
# Speech settings
speech_enabled: bool = True
speech_rate: int = 0
speech_pitch: int = 0
speech_volume: int = 0
speech_voice: str = ""
speech_output_module: str = "" # Global output module (espeak-ng, voxin, etc.)
auto_read_messages: bool = True # Automatically read incoming messages
speech_read_own_messages: bool = False # Read your own messages aloud when sent
speech_read_timestamps: bool = False # Read timestamps aloud when show_timestamps is enabled
# Default speech settings for new channels/PMs
default_channel_voice: str = ""
default_channel_rate: int = 0
default_channel_module: str = ""
default_pm_voice: str = ""
default_pm_rate: int = 0
default_pm_module: str = ""
@dataclass
class UIConfig:
"""UI configuration."""
window_width: int = 800
window_height: int = 600
channel_list_width: int = 200
show_timestamps: bool = True
timestamp_format: str = "[%H:%M:%S]"
theme: str = "default"
channel_list_position: str = "left"
@dataclass
class StormIRCConfig:
"""Main configuration class."""
servers: List[ServerConfig] = field(default_factory=list)
accessibility: AccessibilityConfig = field(default_factory=AccessibilityConfig)
ui: UIConfig = field(default_factory=UIConfig)
highlight_patterns: List[str] = field(default_factory=list)
last_connected_server: str = ""
function_keys: Dict[str, str] = field(default_factory=dict) # F1-F12 -> command mapping
channel_speech_settings: Dict[str, Dict[str, Any]] = field(default_factory=dict) # server:channel -> settings
class ConfigManager:
"""Manages configuration loading and saving."""
def __init__(self):
self.config_dir = Path.home() / ".config" / "stormirc"
self.config_file = self.config_dir / "config.json"
self.config = StormIRCConfig()
self.ensure_config_dir()
self.load_config()
def ensure_config_dir(self):
"""Ensure configuration directory exists."""
self.config_dir.mkdir(parents=True, exist_ok=True)
def load_config(self) -> StormIRCConfig:
"""Load configuration from file."""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
data = json.load(f)
self.config = self._dict_to_config(data)
except Exception as e:
logger.error(f"Error loading config: {e}", exc_info=True)
self.config = StormIRCConfig()
else:
self.create_default_config()
return self.config
def save_config(self):
"""Save configuration to file."""
try:
config_dict = self._config_to_dict(self.config)
with open(self.config_file, 'w') as f:
json.dump(config_dict, f, indent=2)
except Exception as e:
logger.error(f"Error saving config: {e}", exc_info=True)
def create_default_config(self):
"""Create default configuration."""
self.config = StormIRCConfig()
default_server = ServerConfig(
name="Libera.Chat",
host="irc.libera.chat",
port=6697,
use_ssl=True,
nickname="StormUser",
username="stormuser",
realname="Storm User"
)
self.config.servers.append(default_server)
self.save_config()
def _config_to_dict(self, config: StormIRCConfig) -> Dict[str, Any]:
"""Convert config to dictionary."""
return {
'servers': [asdict(server) for server in config.servers],
'accessibility': asdict(config.accessibility),
'ui': asdict(config.ui),
'highlight_patterns': config.highlight_patterns,
'last_connected_server': config.last_connected_server,
'function_keys': config.function_keys,
'channel_speech_settings': config.channel_speech_settings
}
def _dict_to_config(self, data: Dict[str, Any]) -> StormIRCConfig:
"""Convert dictionary to config."""
config = StormIRCConfig()
if 'servers' in data:
config.servers = [
ServerConfig(**server_data)
for server_data in data['servers']
]
if 'accessibility' in data:
# Handle backward compatibility - remove deprecated fields
accessibility_data = data['accessibility'].copy()
# Remove old speech_include_timestamps field if it exists
accessibility_data.pop('speech_include_timestamps', None)
config.accessibility = AccessibilityConfig(**accessibility_data)
if 'ui' in data:
config.ui = UIConfig(**data['ui'])
if 'highlight_patterns' in data:
config.highlight_patterns = data['highlight_patterns']
if 'last_connected_server' in data:
config.last_connected_server = data['last_connected_server']
if 'function_keys' in data:
config.function_keys = data['function_keys']
if 'channel_speech_settings' in data:
config.channel_speech_settings = data['channel_speech_settings']
return config
def add_server(self, server: ServerConfig):
"""Add server to configuration."""
self.config.servers.append(server)
self.save_config()
def remove_server(self, server_name: str):
"""Remove server from configuration."""
self.config.servers = [
s for s in self.config.servers
if s.name != server_name
]
self.save_config()
def get_server(self, server_name: str) -> Optional[ServerConfig]:
"""Get server configuration by name."""
for server in self.config.servers:
if server.name == server_name:
return server
return None
def update_server(self, server_name: str, updated_server: ServerConfig):
"""Update server configuration."""
for i, server in enumerate(self.config.servers):
if server.name == server_name:
self.config.servers[i] = updated_server
self.save_config()
break
def get_accessibility_config(self) -> AccessibilityConfig:
"""Get accessibility configuration."""
return self.config.accessibility
def update_accessibility_config(self, accessibility: AccessibilityConfig):
"""Update accessibility configuration."""
self.config.accessibility = accessibility
self.save_config()
def get_ui_config(self) -> UIConfig:
"""Get UI configuration."""
return self.config.ui
def update_ui_config(self, ui: UIConfig):
"""Update UI configuration."""
self.config.ui = ui
self.save_config()
def add_highlight_pattern(self, pattern: str):
"""Add highlight pattern."""
if pattern not in self.config.highlight_patterns:
self.config.highlight_patterns.append(pattern)
self.save_config()
def remove_highlight_pattern(self, pattern: str):
"""Remove highlight pattern."""
if pattern in self.config.highlight_patterns:
self.config.highlight_patterns.remove(pattern)
self.save_config()
def export_config(self, file_path: str):
"""Export configuration to file."""
try:
config_dict = self._config_to_dict(self.config)
with open(file_path, 'w') as f:
json.dump(config_dict, f, indent=2)
return True
except Exception as e:
logger.error(f"Error exporting config: {e}", exc_info=True)
return False
def import_config(self, file_path: str):
"""Import configuration from file."""
try:
with open(file_path, 'r') as f:
data = json.load(f)
self.config = self._dict_to_config(data)
self.save_config()
return True
except Exception as e:
logger.error(f"Error importing config: {e}", exc_info=True)
return False
def get_channel_speech_settings(self, server_name: str, target: str) -> ChannelSpeechSettings:
"""Get speech settings for a specific channel or PM."""
key = f"{server_name}:{target}"
if key in self.config.channel_speech_settings:
settings_dict = self.config.channel_speech_settings[key]
return ChannelSpeechSettings(**settings_dict)
# Return defaults based on whether it's a channel or PM
is_channel = target.startswith('#') or target.startswith('&')
if is_channel:
return ChannelSpeechSettings(
voice=self.config.accessibility.default_channel_voice,
rate=self.config.accessibility.default_channel_rate,
output_module=self.config.accessibility.default_channel_module
)
else:
return ChannelSpeechSettings(
voice=self.config.accessibility.default_pm_voice,
rate=self.config.accessibility.default_pm_rate,
output_module=self.config.accessibility.default_pm_module
)
def set_channel_speech_settings(self, server_name: str, target: str, settings: ChannelSpeechSettings):
"""Set speech settings for a specific channel or PM."""
key = f"{server_name}:{target}"
self.config.channel_speech_settings[key] = asdict(settings)
self.save_config()
def remove_channel_speech_settings(self, server_name: str, target: str):
"""Remove custom speech settings for a channel/PM (revert to defaults)."""
key = f"{server_name}:{target}"
if key in self.config.channel_speech_settings:
del self.config.channel_speech_settings[key]
self.save_config()
def is_valid_text(text: str) -> bool:
"""
Check if text contains meaningful content worth speaking.
Returns True if text has alphanumeric characters.
"""
if not text:
return False
# Check if there's at least one alphanumeric character
return any(c.isalnum() for c in text)

1
src/irc/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""IRC protocol handling modules."""

Binary file not shown.

Binary file not shown.

574
src/irc/client.py Normal file
View File

@@ -0,0 +1,574 @@
"""IRC client implementation."""
import socket
import ssl
import threading
import base64
import logging
from typing import Optional, Callable, Dict, List
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class IRCMessage:
"""Represents an IRC message."""
prefix: Optional[str] = None
command: str = ""
params: List[str] = None
def __post_init__(self):
if self.params is None:
self.params = []
class IRCClient:
"""IRC client implementation."""
def __init__(self):
self.socket: Optional[socket.socket] = None
self.connected = False
self.nickname = ""
self.username = ""
self.realname = ""
self.host = ""
self.port = 6667
self.use_ssl = False
self.channels: Dict[str, bool] = {}
self.channel_topics: Dict[str, str] = {}
self.channel_users: Dict[str, List[str]] = {}
self.channel_list: List[Dict[str, str]] = [] # List of available channels
self.message_handlers: Dict[str, List[Callable]] = {}
self._receive_thread: Optional[threading.Thread] = None
self.auto_reconnect = True
self.reconnect_delay = 5 # seconds
self.max_reconnect_delay = 300 # 5 minutes max
# SASL/CAP negotiation
self.sasl_username = ""
self.sasl_password = ""
self.cap_negotiating = False
self.cap_acknowledged: List[str] = []
self.registration_complete = False
def connect(self, host: str, port: int = 6667, use_ssl: bool = False) -> bool:
"""Connect to IRC server."""
try:
logger.info(f"Attempting to connect to {host}:{port} (SSL: {use_ssl})")
self.host = host
self.port = port
self.use_ssl = use_ssl
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
logger.debug("Socket created")
if use_ssl:
context = ssl.create_default_context()
self.socket = context.wrap_socket(self.socket, server_hostname=host)
logger.debug("SSL context wrapped")
self.socket.connect((host, port))
self.connected = True
logger.info(f"Successfully connected to {host}:{port}")
self._receive_thread = threading.Thread(target=self._receive_messages)
self._receive_thread.daemon = True
self._receive_thread.start()
logger.debug("Receive thread started")
return True
except Exception as e:
logger.error(f"Connection failed: {e}", exc_info=True)
self.emit_event('error', f"Connection failed: {e}")
return False
def disconnect(self):
"""Disconnect from IRC server."""
if self.connected and self.socket:
self.send_raw("QUIT :StormIRC shutting down")
self.socket.close()
self.connected = False
def send_raw(self, message: str):
"""Send raw IRC message."""
if not self.connected or not self.socket:
logger.warning(f"Cannot send message - not connected: {message}")
return False
try:
# Don't log passwords
if 'PASS' in message or 'AUTHENTICATE' in message:
logger.debug(">>> [authentication message redacted]")
else:
logger.debug(f">>> {message}")
encoded_msg = (message + '\r\n').encode('utf-8')
self.socket.send(encoded_msg)
return True
except Exception as e:
logger.error(f"Send failed: {e}", exc_info=True)
self.emit_event('error', f"Send failed: {e}")
return False
def _receive_messages(self):
"""Receive messages from server (runs in thread)."""
logger.debug("Receive thread running")
buffer = ""
while self.connected and self.socket:
try:
data = self.socket.recv(4096).decode('utf-8', errors='ignore')
if not data:
logger.warning("Received empty data, connection closed by server")
break
buffer += data
while '\r\n' in buffer:
line, buffer = buffer.split('\r\n', 1)
if line:
self._handle_message(line)
except Exception as e:
if self.connected:
logger.error(f"Receive error: {e}", exc_info=True)
self.emit_event('error', f"Receive error: {e}")
break
was_connected = self.connected
self.connected = False
logger.info("Receive thread ending, was_connected: %s", was_connected)
if was_connected:
self.emit_event('disconnected', "Connection lost")
def _handle_message(self, raw_message: str):
"""Parse and handle IRC message."""
logger.debug(f"<<< {raw_message}")
message = self._parse_message(raw_message)
if message.command == "PING":
self.send_raw(f"PONG :{message.params[0]}")
return
# Handle CAP negotiation
if message.command == "CAP":
self._handle_cap(message)
return
# Handle AUTHENTICATE for SASL
if message.command == "AUTHENTICATE":
self._handle_authenticate(message)
return
# Handle 903 (SASL success)
if message.command == "903":
self._handle_sasl_success()
return
# Handle 904/905 (SASL failure)
if message.command in ["904", "905"]:
self._handle_sasl_failure(message)
return
# Handle 353 (NAMES reply)
if message.command == "353":
self._handle_names_reply(message)
# Handle 366 (End of NAMES)
if message.command == "366":
self._handle_names_end(message)
# Handle JOIN to track users
if message.command == "JOIN":
self._handle_user_join(message)
# Handle PART to track users
if message.command == "PART":
self._handle_user_part(message)
# Handle QUIT to track users
if message.command == "QUIT":
self._handle_user_quit(message)
# Handle NICK to track nick changes
if message.command == "NICK":
self._handle_nick_change(message)
# Handle 321 (List start)
if message.command == "321":
self._handle_list_start(message)
# Handle 322 (List item)
if message.command == "322":
self._handle_list_item(message)
# Handle 323 (List end)
if message.command == "323":
self._handle_list_end(message)
# Handle 331 (No topic set)
if message.command == "331":
self._handle_no_topic(message)
# Handle 332 (Topic)
if message.command == "332":
self._handle_topic(message)
# Handle 333 (Topic info - who set it and when)
if message.command == "333":
self._handle_topic_info(message)
# Handle TOPIC command
if message.command == "TOPIC":
self._handle_topic_change(message)
self.emit_event('raw_message', raw_message)
self.emit_event(message.command.lower(), message)
def _parse_message(self, raw: str) -> IRCMessage:
"""Parse raw IRC message."""
message = IRCMessage()
if raw.startswith(':'):
prefix_end = raw.find(' ')
message.prefix = raw[1:prefix_end]
raw = raw[prefix_end + 1:]
parts = raw.split(' ')
message.command = parts[0].upper()
params = parts[1:]
for i, param in enumerate(params):
if param.startswith(':'):
message.params.append(' '.join(params[i:])[1:])
break
message.params.append(param)
return message
def register(self, nickname: str, username: str, realname: str, sasl_username: str = "", sasl_password: str = ""):
"""Register with IRC server."""
logger.info(f"Registering as {nickname} (user: {username}, SASL: {bool(sasl_username)})")
self.nickname = nickname
self.username = username
self.realname = realname
self.sasl_username = sasl_username
self.sasl_password = sasl_password
# If SASL credentials provided, start CAP negotiation
if sasl_username and sasl_password:
logger.debug("Starting CAP negotiation for SASL")
self.cap_negotiating = True
self.send_raw("CAP LS 302")
self.send_raw(f"NICK {nickname}")
self.send_raw(f"USER {username} 0 * :{realname}")
def join_channel(self, channel: str, key: str = ""):
"""Join a channel."""
if key:
self.send_raw(f"JOIN {channel} {key}")
else:
self.send_raw(f"JOIN {channel}")
self.channels[channel] = True
def part_channel(self, channel: str, message: str = ""):
"""Leave a channel."""
if message:
self.send_raw(f"PART {channel} :{message}")
else:
self.send_raw(f"PART {channel}")
if channel in self.channels:
del self.channels[channel]
def send_message(self, target: str, message: str):
"""Send message to channel or user."""
logger.info(f"send_message: target={target}, message={message}")
self.send_raw(f"PRIVMSG {target} :{message}")
def change_nick(self, new_nick: str):
"""Change nickname."""
self.send_raw(f"NICK {new_nick}")
self.nickname = new_nick
def get_topic(self, channel: str):
"""Request channel topic."""
self.send_raw(f"TOPIC {channel}")
def set_topic(self, channel: str, topic: str):
"""Set channel topic."""
self.send_raw(f"TOPIC {channel} :{topic}")
def whois(self, nickname: str):
"""Request WHOIS information for a user."""
self.send_raw(f"WHOIS {nickname}")
def add_handler(self, event: str, handler: Callable):
"""Add event handler."""
if event not in self.message_handlers:
self.message_handlers[event] = []
self.message_handlers[event].append(handler)
def remove_handler(self, event: str, handler: Callable):
"""Remove event handler."""
if event in self.message_handlers:
self.message_handlers[event].remove(handler)
def emit_event(self, event: str, data):
"""Emit event to handlers."""
logger.debug(f"Emitting event '{event}' (handlers registered: {len(self.message_handlers.get(event, []))})")
if event in self.message_handlers:
for handler in self.message_handlers[event]:
try:
handler(data)
except Exception as e:
logger.error(f"Handler error for event '{event}': {e}", exc_info=True)
def _handle_cap(self, message: IRCMessage):
"""Handle CAP responses."""
if len(message.params) < 2:
return
subcommand = message.params[1]
# CAP LS - list available capabilities
if subcommand == "LS":
if len(message.params) >= 3:
available_caps = message.params[2].split()
# Request SASL if available
if "sasl" in available_caps:
self.send_raw("CAP REQ :sasl")
else:
# No SASL available, end negotiation
self.send_raw("CAP END")
self.cap_negotiating = False
# CAP ACK - capability acknowledged
elif subcommand == "ACK":
if len(message.params) >= 3:
acked_caps = message.params[2].split()
self.cap_acknowledged.extend(acked_caps)
# If SASL was acknowledged, start authentication
if "sasl" in acked_caps:
self.send_raw("AUTHENTICATE PLAIN")
# CAP NAK - capability not available
elif subcommand == "NAK":
# End CAP negotiation if request was denied
self.send_raw("CAP END")
self.cap_negotiating = False
self.emit_event('sasl_failed', "SASL capability denied")
def _handle_authenticate(self, message: IRCMessage):
"""Handle AUTHENTICATE response."""
if len(message.params) < 1:
return
# Server sends "+" to indicate ready for credentials
if message.params[0] == "+":
# Send SASL PLAIN credentials
# Format: \0username\0password
auth_string = f"\0{self.sasl_username}\0{self.sasl_password}"
auth_base64 = base64.b64encode(auth_string.encode('utf-8')).decode('ascii')
# Split into 400-byte chunks if needed
if len(auth_base64) > 400:
# Send in chunks
for i in range(0, len(auth_base64), 400):
chunk = auth_base64[i:i+400]
self.send_raw(f"AUTHENTICATE {chunk}")
# Send "+" to indicate end
if len(auth_base64) % 400 == 0:
self.send_raw("AUTHENTICATE +")
else:
self.send_raw(f"AUTHENTICATE {auth_base64}")
def _handle_sasl_success(self):
"""Handle SASL authentication success (903)."""
self.send_raw("CAP END")
self.cap_negotiating = False
self.emit_event('sasl_success', "SASL authentication successful")
def _handle_sasl_failure(self, message: IRCMessage):
"""Handle SASL authentication failure (904/905)."""
self.send_raw("CAP END")
self.cap_negotiating = False
error_msg = message.params[-1] if message.params else "SASL authentication failed"
self.emit_event('sasl_failed', error_msg)
def _handle_names_reply(self, message: IRCMessage):
"""Handle 353 NAMES reply - list of users in channel."""
# Format: :server 353 nickname = #channel :user1 @user2 +user3
if len(message.params) < 4:
return
channel = message.params[2]
names_str = message.params[3]
# Initialize channel user list if not exists
if channel not in self.channel_users:
self.channel_users[channel] = []
# Parse user list (may have prefixes like @, +, etc.)
for name in names_str.split():
# Strip mode prefixes (@, +, %, etc.) but keep the nickname
nick = name.lstrip('@+%~&!')
if nick and nick not in self.channel_users[channel]:
self.channel_users[channel].append(nick)
logger.debug(f"NAMES for {channel}: {len(self.channel_users[channel])} users")
def _handle_names_end(self, message: IRCMessage):
"""Handle 366 End of NAMES - all users have been listed."""
# Format: :server 366 nickname #channel :End of /NAMES list.
if len(message.params) < 2:
return
channel = message.params[1]
logger.info(f"End of NAMES for {channel}, total users: {len(self.channel_users.get(channel, []))}")
# Emit event so UI can update user list
self.emit_event('names_end', {'channel': channel, 'users': self.channel_users.get(channel, [])})
def _handle_user_join(self, message: IRCMessage):
"""Handle JOIN message to track users."""
if not message.prefix or len(message.params) < 1:
return
nick = message.prefix.split('!')[0]
channel = message.params[0]
# Add user to channel list
if channel not in self.channel_users:
self.channel_users[channel] = []
if nick not in self.channel_users[channel]:
self.channel_users[channel].append(nick)
logger.debug(f"{nick} joined {channel}, now {len(self.channel_users[channel])} users")
def _handle_user_part(self, message: IRCMessage):
"""Handle PART message to track users."""
if not message.prefix or len(message.params) < 1:
return
nick = message.prefix.split('!')[0]
channel = message.params[0]
# Remove user from channel list
if channel in self.channel_users and nick in self.channel_users[channel]:
self.channel_users[channel].remove(nick)
logger.debug(f"{nick} left {channel}, now {len(self.channel_users[channel])} users")
def _handle_user_quit(self, message: IRCMessage):
"""Handle QUIT message to track users leaving all channels."""
if not message.prefix:
return
nick = message.prefix.split('!')[0]
# Remove user from all channels
for channel in self.channel_users:
if nick in self.channel_users[channel]:
self.channel_users[channel].remove(nick)
logger.debug(f"{nick} quit, removed from {channel}")
def _handle_nick_change(self, message: IRCMessage):
"""Handle NICK message to track nickname changes."""
if not message.prefix or len(message.params) < 1:
return
old_nick = message.prefix.split('!')[0]
new_nick = message.params[0]
# Update nickname in all channels
for channel in self.channel_users:
if old_nick in self.channel_users[channel]:
idx = self.channel_users[channel].index(old_nick)
self.channel_users[channel][idx] = new_nick
logger.debug(f"{old_nick} changed nick to {new_nick} in {channel}")
# Update our own nickname if it's us
if old_nick == self.nickname:
self.nickname = new_nick
logger.info(f"Our nickname changed to {new_nick}")
def _handle_list_start(self, message: IRCMessage):
"""Handle 321 - Start of LIST."""
# Format: :server 321 nickname Channel :Users Name
logger.info("Starting channel list")
self.channel_list = [] # Clear previous list
def _handle_list_item(self, message: IRCMessage):
"""Handle 322 - Channel list item."""
# Format: :server 322 nickname #channel user_count :topic
if len(message.params) < 3:
return
channel = message.params[1]
user_count = message.params[2]
topic = message.params[3] if len(message.params) > 3 else ""
self.channel_list.append({
'channel': channel,
'users': user_count,
'topic': topic
})
logger.debug(f"LIST item: {channel} ({user_count} users)")
def _handle_list_end(self, message: IRCMessage):
"""Handle 323 - End of LIST."""
# Format: :server 323 nickname :End of /LIST
logger.info(f"End of LIST, total channels: {len(self.channel_list)}")
self.emit_event('list_end', self.channel_list)
def _handle_no_topic(self, message: IRCMessage):
"""Handle 331 - No topic set."""
# Format: :server 331 nickname #channel :No topic is set
if len(message.params) < 2:
return
channel = message.params[1]
self.channel_topics[channel] = ""
logger.info(f"No topic set for {channel}")
self.emit_event('topic', {'channel': channel, 'topic': '', 'setter': None, 'time': None})
def _handle_topic(self, message: IRCMessage):
"""Handle 332 - Topic."""
# Format: :server 332 nickname #channel :topic text
if len(message.params) < 3:
return
channel = message.params[1]
topic = message.params[2]
self.channel_topics[channel] = topic
logger.info(f"Topic for {channel}: {topic}")
self.emit_event('topic', {'channel': channel, 'topic': topic, 'setter': None, 'time': None})
def _handle_topic_info(self, message: IRCMessage):
"""Handle 333 - Topic info (who set it and when)."""
# Format: :server 333 nickname #channel setter timestamp
if len(message.params) < 4:
return
channel = message.params[1]
setter = message.params[2]
timestamp = message.params[3]
logger.debug(f"Topic for {channel} set by {setter} at {timestamp}")
# We already emitted the topic in 332, so just log this
def _handle_topic_change(self, message: IRCMessage):
"""Handle TOPIC command (someone changed the topic)."""
# Format: :nick!user@host TOPIC #channel :new topic
if not message.prefix or len(message.params) < 1:
return
setter = message.prefix.split('!')[0]
channel = message.params[0]
topic = message.params[1] if len(message.params) > 1 else ""
self.channel_topics[channel] = topic
logger.info(f"{setter} changed topic for {channel}: {topic}")
self.emit_event('topic', {'channel': channel, 'topic': topic, 'setter': setter, 'time': None})

1
src/ui/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""UI modules for StormIRC."""

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

48
src/ui/accessible_tree.py Normal file
View File

@@ -0,0 +1,48 @@
"""
Accessible tree widget - simplified version from bifrost for IRC use
"""
from PySide6.QtWidgets import QTreeWidget, QTreeWidgetItem
from PySide6.QtCore import Qt, Signal
from PySide6.QtGui import QKeyEvent
class AccessibleTreeWidget(QTreeWidget):
"""Tree widget with enhanced accessibility"""
def __init__(self, parent=None):
super().__init__(parent)
self.setup_accessibility()
def setup_accessibility(self):
"""Set up accessibility features"""
self.setFocusPolicy(Qt.StrongFocus)
self.itemExpanded.connect(self.on_item_expanded)
self.itemCollapsed.connect(self.on_item_collapsed)
def on_item_expanded(self, item: QTreeWidgetItem):
"""Handle item expansion"""
# Make child items accessible
self.update_child_accessibility(item, True)
def on_item_collapsed(self, item: QTreeWidgetItem):
"""Handle item collapse"""
# Hide child items from screen readers
self.update_child_accessibility(item, False)
def update_child_accessibility(self, item: QTreeWidgetItem, visible: bool):
"""Update accessibility properties of child items"""
for i in range(item.childCount()):
child = item.child(i)
if visible:
# Make child accessible
child.setFlags(child.flags() | Qt.ItemIsEnabled | Qt.ItemIsSelectable)
child.setData(0, Qt.AccessibleDescriptionRole, "")
# Recursively handle nested children
self.update_child_accessibility(child, child.isExpanded())
else:
# Hide from screen readers
child.setData(0, Qt.AccessibleDescriptionRole, "hidden")
child.setFlags((child.flags() | Qt.ItemIsEnabled) & ~Qt.ItemIsSelectable)
# Hide all nested children too
self.update_child_accessibility(child, False)

View File

@@ -0,0 +1,288 @@
"""Autocomplete text edit widget for IRC channel and nickname completion."""
import logging
from PySide6.QtWidgets import QPlainTextEdit, QCompleter, QListWidget
from PySide6.QtCore import Qt, QStringListModel
from PySide6.QtGui import QKeyEvent, QTextCursor
logger = logging.getLogger(__name__)
class AutocompleteTextEdit(QPlainTextEdit):
"""QPlainTextEdit with autocomplete support for channels (#) and nicknames."""
def __init__(self, parent=None):
super().__init__(parent)
self.completer = None
self.completion_start = 0
self.completion_prefix = ""
self.completion_type = None # 'channel' or 'nickname'
# Data sources for completion
self.channels = [] # List of available channels
self.nicknames = [] # List of nicknames in current channel
# Setup completer
self.setup_completer()
def setup_completer(self):
"""Initialize the completer."""
self.completer = QCompleter()
self.completer.setWidget(self)
self.completer.setCaseSensitivity(Qt.CaseInsensitive)
self.completer.setCompletionMode(QCompleter.PopupCompletion)
# Connect activation signal
self.completer.activated.connect(self.insert_completion)
def set_channels(self, channels):
"""Update the list of available channels."""
self.channels = sorted(channels)
logger.debug(f"Autocomplete channels updated: {self.channels}")
def set_nicknames(self, nicknames):
"""Update the list of nicknames for the current channel."""
self.nicknames = sorted(nicknames)
logger.debug(f"Autocomplete nicknames updated: {len(self.nicknames)} users")
def keyPressEvent(self, event: QKeyEvent):
"""Handle key press events for autocomplete."""
key = event.key()
# Handle autocomplete navigation when popup is visible
if self.completer and self.completer.popup().isVisible():
if key in [Qt.Key_Up, Qt.Key_Down, Qt.Key_Return, Qt.Key_Enter, Qt.Key_Tab]:
self.handle_completer_key(key)
return
elif key == Qt.Key_Escape:
self.hide_completer()
return
# Handle Enter/Return key - should never insert newline (let parent handle sending)
if key in [Qt.Key_Return, Qt.Key_Enter]:
# Don't handle it here - let the parent's event filter handle it
# This prevents inserting a newline
event.ignore()
return
# Handle Tab key for smart completion or focus change
if key == Qt.Key_Tab:
full_text = self.toPlainText()
if not full_text.strip():
# Text box is empty - allow focus change to next control
event.ignore()
return
# Get the character before cursor to determine behavior
cursor = self.textCursor()
pos = cursor.position()
if pos > 0:
# Use full_text (not stripped) to get correct character position
char_before = full_text[pos - 1]
# If last character is alphanumeric or #, try autocomplete
if char_before.isalnum() or char_before == '#':
if self.try_autocomplete():
return # Completion was triggered
# No completion available, consume the event
return
else:
# Last character is punctuation or whitespace - move to next control
event.ignore()
return
else:
# At beginning, allow focus change
event.ignore()
return
# Normal key processing
super().keyPressEvent(event)
# After typing, check if we should update the completer
if self.completer and self.completer.popup().isVisible():
self.update_completer()
def handle_completer_key(self, key):
"""Handle navigation keys in completer popup."""
popup = self.completer.popup()
if key in [Qt.Key_Up, Qt.Key_Down]:
# Let the popup handle up/down navigation
if key == Qt.Key_Up:
current = popup.currentIndex().row()
if current > 0:
popup.setCurrentIndex(popup.model().index(current - 1, 0))
else:
current = popup.currentIndex().row()
if current < popup.model().rowCount() - 1:
popup.setCurrentIndex(popup.model().index(current + 1, 0))
elif key in [Qt.Key_Return, Qt.Key_Enter, Qt.Key_Tab]:
# Insert the selected completion
self.insert_completion(popup.currentIndex().data())
def try_autocomplete(self):
"""Try to trigger autocomplete at current cursor position."""
cursor = self.textCursor()
text = self.toPlainText()
pos = cursor.position()
if pos == 0:
return False
# Find the current word being typed (back to last space/newline/tab)
start_pos = pos - 1
while start_pos >= 0 and text[start_pos] not in [' ', '\n', '\t']:
start_pos -= 1
start_pos += 1
current_word = text[start_pos:pos]
if not current_word:
return False
# Determine what type of completion to show
if current_word.startswith('#'):
# Channel completion
prefix = current_word[1:] # Remove #
return self.show_channel_completer(prefix, start_pos)
else:
# Nickname completion
return self.show_nickname_completer(current_word, start_pos)
def show_channel_completer(self, prefix, start_pos):
"""Show channel completion popup."""
if not self.channels:
logger.debug("No channels available for completion")
return False
# Filter channels by prefix
matches = [ch for ch in self.channels if ch.lower().startswith(prefix.lower())]
if not matches:
logger.debug(f"No channel matches for prefix: {prefix}")
self.hide_completer()
return False
logger.debug(f"Found {len(matches)} channel matches for '{prefix}': {matches}")
# Update completer model
model = QStringListModel(matches)
self.completer.setModel(model)
# Store completion info
self.completion_start = start_pos
self.completion_prefix = prefix
self.completion_type = 'channel'
# Show popup
cursor = self.textCursor()
cursor.setPosition(start_pos)
rect = self.cursorRect(cursor)
rect.setWidth(self.completer.popup().sizeHintForColumn(0)
+ self.completer.popup().verticalScrollBar().sizeHint().width())
self.completer.complete(rect)
return True
def show_nickname_completer(self, prefix, start_pos):
"""Show nickname completion popup."""
if not self.nicknames:
logger.debug("No nicknames available for completion")
return False
# Filter nicknames by prefix
matches = [nick for nick in self.nicknames if nick.lower().startswith(prefix.lower())]
if not matches:
logger.debug(f"No nickname matches for prefix: {prefix}")
self.hide_completer()
return False
logger.debug(f"Found {len(matches)} nickname matches for '{prefix}': {matches[:5]}...")
# Update completer model
model = QStringListModel(matches)
self.completer.setModel(model)
# Store completion info
self.completion_start = start_pos
self.completion_prefix = prefix
self.completion_type = 'nickname'
# Show popup
cursor = self.textCursor()
cursor.setPosition(start_pos)
rect = self.cursorRect(cursor)
rect.setWidth(self.completer.popup().sizeHintForColumn(0)
+ self.completer.popup().verticalScrollBar().sizeHint().width())
self.completer.complete(rect)
return True
def update_completer(self):
"""Update completer matches as user types."""
cursor = self.textCursor()
text = self.toPlainText()
pos = cursor.position()
# Find current word
start_pos = pos - 1
while start_pos >= 0 and text[start_pos] not in [' ', '\n', '\t']:
start_pos -= 1
start_pos += 1
current_word = text[start_pos:pos]
# Update based on completion type
if self.completion_type == 'channel' and current_word.startswith('#'):
prefix = current_word[1:]
matches = [ch for ch in self.channels if ch.lower().startswith(prefix.lower())]
if matches:
model = QStringListModel(matches)
self.completer.setModel(model)
else:
self.hide_completer()
elif self.completion_type == 'nickname':
matches = [nick for nick in self.nicknames if nick.lower().startswith(current_word.lower())]
if matches:
model = QStringListModel(matches)
self.completer.setModel(model)
else:
self.hide_completer()
else:
self.hide_completer()
def hide_completer(self):
"""Hide the completer popup."""
if self.completer:
self.completer.popup().hide()
def insert_completion(self, completion):
"""Insert the selected completion."""
if not completion:
return
logger.debug(f"Inserting completion: {completion} (type: {self.completion_type})")
# Get cursor and select the text to replace
cursor = self.textCursor()
cursor.setPosition(self.completion_start)
cursor.setPosition(cursor.position() + len(self.completion_prefix) + (1 if self.completion_type == 'channel' else 0), QTextCursor.KeepAnchor)
# Insert completion with appropriate formatting
if self.completion_type == 'channel':
cursor.insertText(f"#{completion} ")
elif self.completion_type == 'nickname':
# Check if we're at the beginning of a line (for nickname: format)
line_start = cursor.position() == 0 or self.toPlainText()[self.completion_start - 1] in ['\n']
if line_start:
cursor.insertText(f"{completion}: ")
else:
cursor.insertText(f"{completion} ")
# Update cursor position
self.setTextCursor(cursor)
self.hide_completer()

151
src/ui/logger.py Normal file
View File

@@ -0,0 +1,151 @@
"""Message logging for StormIRC."""
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
class MessageLogger:
"""Handles logging IRC messages to disk."""
def __init__(self, log_dir: Optional[str] = None):
"""Initialize message logger.
Args:
log_dir: Directory to store logs. Defaults to ~/.config/stormirc/logs/
"""
if log_dir is None:
log_dir = os.path.expanduser("~/.config/stormirc/logs")
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
self.enabled = True
self.log_files = {} # Cache of open log file handles
def set_enabled(self, enabled: bool):
"""Enable or disable logging."""
self.enabled = enabled
if not enabled:
self._close_all_files()
def log_message(self, server: str, target: str, message: str, timestamp: Optional[datetime] = None):
"""Log a message to disk.
Args:
server: Server name/host
target: Channel or nickname
message: The message to log
timestamp: Optional timestamp (defaults to now)
"""
if not self.enabled:
return
if timestamp is None:
timestamp = datetime.now()
# Sanitize server and target for filenames
safe_server = self._sanitize_filename(server)
safe_target = self._sanitize_filename(target)
# Create server directory if needed
server_dir = self.log_dir / safe_server
server_dir.mkdir(exist_ok=True)
# Create log file path: server/target-YYYY-MM-DD.log
date_str = timestamp.strftime("%Y-%m-%d")
log_file = server_dir / f"{safe_target}-{date_str}.log"
# Format: [HH:MM:SS] message
time_str = timestamp.strftime("%H:%M:%S")
log_line = f"[{time_str}] {message}\n"
# Append to log file
try:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_line)
except Exception as e:
logger.error(f"Error writing to log file: {e}", exc_info=True)
def read_recent_messages(self, server: str, target: str, limit: int = 100):
"""Read recent messages from log.
Args:
server: Server name/host
target: Channel or nickname
limit: Maximum number of messages to return
Returns:
List of message strings (without timestamps)
"""
safe_server = self._sanitize_filename(server)
safe_target = self._sanitize_filename(target)
server_dir = self.log_dir / safe_server
if not server_dir.exists():
return []
# Get today's log file
date_str = datetime.now().strftime("%Y-%m-%d")
log_file = server_dir / f"{safe_target}-{date_str}.log"
if not log_file.exists():
# Try yesterday's file if today's doesn't exist
yesterday = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = yesterday.replace(day=yesterday.day - 1)
date_str = yesterday.strftime("%Y-%m-%d")
log_file = server_dir / f"{safe_target}-{date_str}.log"
if not log_file.exists():
return []
try:
with open(log_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Return last 'limit' lines
return [line.rstrip('\n') for line in lines[-limit:]]
except Exception as e:
logger.error(f"Error reading log file: {e}", exc_info=True)
return []
def _sanitize_filename(self, name: str) -> str:
"""Sanitize a string for use as a filename.
Args:
name: The string to sanitize
Returns:
Sanitized filename-safe string
"""
# Remove or replace characters that aren't safe for filenames
safe = name.replace('/', '_').replace('\\', '_')
safe = safe.replace(':', '_').replace('*', '_')
safe = safe.replace('?', '_').replace('"', '_')
safe = safe.replace('<', '_').replace('>', '_')
safe = safe.replace('|', '_')
# Remove # prefix from channels
if safe.startswith('#'):
safe = safe[1:]
return safe
def _close_all_files(self):
"""Close all cached file handles."""
for f in self.log_files.values():
try:
f.close()
except Exception:
pass
self.log_files.clear()
def get_log_directory(self) -> str:
"""Get the log directory path.
Returns:
Path to log directory
"""
return str(self.log_dir)

1392
src/ui/main_window.py Normal file

File diff suppressed because it is too large Load Diff

200
src/ui/pm_window.py Normal file
View File

@@ -0,0 +1,200 @@
"""Private message window for StormIRC."""
import logging
from PySide6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QTextEdit,
QPushButton, QHBoxLayout, QLabel, QPlainTextEdit
)
from PySide6.QtCore import Qt, Signal
from PySide6.QtGui import QAccessible
from src.ui.autocomplete_textedit import AutocompleteTextEdit
from src.config.settings import is_valid_text
from src.ui.ui_utils import (
format_message_with_timestamp,
apply_speech_settings,
DEFAULT_PM_WINDOW_WIDTH,
DEFAULT_PM_WINDOW_HEIGHT,
PM_MESSAGE_ENTRY_MAX_HEIGHT
)
logger = logging.getLogger(__name__)
class PMWindow(QMainWindow):
"""Private message window."""
message_send = Signal(str, str) # Signal(nickname, message)
window_closed = Signal(str) # Signal(nickname) when window closes
def __init__(self, nickname, irc_client, speech_manager, config_manager, server_name, parent=None):
super().__init__(parent)
self.nickname = nickname
self.irc_client = irc_client
self.speech_manager = speech_manager
self.config_manager = config_manager
self.server_name = server_name
self.message_buffer = []
# Set window title with format: "nickname - Private Message"
self.setWindowTitle(f"{nickname} - Private Message")
self.resize(DEFAULT_PM_WINDOW_WIDTH, DEFAULT_PM_WINDOW_HEIGHT)
self.setup_ui()
def setup_ui(self):
"""Set up the user interface."""
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
# Header with nickname
header_label = QLabel(f"Private conversation with {self.nickname}")
header_label.setStyleSheet("font-weight: bold; padding: 5px;")
layout.addWidget(header_label)
# Chat display
self.chat_display = QTextEdit()
self.chat_display.setReadOnly(True)
# Enable cursor for screen reader navigation (same as main window)
self.chat_display.setTextInteractionFlags(
Qt.TextSelectableByMouse |
Qt.TextSelectableByKeyboard |
Qt.LinksAccessibleByMouse |
Qt.LinksAccessibleByKeyboard
)
self.chat_display.setFocusPolicy(Qt.StrongFocus)
self.chat_display.setAccessibleName(f"Private message conversation with {self.nickname}")
self.chat_display.setAccessibleDescription(
f"Chat history with {self.nickname}. Use arrow keys to navigate. Press Tab to move to message input."
)
layout.addWidget(self.chat_display)
# Message input area
input_layout = QVBoxLayout()
input_label = QLabel("Message:")
input_layout.addWidget(input_label)
# Use autocomplete text edit for nickname completion
self.message_entry = AutocompleteTextEdit()
self.message_entry.setAccessibleName(f"Message input for {self.nickname}")
self.message_entry.setAccessibleDescription(
f"Type your message to {self.nickname} here. Press Enter to send, Shift+Enter for new line."
)
self.message_entry.setPlaceholderText(f"Type message to {self.nickname}...")
self.message_entry.setMaximumHeight(PM_MESSAGE_ENTRY_MAX_HEIGHT)
# Autocomplete will be set up by parent when needed
# Connect Enter key handling
self.message_entry.installEventFilter(self)
input_layout.addWidget(self.message_entry)
# Send button
button_layout = QHBoxLayout()
self.send_button = QPushButton("Send")
self.send_button.setAccessibleName("Send message button")
self.send_button.clicked.connect(self.on_send_message)
button_layout.addWidget(self.send_button)
button_layout.addStretch()
input_layout.addLayout(button_layout)
layout.addLayout(input_layout)
# Set focus to message entry
self.message_entry.setFocus()
def eventFilter(self, obj, event):
"""Event filter to handle Enter key in message entry."""
from PySide6.QtCore import QEvent
if obj == self.message_entry and event.type() == QEvent.KeyPress:
# Enter without Shift sends message, Shift+Enter inserts newline
if event.key() in (Qt.Key_Return, Qt.Key_Enter):
if not (event.modifiers() & Qt.ShiftModifier):
self.on_send_message()
return True # Event handled
return super().eventFilter(obj, event)
def keyPressEvent(self, event):
"""Handle key press events - Ctrl stops all speech, Escape closes window."""
if event.key() == Qt.Key_Control:
self.speech_manager.stop()
elif event.key() == Qt.Key_Escape:
self.close()
return
super().keyPressEvent(event)
def speak_message(self, text: str):
"""Speak message with PM-specific settings."""
accessibility = self.config_manager.get_accessibility_config()
# Check if speech is enabled and auto-read is on
if not accessibility.speech_enabled or not accessibility.auto_read_messages:
return
# Validate text has meaningful content
if not is_valid_text(text):
return
# Get PM-specific settings
channel_settings = self.config_manager.get_channel_speech_settings(self.server_name, self.nickname)
# Apply PM-specific settings using shared utility
apply_speech_settings(self.speech_manager, channel_settings, self.nickname)
# Speak the text
self.speech_manager.speak(text, priority="message", interrupt=False)
def on_send_message(self):
"""Handle send message."""
message = self.message_entry.toPlainText().strip()
if not message:
return
logger.debug(f"Sending PM to {self.nickname}: {message}")
# Emit signal to send message
self.message_send.emit(self.nickname, message)
# Display our message locally
self.add_message(self.irc_client.nickname, message)
# Clear input
self.message_entry.clear()
self.message_entry.setFocus()
def add_message(self, sender, message):
"""Add message to chat display."""
formatted_msg = f"<{sender}> {message}"
self.message_buffer.append(formatted_msg)
# Update display
self.chat_display.append(formatted_msg)
# Speak the message
is_own_message = (sender == self.irc_client.nickname)
if is_own_message:
# Check if "read my own messages" is enabled
accessibility = self.config_manager.get_accessibility_config()
if accessibility.speech_read_own_messages:
self.speak_message(formatted_msg)
else:
# Always speak incoming messages
self.speak_message(formatted_msg)
logger.debug(f"PM window [{self.nickname}]: Added message from {sender}")
def closeEvent(self, event):
"""Handle window close."""
logger.info(f"PM window closing for {self.nickname}")
self.window_closed.emit(self.nickname)
super().closeEvent(event)
def show_and_raise(self):
"""Show window and bring to front."""
self.show()
self.raise_()
self.activateWindow()
self.message_entry.setFocus()

1426
src/ui/settings_dialog.py Normal file

File diff suppressed because it is too large Load Diff

132
src/ui/sound.py Normal file
View File

@@ -0,0 +1,132 @@
"""Sound notification system for StormIRC."""
import logging
from pathlib import Path
from PySide6.QtMultimedia import QMediaPlayer, QAudioOutput
from PySide6.QtCore import QUrl
logger = logging.getLogger(__name__)
class SoundPlayer:
"""Handles sound notifications for IRC events."""
def __init__(self, config_manager=None):
"""Initialize sound player.
Args:
config_manager: ConfigManager instance for reading sound settings
"""
# Get sounds directory relative to this file
self.sounds_dir = Path(__file__).parent.parent.parent / "sounds"
logger.info(f"Sound directory: {self.sounds_dir}")
self.config_manager = config_manager
# Initialize sound effects
self.sounds = {}
self._load_sounds()
def _load_sounds(self):
"""Load sound files."""
sound_files = {
'public_message': 'public_message.wav',
'private_message': 'private_message.wav',
'highlight': 'highlight.wav'
}
for sound_name, filename in sound_files.items():
sound_path = self.sounds_dir / filename
if sound_path.exists():
# Create media player and audio output for each sound
player = QMediaPlayer()
audio_output = QAudioOutput()
player.setAudioOutput(audio_output)
player.setSource(QUrl.fromLocalFile(str(sound_path)))
audio_output.setVolume(1.0)
self.sounds[sound_name] = (player, audio_output)
logger.info(f"Loaded sound: {sound_name} from {sound_path}")
else:
logger.warning(f"Sound file not found: {sound_path}")
def _is_sound_enabled(self, sound_type):
"""Check if a specific sound type is enabled in settings.
Args:
sound_type: One of 'public_message', 'private_message', 'highlight'
Returns:
True if sound should play, False otherwise
"""
if not self.config_manager:
logger.debug(f"No config manager, defaulting {sound_type} to enabled")
return True # Default to enabled if no config manager
accessibility_config = self.config_manager.get_accessibility_config()
# Map sound types to config attributes
sound_settings = {
'public_message': accessibility_config.sound_public_message,
'private_message': accessibility_config.sound_private_message,
'highlight': accessibility_config.sound_highlight
}
enabled = sound_settings.get(sound_type, True)
logger.debug(f"Sound {sound_type} enabled: {enabled}")
return enabled
def play_public_message(self):
"""Play sound for public channel messages."""
logger.debug("play_public_message called")
if self._is_sound_enabled('public_message'):
self._play_sound('public_message')
else:
logger.debug("Public message sound disabled in settings")
def play_private_message(self):
"""Play sound for private messages."""
logger.debug("play_private_message called")
if self._is_sound_enabled('private_message'):
self._play_sound('private_message')
else:
logger.debug("Private message sound disabled in settings")
def play_highlight(self):
"""Play sound for nickname highlights/mentions."""
logger.debug("play_highlight called")
if self._is_sound_enabled('highlight'):
self._play_sound('highlight')
else:
logger.debug("Highlight sound disabled in settings")
def _play_sound(self, sound_name):
"""Play a specific sound."""
if sound_name in self.sounds:
try:
player, audio_output = self.sounds[sound_name]
player.stop() # Stop if already playing
player.setPosition(0) # Rewind to beginning
player.play()
logger.debug(f"Playing sound: {sound_name}")
except Exception as e:
logger.error(f"Failed to play sound {sound_name}: {e}")
else:
logger.warning(f"Sound not loaded: {sound_name}")
def is_highlight(self, message, nickname):
"""Check if a message contains a highlight/mention of the nickname.
Args:
message: The message text to check
nickname: The user's nickname to look for
Returns:
True if the nickname appears in the message (case-insensitive)
"""
if not message or not nickname:
return False
# Case-insensitive check for nickname anywhere in message
return nickname.lower() in message.lower()

360
src/ui/speech.py Normal file
View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Speech Dispatcher integration for self-voicing functionality.
Provides text-to-speech output for IRC messages and events.
"""
import logging
import threading
from typing import Optional, Dict
try:
import speechd
SPEECHD_AVAILABLE = True
except ImportError:
SPEECHD_AVAILABLE = False
logging.warning("Speech Dispatcher not available. Install python3-speechd for self-voicing support.")
class SpeechManager:
"""Manages text-to-speech output via Speech Dispatcher."""
def __init__(self, app_name: str = "StormIRC"):
"""
Initialize the speech manager.
Args:
app_name: Application name for Speech Dispatcher connection
"""
self.app_name = app_name
self.client: Optional[speechd.SSIPClient] = None
self.enabled = False
self.speechLock = threading.Lock()
self.uiSpeechThread = None
# Per-window speech settings
self.window_speech_settings = {} # Per-window speech enable/disable
self.window_voice_settings: Dict[str, str] = {} # Per-window voice
self.window_module_settings: Dict[str, str] = {} # Per-window output module
self.window_rate_settings: Dict[str, int] = {} # Per-window rate
# Global default settings
self.default_voice = None
self.default_module = None
self.default_rate = 0
self.default_pitch = 0
self.default_volume = 0
if SPEECHD_AVAILABLE:
self._connect()
def _connect(self) -> bool:
"""
Connect to Speech Dispatcher.
Returns:
True if connection successful, False otherwise
"""
try:
self.client = speechd.SSIPClient(self.app_name)
logging.info("Connected to Speech Dispatcher")
return True
except Exception as e:
logging.error(f"Failed to connect to Speech Dispatcher: {e}")
self.client = None
return False
def set_enabled(self, enabled: bool):
"""
Enable or disable speech globally.
Args:
enabled: True to enable speech, False to disable
"""
self.enabled = enabled
logging.debug(f"Global speech {'enabled' if enabled else 'disabled'}")
def is_enabled(self) -> bool:
"""
Check if speech is globally enabled.
Returns:
True if speech is enabled and available
"""
return self.enabled and SPEECHD_AVAILABLE and self.client is not None
def set_window_speech(self, window_id: str, enabled: bool):
"""
Enable or disable speech for a specific window/channel.
Args:
window_id: Window identifier (e.g., channel name or server)
enabled: True to enable speech for this window, False to disable
"""
self.window_speech_settings[window_id] = enabled
logging.debug(f"Speech for window '{window_id}' {'enabled' if enabled else 'disabled'}")
def is_window_speech_enabled(self, window_id: str) -> bool:
"""
Check if speech is enabled for a specific window.
Args:
window_id: Window identifier
Returns:
True if speech is enabled for this window (defaults to True if not set)
"""
return self.window_speech_settings.get(window_id, True)
def toggle_window_speech(self, window_id: str) -> bool:
"""
Toggle speech for a specific window.
Args:
window_id: Window identifier
Returns:
New state (True if now enabled, False if now disabled)
"""
current = self.is_window_speech_enabled(window_id)
new_state = not current
self.set_window_speech(window_id, new_state)
return new_state
def speak(self, text: str, window_id: Optional[str] = None, priority: str = "message", interrupt: bool = True):
"""
Speak text if speech is enabled (globally and for the window).
Args:
text: Text to speak
window_id: Optional window identifier to check window-specific settings
priority: Speech priority - "important", "message", or "text"
interrupt: If True, stop current speech first (default: True)
"""
if not self.is_enabled():
return
# Check window-specific settings if window_id provided
if window_id and not self.is_window_speech_enabled(window_id):
return
if not text or not text.strip() or not self.client:
return
# Safety: Wait for previous UI speech thread to finish if still running
if self.uiSpeechThread and self.uiSpeechThread.is_alive():
self.uiSpeechThread.join(timeout=0.1)
def speak_thread():
with self.speechLock:
try:
if interrupt:
self.client.stop()
# Apply window-specific settings if provided, otherwise use defaults
# CRITICAL: Set output module BEFORE voice, because changing module resets voice
# Set output module
if window_id and window_id in self.window_module_settings:
self.client.set_output_module(self.window_module_settings[window_id])
elif self.default_module:
self.client.set_output_module(self.default_module)
# Set voice (must come AFTER module)
if window_id and window_id in self.window_voice_settings:
self.client.set_synthesis_voice(self.window_voice_settings[window_id])
elif self.default_voice:
self.client.set_synthesis_voice(self.default_voice)
# Set rate (always apply, even if 0)
if window_id and window_id in self.window_rate_settings:
self.client.set_rate(self.window_rate_settings[window_id])
else:
self.client.set_rate(self.default_rate)
# Set pitch (always apply)
self.client.set_pitch(self.default_pitch)
# Set volume (always apply)
self.client.set_volume(self.default_volume)
# Map priority to Speech Dispatcher priority
priority_map = {
"important": speechd.Priority.IMPORTANT,
"message": speechd.Priority.MESSAGE,
"text": speechd.Priority.TEXT
}
spd_priority = priority_map.get(priority, speechd.Priority.MESSAGE)
self.client.set_priority(spd_priority)
self.client.speak(str(text))
logging.debug(f"Speaking: {text[:50]}...")
except Exception as e:
logging.error(f"Failed to speak text: {e}")
self.uiSpeechThread = threading.Thread(target=speak_thread)
self.uiSpeechThread.daemon = True
self.uiSpeechThread.start()
def cancel(self):
"""Cancel current speech output."""
if self.client:
try:
self.client.stop()
except Exception as e:
logging.error(f"Failed to cancel speech: {e}")
def stop(self):
"""Stop current speech (alias for cancel)."""
self.cancel()
def set_rate(self, rate: int, window_id: Optional[str] = None):
"""
Set speech rate globally or for a specific window.
Args:
rate: Speech rate from -100 (slowest) to 100 (fastest), 0 is default
window_id: Optional window identifier. If None, sets global default.
"""
rate = max(-100, min(100, rate))
if window_id:
self.window_rate_settings[window_id] = rate
else:
self.default_rate = rate
if self.client:
try:
self.client.set_rate(rate)
except Exception as e:
logging.error(f"Failed to set speech rate: {e}")
def set_pitch(self, pitch: int):
"""
Set speech pitch globally.
Args:
pitch: Speech pitch from -100 (lowest) to 100 (highest), 0 is default
"""
pitch = max(-100, min(100, pitch))
self.default_pitch = pitch
if self.client:
try:
self.client.set_pitch(pitch)
except Exception as e:
logging.error(f"Failed to set speech pitch: {e}")
def set_volume(self, volume: int):
"""
Set speech volume globally.
Args:
volume: Speech volume from -100 (quietest) to 100 (loudest), 0 is default
"""
volume = max(-100, min(100, volume))
self.default_volume = volume
if self.client:
try:
self.client.set_volume(volume)
except Exception as e:
logging.error(f"Failed to set speech volume: {e}")
def set_voice(self, voice_name: str, window_id: Optional[str] = None):
"""
Set the voice/synthesizer to use globally or for a specific window.
Args:
voice_name: Name of the voice/synthesizer
window_id: Optional window identifier. If None, sets global default.
"""
if window_id:
self.window_voice_settings[window_id] = voice_name
else:
self.default_voice = voice_name
if self.client:
try:
self.client.set_synthesis_voice(voice_name)
except Exception as e:
logging.error(f"Failed to set voice: {e}")
def set_output_module(self, module_name: str, window_id: Optional[str] = None):
"""
Set the output module (speech synthesizer) globally or for a specific window.
Args:
module_name: Name of the output module (e.g., 'espeak-ng', 'festival')
window_id: Optional window identifier. If None, sets global default.
"""
if window_id:
self.window_module_settings[window_id] = module_name
else:
self.default_module = module_name
if self.client:
try:
self.client.set_output_module(module_name)
except Exception as e:
logging.error(f"Failed to set output module: {e}")
def list_voices(self) -> list:
"""
Get list of available voices.
Returns:
List of voice tuples (name, language, variant), empty if not available
"""
if self.client:
try:
voices = self.client.list_synthesis_voices()
return list(voices) if voices else []
except Exception as e:
logging.error(f"Failed to list voices: {e}")
return []
def list_output_modules(self) -> list:
"""
List available output modules (speech synthesizers).
Returns:
List of output module names (e.g., 'espeak-ng', 'festival', 'flite')
"""
if self.client:
try:
modules = self.client.list_output_modules()
return list(modules) if modules else []
except Exception as e:
logging.error(f"Failed to list output modules: {e}")
return []
def get_window_voice(self, window_id: str) -> Optional[str]:
"""Get the voice set for a specific window."""
return self.window_voice_settings.get(window_id)
def get_window_module(self, window_id: str) -> Optional[str]:
"""Get the output module set for a specific window."""
return self.window_module_settings.get(window_id)
def get_window_rate(self, window_id: str) -> Optional[int]:
"""Get the speech rate set for a specific window."""
return self.window_rate_settings.get(window_id)
def clear_window_settings(self, window_id: str):
"""Clear all custom settings for a specific window."""
self.window_voice_settings.pop(window_id, None)
self.window_module_settings.pop(window_id, None)
self.window_rate_settings.pop(window_id, None)
def close(self):
"""Close connection to Speech Dispatcher."""
if self.client:
try:
self.client.close()
logging.info("Disconnected from Speech Dispatcher")
except Exception as e:
logging.error(f"Error closing Speech Dispatcher connection: {e}")
finally:
self.client = None
def is_available(self) -> bool:
"""Check if speech-dispatcher is available."""
return SPEECHD_AVAILABLE and self.client is not None

68
src/ui/ui_utils.py Normal file
View File

@@ -0,0 +1,68 @@
"""Shared UI utilities for StormIRC.
This module contains common utility functions used across UI components
to maintain DRY principles and consistency.
"""
import logging
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.config.settings import UIConfig, ChannelSpeechSettings
from src.ui.speech import SpeechManager
logger = logging.getLogger(__name__)
# UI Constants
DEFAULT_WINDOW_WIDTH = 800
DEFAULT_WINDOW_HEIGHT = 600
DEFAULT_PM_WINDOW_WIDTH = 600
DEFAULT_PM_WINDOW_HEIGHT = 400
DEFAULT_TOPIC_BAR_HEIGHT = 60
MESSAGE_ENTRY_MAX_HEIGHT = 60 # Keep main window message entry compact
PM_MESSAGE_ENTRY_MAX_HEIGHT = 100 # PM window has slightly more space
MIN_WINDOW_WIDTH = 400
MIN_WINDOW_HEIGHT = 300
def format_message_with_timestamp(message: str, ui_config: 'UIConfig') -> str:
"""Format a message with an optional timestamp.
Args:
message: The message text to format
ui_config: UI configuration containing timestamp preferences
Returns:
Formatted message string with timestamp if enabled
"""
if ui_config.show_timestamps:
timestamp = datetime.now().strftime(ui_config.timestamp_format)
return f"{message} {timestamp}"
return message
def apply_speech_settings(speech_manager: 'SpeechManager',
settings: 'ChannelSpeechSettings',
window_id: str) -> None:
"""Apply channel/PM speech settings to the speech manager.
This consolidates the common pattern of applying per-channel/PM
speech configuration (voice, rate, output module) to avoid duplication.
Args:
speech_manager: The SpeechManager instance to configure
settings: Channel-specific speech settings
window_id: Unique identifier for the window (channel name or PM nick)
"""
if settings.voice:
speech_manager.set_voice(settings.voice, window_id=window_id)
if settings.rate != 0:
speech_manager.set_rate(settings.rate, window_id=window_id)
if settings.output_module:
speech_manager.set_output_module(settings.output_module, window_id=window_id)
logger.debug(f"Applied speech settings for window '{window_id}': "
f"voice={settings.voice}, rate={settings.rate}, "
f"module={settings.output_module}")

66
stormirc Executable file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""StormIRC - Accessible IRC Client."""
import sys
import argparse
import logging
from pathlib import Path
# Add the project root to the path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from src.ui.main_window import MainWindow
from PySide6.QtWidgets import QApplication
from PySide6.QtCore import Qt
def setup_logging(debug=False):
"""Set up logging configuration."""
if debug:
log_file = project_root / "stormirc.log"
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] %(name)s: %(message)s [%(asctime)s]',
datefmt='%a %b %d %I:%M:%S %p %Z %Y',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stdout)
]
)
logging.info(f"Debug logging enabled, writing to {log_file}")
else:
logging.basicConfig(
level=logging.WARNING,
format='%(levelname)s: %(message)s'
)
def main():
"""Main entry point for StormIRC."""
parser = argparse.ArgumentParser(description="StormIRC - Accessible IRC Client")
parser.add_argument('-d', '--debug', action='store_true',
help='Enable debug logging to stormirc.log')
args = parser.parse_args()
setup_logging(args.debug)
# Enable high DPI scaling
QApplication.setHighDpiScaleFactorRoundingPolicy(
Qt.HighDpiScaleFactorRoundingPolicy.PassThrough
)
app = QApplication(sys.argv)
app.setApplicationName("StormIRC")
app.setOrganizationName("Stormux")
# Create and show main window
window = MainWindow()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()