Files
fenrir/src/fenrirscreenreader/speechDriver/genericDriver.py

251 lines
8.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
# generic driver
import shlex
import subprocess
from queue import Empty
from queue import Queue
from subprocess import Popen
from threading import Lock
from threading import Thread
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.speechDriver import speech_driver
class SpeakQueue(Queue):
def clear(self):
try:
while True:
self.get_nowait()
except Empty:
pass
class driver(speech_driver):
def __init__(self):
speech_driver.__init__(self)
self.proc = None
self.speechThread = Thread(target=self.worker)
self.lock = Lock()
self.textQueue = SpeakQueue()
def initialize(self, environment):
self.env = environment
self.minVolume = self.env["runtime"][
"SettingsManager"
].get_setting_as_int("speech", "fenrirMinVolume")
self.maxVolume = self.env["runtime"][
"SettingsManager"
].get_setting_as_int("speech", "fenrirMaxVolume")
self.minPitch = self.env["runtime"][
"SettingsManager"
].get_setting_as_int("speech", "fenrirMinPitch")
self.maxPitch = self.env["runtime"][
"SettingsManager"
].get_setting_as_int("speech", "fenrirMaxPitch")
self.minRate = self.env["runtime"][
"SettingsManager"
].get_setting_as_int("speech", "fenrirMinRate")
self.maxRate = self.env["runtime"][
"SettingsManager"
].get_setting_as_int("speech", "fenrirMaxRate")
self.speechCommand = self.env["runtime"][
"SettingsManager"
].get_setting("speech", "genericSpeechCommand")
if self.speechCommand == "":
self.speechCommand = 'espeak -a fenrirVolume -s fenrirRate -p fenrirPitch -v fenrirVoice -- "fenrirText"'
if False: # for debugging overwrite here
# self.speechCommand = 'spd-say --wait -r 100 -i 100 "fenrirText"'
self.speechCommand = 'flite -t "fenrirText"'
self._is_initialized = True
if self._is_initialized:
self.speechThread.start()
def shutdown(self):
if not self._is_initialized:
return
self.cancel()
self.textQueue.put(-1)
def speak(self, text, queueable=True, ignore_punctuation=False):
if not self._is_initialized:
return
if not queueable:
self.cancel()
utterance = {
"text": text,
"volume": self.volume,
"rate": self.rate,
"pitch": self.pitch,
"module": self.module,
"language": self.language,
"voice": self.voice,
}
self.textQueue.put(utterance.copy())
def cancel(self):
if not self._is_initialized:
return
self.clear_buffer()
self.lock.acquire(True)
try:
if self.proc:
try:
self.proc.terminate()
# Wait for process to finish to prevent zombies
try:
self.proc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
# If terminate didn't work, force kill
self.proc.kill()
self.proc.wait(timeout=1.0)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"speech_driver:Cancel:self.proc.terminate():" + str(e),
debug.DebugLevel.WARNING,
)
try:
self.proc.kill()
# Wait after kill to prevent zombies
self.proc.wait(timeout=1.0)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"speech_driver:Cancel:self.proc.kill():" + str(e),
debug.DebugLevel.WARNING,
)
self.proc = None
finally:
# Ensure lock is always released, even if process termination fails
self.lock.release()
def clear_buffer(self):
if not self._is_initialized:
return
self.textQueue.clear()
def set_voice(self, voice):
if not self._is_initialized:
return
self.voice = str(voice)
def set_pitch(self, pitch):
if not self._is_initialized:
return
self.pitch = str(
self.minPitch + pitch * (self.maxPitch - self.minPitch)
)
def set_rate(self, rate):
if not self._is_initialized:
return
self.rate = str(self.minRate + rate * (self.maxRate - self.minRate))
def set_module(self, module):
if not self._is_initialized:
return
self.module = str(module)
def set_language(self, language):
if not self._is_initialized:
return
self.language = str(language)
def set_volume(self, volume):
if not self._is_initialized:
return
self.volume = str(
self.minVolume + volume * (self.maxVolume - self.minVolume)
)
def worker(self):
while True:
utterance = self.textQueue.get()
if isinstance(utterance, int):
if utterance == -1:
return
else:
continue
elif not isinstance(utterance, dict):
continue
# no text means nothing to speak
if "text" not in utterance:
continue
if not isinstance(utterance["text"], str):
continue
if utterance["text"] == "":
continue
# check for valid data fields
if "volume" not in utterance:
utterance["volume"] = ""
if not isinstance(utterance["volume"], str):
utterance["volume"] = ""
if "module" not in utterance:
utterance["module"] = ""
if not isinstance(utterance["module"], str):
utterance["module"] = ""
if "language" not in utterance:
utterance["language"] = ""
if not isinstance(utterance["language"], str):
utterance["language"] = ""
if "voice" not in utterance:
utterance["voice"] = ""
if not isinstance(utterance["voice"], str):
utterance["voice"] = ""
if "pitch" not in utterance:
utterance["pitch"] = ""
if not isinstance(utterance["pitch"], str):
utterance["pitch"] = ""
if "rate" not in utterance:
utterance["rate"] = ""
if not isinstance(utterance["rate"], str):
utterance["rate"] = ""
popen_speech_command = shlex.split(self.speechCommand)
for idx, word in enumerate(popen_speech_command):
word = word.replace("fenrirVolume", str(utterance["volume"]))
word = word.replace("fenrirModule", str(utterance["module"]))
word = word.replace(
"fenrirLanguage", str(utterance["language"])
)
word = word.replace("fenrirVoice", str(utterance["voice"]))
word = word.replace("fenrirPitch", str(utterance["pitch"]))
word = word.replace("fenrirRate", str(utterance["rate"]))
# Properly quote text to prevent command injection
word = word.replace(
"fenrirText", shlex.quote(str(utterance["text"]))
)
popen_speech_command[idx] = word
try:
self.env["runtime"]["DebugManager"].write_debug_out(
"speech_driver:worker:" + " ".join(popen_speech_command),
debug.DebugLevel.INFO,
)
self.lock.acquire(True)
self.proc = Popen(
popen_speech_command,
stdin=None,
stdout=None,
stderr=None,
shell=False,
)
self.lock.release()
self.proc.wait()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"speech_driver:worker:" + str(e), debug.DebugLevel.ERROR
)
self.lock.acquire(True)
self.proc = None
self.lock.release()