Speech history plugin added. Code and documentation audit completed. Preparing for tagged release.

This commit is contained in:
Storm Dragon
2025-12-22 19:43:41 -05:00
parent 10b3592173
commit 200faa9e36
19 changed files with 773 additions and 367 deletions

View File

@@ -75,6 +75,7 @@ cthulhu_python_sources = files([
'sound.py',
'sound_generator.py',
'speech_and_verbosity_manager.py',
'speech_history.py',
'speech.py',
'spellcheck.py',
'speechdispatcherfactory.py',
@@ -154,4 +155,4 @@ install_data(
# Subdirectories
subdir('backends')
subdir('scripts')
subdir('plugins')
subdir('plugins')

View File

@@ -1 +0,0 @@
from .plugin import SpeechHistory

View File

@@ -1,8 +0,0 @@
name = Speech History
version = 1.0.0
description = Keeps a history of all speech output with navigation and clipboard support
authors = Cthulhu Plugin System
website = https://git.stormux.org/storm/cthulhu
copyright = Copyright 2024 Stormux
builtin = true
hidden = false

View File

@@ -1,235 +0,0 @@
#!/usr/bin/env python3
import logging
from collections import deque
from cthulhu.plugin import Plugin, cthulhu_hookimpl
from cthulhu import settings_manager
from cthulhu import debug
logger = logging.getLogger(__name__)
class SpeechHistory(Plugin):
"""Speech History plugin - SAFE manual-only version (no automatic capture)."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory SAFE plugin initialized", True)
# History storage - start with some sample items
self._max_history_size = 50
self._history = deque([
"Welcome to safe speech history",
"This version doesn't auto-capture to prevent crashes",
"Use add_to_history() method to manually add items",
"Navigate with Cthulhu+Control+Shift+H (previous)",
"Navigate with Cthulhu+Control+H (next)",
"Copy with Cthulhu+Control+Y"
], maxlen=self._max_history_size)
self._current_history_index = -1
# Keybinding storage
self._kb_nav_prev = None
self._kb_nav_next = None
self._kb_copy_last = None
# Settings integration
self._settings_manager = settings_manager.getManager()
@cthulhu_hookimpl
def activate(self, plugin=None):
"""Activate the plugin."""
if plugin is not None and plugin is not self:
return
try:
debug.printMessage(debug.LEVEL_INFO, "=== SpeechHistory SAFE activation starting ===", True)
# Load settings
self._load_settings()
# Register keybindings only - NO speech capture
self._register_keybindings()
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory SAFE plugin activated successfully", True)
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error activating SpeechHistory SAFE: {e}", True)
return False
@cthulhu_hookimpl
def deactivate(self, plugin=None):
"""Deactivate the plugin."""
if plugin is not None and plugin is not self:
return
debug.printMessage(debug.LEVEL_INFO, "Deactivating SpeechHistory SAFE plugin", True)
# Clear keybindings
self._kb_nav_prev = None
self._kb_nav_next = None
self._kb_copy_last = None
return True
def _load_settings(self):
"""Load plugin settings."""
try:
self._max_history_size = self._settings_manager.getSetting('speechHistorySize') or 50
# Update deque maxlen if needed
if self._history.maxlen != self._max_history_size:
old_history = list(self._history)
self._history = deque(old_history[-self._max_history_size:], maxlen=self._max_history_size)
debug.printMessage(debug.LEVEL_INFO, f"Speech history size: {self._max_history_size}", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error loading settings: {e}", True)
self._max_history_size = 50
def _register_keybindings(self):
"""Register plugin keybindings."""
try:
# Cthulhu+Control+Shift+H (History previous)
self._kb_nav_prev = self.registerGestureByString(
self._navigate_history_prev,
"Speech history previous",
'kb:cthulhu+control+shift+h'
)
# Cthulhu+Control+H (History next)
self._kb_nav_next = self.registerGestureByString(
self._navigate_history_next,
"Speech history next",
'kb:cthulhu+control+h'
)
# Cthulhu+Control+Y (Copy history)
self._kb_copy_last = self.registerGestureByString(
self._copy_last_spoken,
"Copy speech history item to clipboard",
'kb:cthulhu+control+y'
)
debug.printMessage(debug.LEVEL_INFO, f"Registered keybindings: {bool(self._kb_nav_prev)}, {bool(self._kb_nav_next)}, {bool(self._kb_copy_last)}", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error registering keybindings: {e}", True)
def _navigate_history_prev(self, script=None, inputEvent=None):
"""Navigate to previous item in speech history."""
try:
if not self._history:
self._present_message("Speech history is empty")
return True
# Move backward in history (to older items)
if self._current_history_index == -1:
self._current_history_index = len(self._history) - 1
elif self._current_history_index > 0:
self._current_history_index -= 1
else:
self._current_history_index = len(self._history) - 1
# Present the history item
history_item = self._history[self._current_history_index]
position = self._current_history_index + 1
self._present_message(f"History {position} of {len(self._history)}: {history_item}")
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error navigating to previous: {e}", True)
return False
def _navigate_history_next(self, script=None, inputEvent=None):
"""Navigate to next item in speech history."""
try:
if not self._history:
self._present_message("Speech history is empty")
return True
# Move forward in history (to newer items)
if self._current_history_index == -1:
self._current_history_index = 0
elif self._current_history_index < len(self._history) - 1:
self._current_history_index += 1
else:
self._current_history_index = 0
# Present the history item
history_item = self._history[self._current_history_index]
position = self._current_history_index + 1
self._present_message(f"History {position} of {len(self._history)}: {history_item}")
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error navigating to next: {e}", True)
return False
def _copy_last_spoken(self, script=None, inputEvent=None):
"""Copy the last spoken text to clipboard."""
try:
if not self._history:
self._present_message("No speech history to copy")
return True
# Copy the most recent speech
last_spoken = self._history[-1]
try:
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(last_spoken, -1)
clipboard.store()
# Show confirmation
preview = last_spoken[:50] + ('...' if len(last_spoken) > 50 else '')
self._present_message(f"Copied to clipboard: {preview}")
except Exception as clipboard_error:
debug.printMessage(debug.LEVEL_INFO, f"Clipboard error: {clipboard_error}", True)
self._present_message("Error copying to clipboard")
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error copying: {e}", True)
return False
def _present_message(self, message):
"""Present a message to the user via speech."""
try:
if self.app:
state = self.app.getDynamicApiManager().getAPI('CthulhuState')
if state and state.activeScript:
state.activeScript.presentMessage(message, resetStyles=False)
else:
debug.printMessage(debug.LEVEL_INFO, f"Message: {message}", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error presenting message: {e}", True)
def add_to_history(self, text):
"""Public method to safely add items to history."""
try:
if not text or not text.strip():
return
clean_text = text.strip()
if len(clean_text) < 2:
return
# Simple duplicate prevention
if self._history and self._history[-1] == clean_text:
return
# Add to history
self._history.append(clean_text)
self._current_history_index = -1
debug.printMessage(debug.LEVEL_INFO, f"Manually added to history: {clean_text[:50]}{'...' if len(clean_text) > 50 else ''}", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"Error adding to history: {e}", True)

View File

@@ -0,0 +1,2 @@
from .plugin import SpeechHistory

View File

@@ -11,4 +11,5 @@ python3.install_sources(
install_data(
'plugin.info',
install_dir: python3.get_install_dir() / 'cthulhu' / 'plugins' / 'SpeechHistory'
)
)

View File

@@ -0,0 +1,9 @@
name = Speech History
version = 1.0.0
description = Shows a searchable history of the last 50 unique utterances spoken by Cthulhu
authors = Stormux
website = https://git.stormux.org/storm/cthulhu
copyright = Copyright 2025 Stormux
builtin = false
hidden = false

View File

@@ -0,0 +1,406 @@
#!/usr/bin/env python3
#
# Copyright (c) 2025 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
"""Speech History plugin for Cthulhu."""
import logging
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk
from cthulhu.plugin import Plugin, cthulhu_hookimpl
from cthulhu import debug
from cthulhu import speech_history
logger = logging.getLogger(__name__)
class SpeechHistory(Plugin):
"""Plugin that displays a window containing recent spoken output."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._activated = False
self._kbOpenWindow = None
self._window = None
self._filterEntry = None
self._filterText = ""
self._listStore = None
self._filterModel = None
self._treeView = None
self._capturePaused = False
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Plugin initialized", True)
@cthulhu_hookimpl
def activate(self, plugin=None):
if plugin is not None and plugin is not self:
return
if self._activated:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Already activated, skipping", True)
return True
try:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Activating plugin", True)
self._register_keybinding()
self._activated = True
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Activated successfully", True)
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR during activate: {e}", True)
logger.exception("Error activating SpeechHistory plugin")
return False
@cthulhu_hookimpl
def deactivate(self, plugin=None):
if plugin is not None and plugin is not self:
return
try:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Deactivating plugin", True)
self._close_window()
self._kbOpenWindow = None
self._activated = False
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Deactivated successfully", True)
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR during deactivate: {e}", True)
logger.exception("Error deactivating SpeechHistory plugin")
return False
def _register_keybinding(self):
try:
if not self.app:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: No app reference; cannot register keybinding", True)
return
gestureString = "kb:cthulhu+control+h"
description = "Open speech history"
self._kbOpenWindow = self.registerGestureByString(
self._open_window,
description,
gestureString,
)
if self._kbOpenWindow:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: Registered keybinding {gestureString}", True)
else:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: Failed to register keybinding {gestureString}", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR registering keybinding: {e}", True)
logger.exception("Error registering keybinding for SpeechHistory")
def _open_window(self, script=None, inputEvent=None):
try:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Open window requested", True)
if self._window:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Window already open; presenting", True)
self._window.present()
return True
self._pause_capture()
self._create_window()
self._window.show_all()
if self._filterEntry:
self._filterEntry.grab_focus()
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Window shown", True)
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR opening window: {e}", True)
logger.exception("Error opening SpeechHistory window")
self._resume_capture()
return False
def _create_window(self):
self._window = Gtk.Window(title="Speech History - Cthulhu")
self._window.set_default_size(700, 420)
self._window.set_modal(True)
self._window.set_border_width(10)
self._window.connect("destroy", self._on_window_destroy)
self._window.connect("key-press-event", self._on_window_key_press)
mainBox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=10)
# Filter row
filterRow = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10)
filterLabel = Gtk.Label(label="_Filter:")
filterLabel.set_use_underline(True)
filterLabel.set_halign(Gtk.Align.START)
self._filterEntry = Gtk.Entry()
self._filterEntry.set_hexpand(True)
filterLabel.set_mnemonic_widget(self._filterEntry)
self._filterEntry.connect("changed", self._on_filter_changed)
filterRow.pack_start(filterLabel, False, False, 0)
filterRow.pack_start(self._filterEntry, True, True, 0)
mainBox.pack_start(filterRow, False, False, 0)
# List
self._listStore = Gtk.ListStore(int, str)
self._filterModel = self._listStore.filter_new()
self._filterModel.set_visible_func(self._filter_visible_func)
self._treeView = Gtk.TreeView(model=self._filterModel)
self._treeView.set_headers_visible(True)
selection = self._treeView.get_selection()
selection.set_mode(Gtk.SelectionMode.SINGLE)
idxRenderer = Gtk.CellRendererText()
idxColumn = Gtk.TreeViewColumn("Item", idxRenderer, text=0)
idxColumn.set_resizable(False)
idxColumn.set_sizing(Gtk.TreeViewColumnSizing.AUTOSIZE)
self._treeView.append_column(idxColumn)
textRenderer = Gtk.CellRendererText()
textRenderer.set_property("wrap-width", 640)
textRenderer.set_property("wrap-mode", 2) # Pango.WrapMode.WORD_CHAR
textColumn = Gtk.TreeViewColumn("Spoken Text", textRenderer, text=1)
textColumn.set_resizable(True)
textColumn.set_expand(True)
self._treeView.append_column(textColumn)
scrolled = Gtk.ScrolledWindow()
scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
scrolled.add(self._treeView)
mainBox.pack_start(scrolled, True, True, 0)
# Buttons
buttonRow = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10)
buttonRow.set_halign(Gtk.Align.END)
copyButton = Gtk.Button(label="Copy to clipboard")
removeButton = Gtk.Button(label="Remove from history")
closeButton = Gtk.Button(label="Close")
copyButton.connect("clicked", self._on_copy_clicked)
removeButton.connect("clicked", self._on_remove_clicked)
closeButton.connect("clicked", self._on_close_clicked)
buttonRow.pack_start(copyButton, False, False, 0)
buttonRow.pack_start(removeButton, False, False, 0)
buttonRow.pack_start(closeButton, False, False, 0)
mainBox.pack_start(buttonRow, False, False, 0)
self._window.add(mainBox)
self._refresh_list(selectFirst=True)
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Window created", True)
def _on_filter_changed(self, entry):
try:
self._filterText = entry.get_text() or ""
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: Filter changed '{self._filterText}'", True)
if self._filterModel:
self._filterModel.refilter()
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR filtering: {e}", True)
logger.exception("Error updating speech history filter")
def _filter_visible_func(self, model, treeIter, data=None):
try:
filterText = (self._filterText or "").strip().lower()
if not filterText:
return True
spokenText = model[treeIter][1] or ""
return spokenText.lower().startswith(filterText)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR in filter func: {e}", True)
return True
def _refresh_list(self, selectFirst=False):
try:
if not self._listStore:
return
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Refreshing list", True)
self._listStore.clear()
items = speech_history.get_items()
for idx, item in enumerate(items, start=1):
self._listStore.append([idx, item])
if self._filterModel:
self._filterModel.refilter()
if selectFirst and self._treeView and len(self._filterModel) > 0:
selection = self._treeView.get_selection()
selection.select_path(Gtk.TreePath.new_first())
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR refreshing list: {e}", True)
logger.exception("Error refreshing speech history list")
def _get_selected_text(self):
try:
if not self._treeView:
return None
selection = self._treeView.get_selection()
model, treeIter = selection.get_selected()
if not treeIter:
return None
return model[treeIter][1]
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR getting selection: {e}", True)
logger.exception("Error getting selected speech history item")
return None
def _on_copy_clicked(self, button):
try:
selectedText = self._get_selected_text()
if not selectedText:
self._present_message("No speech history item selected.")
return
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(selectedText, -1)
clipboard.store()
preview = selectedText[:60] + ("..." if len(selectedText) > 60 else "")
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: Copied to clipboard '{preview}'", True)
self._present_message("Copied to clipboard.")
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR copying: {e}", True)
logger.exception("Error copying speech history item to clipboard")
self._present_message("Error copying to clipboard.")
def _on_remove_clicked(self, button):
try:
selectedText = self._get_selected_text()
if not selectedText:
self._present_message("No speech history item selected.")
return
removed = speech_history.remove(selectedText)
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: Remove requested removed={removed}", True)
if removed:
self._refresh_list(selectFirst=True)
self._present_message("Removed from history.")
else:
self._present_message("Item not found in history.")
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR removing: {e}", True)
logger.exception("Error removing speech history item")
self._present_message("Error removing item from history.")
def _on_close_clicked(self, button):
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Close button clicked", True)
self._close_window()
def _close_window(self):
try:
if self._window:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Closing window", True)
self._window.destroy()
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR closing window: {e}", True)
logger.exception("Error closing SpeechHistory window")
self._resume_capture()
def _on_window_destroy(self, widget):
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Window destroyed", True)
self._window = None
self._filterEntry = None
self._listStore = None
self._filterModel = None
self._treeView = None
self._filterText = ""
self._resume_capture()
def _on_window_key_press(self, widget, event):
try:
if event.keyval == Gdk.KEY_Escape:
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Escape pressed; closing window", True)
self._close_window()
return True
if not self._filterEntry or self._filterEntry.is_focus():
return False
# If user starts typing anywhere, move focus to filter and update it.
if event.keyval == Gdk.KEY_BackSpace:
currentText = self._filterEntry.get_text() or ""
if currentText:
self._filterEntry.set_text(currentText[:-1])
self._filterEntry.set_position(-1)
return True
return False
modifierMask = (
Gdk.ModifierType.CONTROL_MASK
| Gdk.ModifierType.MOD1_MASK
| Gdk.ModifierType.SUPER_MASK
| Gdk.ModifierType.META_MASK
)
if event.state & modifierMask:
return False
keyUnicode = Gdk.keyval_to_unicode(event.keyval)
if not keyUnicode:
return False
charTyped = chr(keyUnicode)
if not charTyped.isprintable():
return False
self._filterEntry.grab_focus()
currentText = self._filterEntry.get_text() or ""
self._filterEntry.set_text(currentText + charTyped)
self._filterEntry.set_position(-1)
return True
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR in key handler: {e}", True)
logger.exception("Error handling key press in SpeechHistory window")
return False
def _pause_capture(self):
if self._capturePaused:
return
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Pausing capture while window is open", True)
speech_history.pause_capture(reason="SpeechHistory window open")
self._capturePaused = True
def _resume_capture(self):
if not self._capturePaused:
return
debug.printMessage(debug.LEVEL_INFO, "SpeechHistory: Resuming capture (window closed)", True)
speech_history.resume_capture(reason="SpeechHistory window closed")
self._capturePaused = False
def _present_message(self, message):
try:
if not self.app:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: {message}", True)
return
state = self.app.getDynamicApiManager().getAPI("CthulhuState")
if state and state.activeScript:
state.activeScript.presentMessage(message, resetStyles=False)
else:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: {message}", True)
except Exception as e:
debug.printMessage(debug.LEVEL_INFO, f"SpeechHistory: ERROR presenting message: {e}", True)
logger.exception("Error presenting message from SpeechHistory")

View File

@@ -7,6 +7,7 @@ subdir('HelloCthulhu')
subdir('IndentationAudio')
subdir('OCR')
subdir('PluginManager')
subdir('SpeechHistory')
subdir('SimplePluginSystem')
subdir('hello_world')
subdir('self_voice')
subdir('self_voice')

View File

@@ -42,6 +42,7 @@ from . import speech_generator
from .speechserver import VoiceFamily
from .acss import ACSS
from . import speech_history
_logger = logger.getLogger()
log = _logger.newLog("speech")
@@ -143,12 +144,28 @@ def sayAll(utteranceIterator, progressCallback):
if settings.silenceSpeech:
return
if _speechserver:
_speechserver.sayAll(utteranceIterator, progressCallback)
def _speechHistorySayAllWrapper():
for [context, acss] in utteranceIterator:
try:
utterance = getattr(context, "utterance", None)
if isinstance(utterance, str) and utterance.strip():
speech_history.add(utterance, source="sayAll")
except Exception:
debug.printException(debug.LEVEL_INFO)
yield [context, acss]
_speechserver.sayAll(_speechHistorySayAllWrapper(), progressCallback)
else:
for [context, acss] in utteranceIterator:
logLine = f"SPEECH OUTPUT: '{context.utterance}'"
debug.printMessage(debug.LEVEL_INFO, logLine, True)
log.info(logLine)
try:
utterance = getattr(context, "utterance", None)
if isinstance(utterance, str) and utterance.strip():
speech_history.add(utterance, source="sayAll-fallback")
except Exception:
debug.printException(debug.LEVEL_INFO)
def _speak(text, acss, interrupt):
"""Speaks the individual string using the given ACSS."""
@@ -166,6 +183,11 @@ def _speak(text, acss, interrupt):
debug.printMessage(debug.LEVEL_INFO, f"SPEECH: Blocked by sleep mode: '{text}'", True)
return
try:
speech_history.add(text, source="speak")
except Exception:
debug.printException(debug.LEVEL_INFO)
if not _speechserver:
logLine = f"SPEECH OUTPUT: '{text}' {acss}"
debug.printMessage(debug.LEVEL_INFO, logLine, True)

View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
#
# Copyright (c) 2025 Stormux
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
"""Shared speech history buffer.
This module records the last N unique utterances spoken by Cthulhu.
Uniqueness is enforced only within the current history window. If an item
falls off the end, it may be added again later.
"""
from __future__ import annotations
import threading
from collections import deque
from . import debug
_loggerPrefix = "SpeechHistory:"
_maxHistorySize = 50
_historyItems = deque()
_historySet = set()
_lock = threading.Lock()
_pauseCount = 0
_pausedIgnoreCount = 0
def pause_capture(reason: str = "") -> None:
"""Pause capture so speech produced while paused is not recorded."""
global _pauseCount
with _lock:
_pauseCount += 1
global _pausedIgnoreCount
_pausedIgnoreCount = 0
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} capture paused (count={_pauseCount}) reason='{reason}'",
True,
)
def resume_capture(reason: str = "") -> None:
"""Resume capture after a pause_capture()."""
global _pauseCount
with _lock:
if _pauseCount <= 0:
_pauseCount = 0
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} resume requested while not paused reason='{reason}'",
True,
)
return
_pauseCount -= 1
if _pauseCount == 0:
global _pausedIgnoreCount
_pausedIgnoreCount = 0
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} capture resumed (count={_pauseCount}) reason='{reason}'",
True,
)
def is_capture_paused() -> bool:
with _lock:
return _pauseCount > 0
def add(text: str | None, source: str = "") -> bool:
"""Add text to speech history if it's not already present.
Returns True if the item was added; False otherwise.
"""
if text is None:
return False
try:
cleanText = text.strip()
except Exception:
return False
if not cleanText:
return False
with _lock:
if _pauseCount > 0:
global _pausedIgnoreCount
_pausedIgnoreCount += 1
if _pausedIgnoreCount in (1, 25, 50, 100):
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} ignoring speech while paused (ignored={_pausedIgnoreCount}) source='{source}' text='{cleanText[:80]}'",
True,
)
return False
if cleanText in _historySet:
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} duplicate ignored source='{source}' text='{cleanText[:80]}'",
True,
)
return False
_historyItems.appendleft(cleanText)
_historySet.add(cleanText)
evictedText = None
if len(_historyItems) > _maxHistorySize:
evictedText = _historyItems.pop()
_historySet.discard(evictedText)
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} added source='{source}' size={len(_historyItems)} text='{cleanText[:80]}'",
True,
)
if evictedText:
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} evicted size={len(_historyItems)} text='{evictedText[:80]}'",
True,
)
return True
def get_items() -> list[str]:
"""Return a snapshot of history items, newest-first."""
with _lock:
return list(_historyItems)
def remove(text: str | None) -> bool:
"""Remove an item from the history (if present)."""
if text is None:
return False
try:
cleanText = text.strip()
except Exception:
return False
if not cleanText:
return False
with _lock:
if cleanText not in _historySet:
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} remove ignored (not found) text='{cleanText[:80]}'",
True,
)
return False
global _historyItems
_historyItems = deque(item for item in _historyItems if item != cleanText)
_historySet.discard(cleanText)
debug.printMessage(
debug.LEVEL_INFO,
f"{_loggerPrefix} removed size={len(_historyItems)} text='{cleanText[:80]}'",
True,
)
return True