2025-04-11 13:17:26 -04:00

190 lines
6.8 KiB
Python

#!/usr/bin/env python3
#
# Copyright (c) 2024 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., Franklin Street, Fifth Floor,
# Boston MA 02110-1301 USA.
#
"""Self Voice plugin for Cthulhu screen reader."""
import os
import socket
import select
import logging
import threading
from threading import Thread, Lock
from cthulhu.plugin import Plugin, cthulhu_hookimpl
logger = logging.getLogger(__name__)
# Special codes for message handling
APPEND_CODE = '<#APPEND#>'
PERSISTENT_CODE = '<#PERSISTENT#>'
class SelfVoice(Plugin):
"""Plugin that provides a socket interface for external applications to send text to Cthulhu."""
def __init__(self):
"""Initialize the plugin."""
super().__init__()
self.lock = Lock()
self.active = False
self.voiceThread = Thread(target=self.voiceWorker)
self.voiceThread.daemon = True # Make thread exit when main thread exits
@cthulhu_hookimpl
def activate(self):
"""Activate the self-voice plugin."""
super().activate()
logger.info("Activating Self Voice Plugin")
self.activateWorker()
@cthulhu_hookimpl
def deactivate(self):
"""Deactivate the self-voice plugin."""
logger.info("Deactivating Self Voice Plugin")
self.deactivateWorker()
super().deactivate()
def activateWorker(self):
"""Start the voice worker thread."""
with self.lock:
self.active = True
# Only start if not already running
if not self.voiceThread.is_alive():
self.voiceThread = Thread(target=self.voiceWorker)
self.voiceThread.daemon = True
self.voiceThread.start()
def deactivateWorker(self):
"""Stop the voice worker thread."""
with self.lock:
self.active = False
# Try to join the thread if it's alive, with a timeout
if self.voiceThread.is_alive():
try:
self.voiceThread.join(timeout=2.0)
except Exception as e:
logger.warning(f"Error stopping voice worker thread: {e}")
def isActive(self):
"""Check if the worker is active."""
with self.lock:
return self.active
def outputMessage(self, message):
"""Output a message through Cthulhu's speech and braille systems.
Args:
message: The message to output. May include special codes.
"""
# Process special codes
append = message.startswith(APPEND_CODE)
if append:
message = message[len(APPEND_CODE):]
persistent = False
if message.endswith(PERSISTENT_CODE):
message = message[:len(message)-len(PERSISTENT_CODE)]
persistent = True
# Output through appropriate channel
if persistent:
# Use the APIHelper for persistent messages
self.app.getAPIHelper().outputMessage(message, not append)
else:
# Use the script manager for standard messages
script_manager = self.app.getDynamicApiManager().getAPI('ScriptManager')
scriptManager = script_manager.getManager()
scriptManager.getDefaultScript().presentMessage(message, resetStyles=False)
def voiceWorker(self):
"""Worker thread that listens on a socket for messages to speak."""
socketFile = '/tmp/cthulhu.sock'
# For testing purposes
# socketFile = '/tmp/cthulhu-plugin.sock'
# Clean up any existing socket file
if os.path.exists(socketFile):
try:
os.unlink(socketFile)
except Exception as e:
logger.error(f"Error removing existing socket file: {e}")
return
try:
# Create and set up the socket
cthulhu_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
cthulhu_sock.bind(socketFile)
os.chmod(socketFile, 0o222) # Write-only for everyone
cthulhu_sock.listen(1)
logger.info(f"Self Voice plugin listening on {socketFile}")
# Main loop - listen for connections
while self.isActive():
# Check if data is available with a timeout
try:
r, _, _ = select.select([cthulhu_sock], [], [], 0.8)
except select.error as e:
logger.error(f"Select error: {e}")
break
if not r: # No data available
continue
# Accept connection
if cthulhu_sock in r:
try:
client_sock, _ = cthulhu_sock.accept()
client_sock.settimeout(0.5) # Set a timeout for receiving data
try:
# Receive and process data
raw_data = client_sock.recv(8192)
if raw_data:
data = raw_data.decode("utf-8").strip()
if data:
self.outputMessage(data)
except socket.timeout:
pass
except Exception as e:
logger.error(f"Error receiving data: {e}")
finally:
client_sock.close()
except Exception as e:
logger.error(f"Error accepting connection: {e}")
except Exception as e:
logger.error(f"Socket error: {e}")
finally:
# Clean up
if 'cthulhu_sock' in locals():
try:
cthulhu_sock.close()
except Exception:
pass
if os.path.exists(socketFile):
try:
os.unlink(socketFile)
except Exception as e:
logger.error(f"Error removing socket file: {e}")
logger.info("Self Voice plugin socket closed")