32 Commits

Author SHA1 Message Date
Storm Dragon fd5fe5b328 More progressbar updates. Removed Claud specific progress bar detection, hopefully caught now by generic progress bar updates. They do change it all the time, so it may work, but shouldn't be expected to do so. 2026-06-01 03:04:47 -04:00
Storm Dragon 4ed3f4d6ab Fixed version. 2026-05-31 22:37:59 -04:00
Storm Dragon 2cb83632f9 Fixed keyboard handling regression. 2026-05-31 22:36:35 -04:00
Storm Dragon 0c4fe50606 Updated log names. Should be easier to find if you forget to delete old logs. Now just called fenrir.log, we don't tend to need to have multiple logs anyway. 2026-05-30 14:16:23 -04:00
Storm Dragon 15f2435749 Hopefully fix some weirdness on tab completeion where it would read the entire screen instead of suggested tab completions. 2026-05-30 13:56:25 -04:00
Storm Dragon 3897b63068 Iproved logging for startup flags. 2026-05-29 20:22:06 -04:00
Storm Dragon f1a8e6af21 Fixed long standing bug where bottom of screen played for both top and bottom, found a couple other things that were off in the process. 2026-05-29 19:50:38 -04:00
Storm Dragon bd54ec0edb Fixed version. 2026-05-24 17:14:58 -04:00
Storm Dragon b9518f52ec Vmenu fixed I think. Hopefully last thing before new version. 2026-05-24 17:13:38 -04:00
Storm Dragon c143c9a561 Found a vmenu bug in -x. I thought we were close to a new release... 2026-05-24 17:03:41 -04:00
Storm Dragon 7e2f927596 fixed version. 2026-05-24 14:15:02 -04:00
Storm Dragon 788e678ed6 Attempted fix for some progress bars that were being skipped by progress bar detection. 2026-05-24 14:13:29 -04:00
Storm Dragon ea89e90c2f Merge branch 'testing' Hopefully final release candidate for the new version. 2026-05-23 18:59:14 -04:00
Storm Dragon ce43d64e77 Removed auto as a hardware synth device option. It was too flakey. 2026-05-23 18:58:55 -04:00
Storm Dragon 618987546a Adjust timeout for auto detection. I forgot these devices would be slow because most of them are very old with much less speed than would be expected today. 2026-05-23 18:41:42 -04:00
Storm Dragon 604221a29d Attempt to make auto at least somewhat more reliable. Recommend that device be explicitly set if possible. 2026-05-23 18:23:58 -04:00
Storm Dragon 89b85c6f17 Hardware synth code now verified working. New release candidate. 2026-05-23 18:05:55 -04:00
Storm Dragon 6e3d7fee94 Parse the settings correctly lol. 2026-05-23 17:57:02 -04:00
Storm Dragon 089850ac18 More hw synth refinement. 2026-05-23 17:39:16 -04:00
Storm Dragon 5b7c08260a Another iteration based on feedback from hardware synth testing. 2026-05-23 17:23:52 -04:00
Storm Dragon d4b2fec1db speculative fixes for hardware speech. 2026-05-23 17:10:46 -04:00
Storm Dragon 1f7aa99cc0 Release candidate. 2026-05-23 16:13:51 -04:00
Storm Dragon d853e1b24d Fixed the -x keyboard problem for real this time I'm pretty sure. 2026-05-22 20:23:31 -04:00
Storm Dragon e8bc34eaf5 Merge bug fixes, fix version. 2026-05-21 01:08:27 -04:00
Storm Dragon f84167a7fb Of course, soon as I feel things are stable enough to merge to master bugs come crawling out of the woodwork. Fix for being sure Fenrir switches out of its modal mode completely when leaving speech history. 2026-05-21 01:07:01 -04:00
Storm Dragon 29a6c3eb42 A few teaks. Maybe close to actual release. 2026-05-21 00:50:20 -04:00
Storm Dragon ac7348895f Speech history added, bound to fenrir+control+h. 2026-05-20 21:02:56 -04:00
Storm Dragon 4caef89f6b More work on sockets attempt to get socket command to standard daemon.sock to a running instance. Also, fixed a long standing misspelling in daemon, was deamon, so if your scripts that self-voice or whatever with fenrir no longer work, this is why, please update scripts to the new, correct, daemon.sock. 2026-05-20 20:11:21 -04:00
Storm Dragon 8467bd74c3 New hardware synth support added. Untested, so consider this experimental. 2026-05-20 18:02:51 -04:00
Storm Dragon f09437ea60 Speculative fix for a sometimes speech crash bug. 2026-05-19 01:54:09 -04:00
Storm Dragon 19194e73fc Fenrir was not cleaning up after itself properly. Fixed several possible stale file bugs and hopefully this problem is now gone. 2026-05-18 17:59:47 -04:00
Storm Dragon 096919a2da Hopefully fixed bug where keys like super weren't being forwarded with bypass mode. 2026-05-14 21:45:30 -04:00
63 changed files with 3012 additions and 333 deletions
+3 -3
View File
@@ -88,12 +88,12 @@ Application-specific menus in `vmenu-profiles/KEY/{app}/`:
## Remote Control
Unix socket: `/tmp/fenrirscreenreader-deamon.sock`
Unix socket: `/tmp/fenrirscreenreader-daemon.sock`
TCP: localhost:22447
```bash
echo "command say Hello" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command say Hello" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#rate=0.8" | nc localhost 22447
```
+64 -40
View File
@@ -79,7 +79,15 @@ Fenrir is a Linux screen reader. Linux is the only officially supported platform
- python-speechd
2. **genericDriver** - Generic subprocess speech driver
- espeak or espeak-ng (or any TTS command)
3. **debugDriver** - Debug speech driver for testing
3. **dectalkDriver** - Serial DECtalk-compatible hardware speech driver
- RPITalk gadget mode or a DECtalk-compatible serial device
4. **litetalkDriver** - Serial LiteTalk-compatible hardware speech driver
- RPITalk gadget mode or a LiteTalk-compatible serial device
5. **doubletalkDriver** - Serial DoubleTalk LT-compatible hardware speech driver
- DoubleTalk LT; does not support the internal DoubleTalk PC card
6. **tripletalkDriver** - Serial TripleTalk-compatible hardware speech driver
- External DB9 serial TripleTalk devices, or USB models that expose a tty serial device
7. **debugDriver** - Debug speech driver for testing
- No dependencies
@@ -208,6 +216,7 @@ sudo /usr/share/fenrirscreenreader/tools/configure_pipewire.sh
2. **Basic Navigation**:
- **Fenrir Key**: By default `Insert`, `Keypad Insert`, or `Meta/Super` key
- **Tutorial Mode**: `Fenrir + H` to learn all commands interactively
- **Speech History**: `Fenrir + Ctrl + H` to review recent speech
- **Quit Fenrir**: `Fenrir + Q`
3. **Essential Commands**:
@@ -273,7 +282,7 @@ enable_command_remote=True # allow command execution
### Remote Drivers
1. **unixDriver** (recommended): Uses Unix domain sockets
- Socket location: `/tmp/fenrirscreenreader-deamon.sock` for the standard control socket
- Socket location: `/tmp/fenrirscreenreader-daemon.sock` for the standard control socket
- `fenrir -x` instances also create private sockets: `/tmp/fenrirscreenreader-<pid>.sock`
- More secure, local-only access
- Works with `socat`
@@ -290,99 +299,102 @@ The `socat` command provides the easiest way to send commands to Fenrir:
#### Instance Discovery
```bash
# List registered Fenrir instances and their socket paths
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
In X terminal mode (`fenrir -x`), multiple Fenrir instances can run at the same
time. Each instance has a private socket, and one instance may also own the
standard control socket. Use `ls` or `command ls` on the standard socket to find
the private socket for a specific instance. Untargeted commands sent through a
shared or broadcast path are claimed by one instance so duplicate instances do
not all perform the same action.
the private socket for a specific instance. Commands sent to the standard socket
are handled by its owner when possible; otherwise they are forwarded to a
registered private socket, preferring the sender's Fenrir ancestor when one can
be found. Untargeted commands sent through a shared or broadcast path are
claimed by one instance so duplicate instances do not all perform the same
action.
#### Basic Speech Control
```bash
# Interrupt current speech
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Speak custom text
echo "command say Hello, this is a test message" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command say Hello, this is a test message" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Temporarily disable speech (until next keystroke)
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Settings Control
```bash
# Enable highlight tracking mode
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change speech parameters
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#pitch=0.6" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#volume=0.9" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#pitch=0.6" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#volume=0.9" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change punctuation level (none/some/most/all)
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set general#punctuation_level=none" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set general#punctuation_level=none" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Voice and TTS engine control
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#module=espeak-ng" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#module=espeak-ng" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Disable sound temporarily
echo "setting set sound#enabled=False" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set sound#volume=0.5" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set sound#enabled=False" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set sound#volume=0.5" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Keyboard and input settings
echo "setting set keyboard#char_echo_mode=1" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set keyboard#word_echo=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set keyboard#char_echo_mode=1" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set keyboard#word_echo=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Screen control (ignore specific TTYs)
echo "setting set screen#ignore_screen=1,2,3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set screen#ignore_screen=1,2,3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Multiple settings at once
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Reset all settings to defaults
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Save current settings
echo "setting save" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting saveas /tmp/my-fenrir-settings.conf" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting save" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting saveas /tmp/my-fenrir-settings.conf" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Clipboard Operations
```bash
# Place text into clipboard
echo "command clipboard This text will be copied to clipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command clipboard This text will be copied to clipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Export clipboard to file
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Window Management
```bash
# Define a window area (x1 y1 x2 y2)
echo "command window 0 0 80 24" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command window 0 0 80 24" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Reset window to full screen
echo "command resetwindow" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command resetwindow" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### VMenu Control
```bash
# Set virtual menu context
echo "command vmenu nano/file" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command vmenu nano/file" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Reset virtual menu
echo "command resetvmenu" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command resetvmenu" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Application Control
```bash
# Quit Fenrir
echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
### Using TCP Driver
@@ -447,7 +459,15 @@ setting <action> [parameters]
- `speech#volume=0.1-1.0` - Speech volume
- `speech#voice=voice_name` - Voice selection (e.g., "en-us+f3")
- `speech#module=module_name` - TTS module (e.g., "espeak-ng")
- `speech#driver=driver_name` - Speech driver (speechdDriver/genericDriver)
- `speech#driver=driver_name` - Speech driver (speechdDriver/genericDriver/dectalkDriver/litetalkDriver/doubletalkDriver/tripletalkDriver)
- `speech#hardware_device=/dev/ttyS0` - Hardware synth serial device for dectalkDriver/litetalkDriver
- `speech#hardware_baud_rate=9600` - Hardware synth serial baud rate
- `speech#history_size=50` - Number of spoken items kept in runtime speech history
USB hardware synths are supported only when Linux exposes them as a serial tty
such as `/dev/ttyACM0` or `/dev/ttyUSB0`. A USB-only TripleTalk with no tty
device would require a separate USB protocol driver. Use an explicit
`speech#hardware_device` path for hardware speech.
- `speech#auto_read_incoming=True/False` - Auto-read new text
*Sound Settings:*
@@ -576,7 +596,7 @@ Fenrir provides intelligent progress bar detection and audio feedback for variou
To enable progress monitoring:
1. Add a key binding in your keyboard layout file
2. Or use the remote control system: `echo "command progress_bar_monitor" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock`
2. Or use the remote control system: `echo "command progress_bar_monitor" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock`
### Progress Detection Patterns
@@ -633,11 +653,15 @@ Building...
- **Non-blocking**: Progress tones don't interrupt speech or other functionality
- **Configurable**: Can be enabled/disabled as needed
Fenrir detects stable progress structures rather than application-specific
status formats. Application-specific formats change too frequently to support
reliably.
### Usage Examples
```bash
# Enable progress monitoring
echo "command progress_bar_monitor" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command progress_bar_monitor" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Common scenarios where progress monitoring is useful:
wget https://example.com/large-file.zip # Download progress
@@ -661,7 +685,7 @@ Progress monitoring can be configured through settings:
#!/bin/bash
# notify_fenrir.sh - Send notifications to Fenrir
SOCKET="/tmp/fenrirscreenreader-deamon.sock"
SOCKET="/tmp/fenrirscreenreader-daemon.sock"
fenrir_say() {
echo "command say $1" | socat - UNIX-CLIENT:$SOCKET
@@ -684,7 +708,7 @@ import os
def send_fenrir_command(command):
"""Send command to Fenrir via Unix socket"""
socket_path = "/tmp/fenrirscreenreader-deamon.sock"
socket_path = "/tmp/fenrirscreenreader-daemon.sock"
if os.path.exists(socket_path):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
@@ -716,8 +740,8 @@ send_fenrir_command("setting set speech#rate=0.9")
**Commands not working:**
- Verify `enable_command_remote=True` in settings
- Check Fenrir debug logs: `/var/log/fenrir.log`
- Test with simple command: `echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock`
- Check Fenrir debug logs: `/tmp/fenrir.log`
- Test with simple command: `echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock`
## Command Line Options
+5 -5
View File
@@ -3,13 +3,13 @@ https://git.stormux.org/storm/fenrir/issues
For bugs, please provide a debug file that shows the issue.
How to create a debug file:
1. first delete old stuff:
sudo rm /var/log/fenrir.log
2. start fenrir in debug mode
1. start fenrir in debug mode
sudo fenrir -d
<do your stuff>
3.
2.
stop fenrir (fenrirKey + q)
the debug file is in /var/log/fenrir.log
the debug file is in /tmp/fenrir.log
if another Fenrir debug instance is already using it, check /tmp/fenrir2.log,
/tmp/fenrir3.log, etc.
please be as precise as possible to make it easy to solve the problem.
+6
View File
@@ -26,6 +26,12 @@ This directory contains keyboard layout files for Fenrir screen reader.
- **Exit review**: `Fenrir + Keypad .`
- **Screen reading**: `Fenrir + Keypad 5` (current screen)
### Speech History
- **Open speech history**: `Fenrir + Ctrl + H`
- **Navigate history**: `Up` and `Down`
- **Copy current item**: `Enter`
- **Exit speech history**: `Escape`
## Configuration
To change keyboard layout, edit `/etc/fenrirscreenreader/settings/settings.conf`
+1
View File
@@ -1,4 +1,5 @@
KEY_FENRIR,KEY_H=toggle_tutorial_mode
KEY_FENRIR,KEY_CTRL,KEY_H=speech_history
KEY_CTRL=shut_up
KEY_FENRIR,KEY_KP9=review_bottom
KEY_FENRIR,KEY_KP7=review_top
+1
View File
@@ -1,4 +1,5 @@
KEY_FENRIR,KEY_H=toggle_tutorial_mode
KEY_FENRIR,KEY_CTRL,KEY_H=speech_history
KEY_CTRL=shut_up
KEY_FENRIR,KEY_SHIFT,KEY_O=review_bottom
KEY_FENRIR,KEY_SHIFT,KEY_U=review_top
+25 -2
View File
@@ -35,9 +35,14 @@ progress_monitoring=True
# Turn speech on or off:
enabled=True
# Select speech driver, options are speechdDriver or genericDriver:
# Select speech driver, options are speechdDriver, genericDriver,
# dectalkDriver, litetalkDriver, doubletalkDriver, or tripletalkDriver:
driver=speechdDriver
#driver=genericDriver
#driver=dectalkDriver
#driver=litetalkDriver
#driver=doubletalkDriver
#driver=tripletalkDriver
# The rate selects how fast Fenrir will speak. Options range from 0, slowest, to 1.0, fastest.
rate=0.5
@@ -70,12 +75,29 @@ volume=1.0
# Select the language you want Fenrir to use.
#language=en
# Hardware speech synthesizer serial device.
# Used by dectalkDriver, litetalkDriver, doubletalkDriver, and tripletalkDriver.
# USB serial devices are supported if Linux exposes them as /dev/ttyACM*
# or /dev/ttyUSB*. USB-only synths with no tty device need a separate driver.
# Set an explicit device for hardware speech.
# Examples:
# hardware_device=/dev/ttyACM0 # RPITalk USB gadget mode
# hardware_device=/dev/ttyUSB0 # USB serial adapter
# hardware_device=/dev/ttyS0 # built-in serial port
hardware_device=/dev/ttyS0
# Serial baud rate for hardware speech synthesizers.
hardware_baud_rate=9600
# Read new text as it happens?
auto_read_incoming=True
# Speak individual numbers instead of whole string.
read_numbers_as_digits = False
# Number of spoken items kept in runtime speech history.
history_size=50
# Flood control: batch rapid updates instead of speaking each one
# Number of updates within rapid_update_window to trigger batching
rapid_update_threshold=5
@@ -161,7 +183,8 @@ double_tap_timeout=0.2
# The default is 0, no logging.
debug_level=0
# debugMode sets where the debug output should send to:
# debugMode=File writes to debug_file (Default:/tmp/fenrir-PID.log)
# debugMode=File writes to debug_file (Default:/tmp/fenrir.log)
# If the default log is already in use, Fenrir uses /tmp/fenrir2.log, etc.
# debugMode=Print just prints on the screen
debug_mode=File
debug_file=
+1 -1
View File
@@ -114,7 +114,7 @@ sudo ./fenrir -f -d -p
# Debug output goes to:
# - Console (with -p flag)
# - /var/log/fenrir.log
# - /tmp/fenrir.log
```
## Creating Commands
+28 -15
View File
@@ -50,7 +50,9 @@ Multiple settings can be separated by semicolons.
.TP
.BR \-d ", " \-\-debug
Enable debug mode. Debug information will be logged to /var/log/fenrir.log.
Enable debug mode. Debug information will be logged to /tmp/fenrir.log by
default. If another Fenrir debug instance is already using it, Fenrir uses
/tmp/fenrir2.log, /tmp/fenrir3.log, etc.
.TP
.BR \-p ", " \-\-print
@@ -238,6 +240,14 @@ speechdDriver - Speech-dispatcher (recommended)
.IP \[bu] 4
genericDriver - Command-line TTS (espeak, etc.)
.IP \[bu] 4
dectalkDriver - DECtalk-compatible serial hardware speech
.IP \[bu] 4
litetalkDriver - LiteTalk-compatible serial hardware speech
.IP \[bu] 4
doubletalkDriver - DoubleTalk LT-compatible serial hardware speech
.IP \[bu] 4
tripletalkDriver - TripleTalk-compatible serial hardware speech
.IP \[bu] 4
debugDriver - Debug/testing
.TP
@@ -313,58 +323,61 @@ enable_command_remote=True
.B Instance Discovery:
.EX
# List registered Fenrir instances and their socket paths
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
.EE
In X terminal mode (fenrir -x), multiple Fenrir instances can run at the
same time. Each instance has a private socket at
/tmp/fenrirscreenreader-<pid>.sock, and one instance may also own the
standard control socket. Use ls or "command ls" on the standard socket to
find the private socket for a specific instance.
find the private socket for a specific instance. Commands sent to the standard
socket are handled by its owner when possible; otherwise they are forwarded to a
registered private socket, preferring the sender's Fenrir ancestor when one can
be found.
.TP
.B Basic Speech Control:
.EX
# Interrupt current speech
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Speak custom text
echo "command say Hello, this is a test" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command say Hello, this is a test" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Temporarily disable speech
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
.EE
.TP
.B Settings Control:
.EX
# Enable highlight tracking
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change speech rate
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change punctuation level (none/some/most/all)
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Voice and TTS control
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Multiple settings at once
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Reset all settings
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
.EE
.TP
.B Clipboard Operations:
.EX
# Add text to clipboard
echo "command clipboard Text to copy" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command clipboard Text to copy" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Export clipboard to file
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
.EE
.SS Command Reference
@@ -465,7 +478,7 @@ User sound themes
User scripts
.TP
.B /var/log/fenrir.log
.B /tmp/fenrir.log
Debug log file
.TP
+45 -18
View File
@@ -1278,65 +1278,68 @@ The `+socat+` command provides the easiest way to send commands to Fenrir:
....
# List registered Fenrir instances and their socket paths
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
....
In X terminal mode (`+fenrir -x+`), multiple Fenrir instances can run at the
same time. Each instance has a private socket at
`+/tmp/fenrirscreenreader-<pid>.sock+`, and one instance may also own the
standard control socket. Use `+ls+` or `+command ls+` on the standard socket to
find the private socket for a specific instance.
find the private socket for a specific instance. Commands sent to the standard
socket are handled by its owner when possible; otherwise they are forwarded to a
registered private socket, preferring the sender's Fenrir ancestor when one can
be found.
===== Basic Speech Control
....
# Interrupt current speech
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Speak custom text
echo "command say Hello, this is a test message" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command say Hello, this is a test message" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Temporarily disable speech (until next keystroke)
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
....
===== Settings Control
....
# Enable highlight tracking mode
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change speech rate
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change punctuation level (none/some/most/all)
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Voice and TTS control
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Multiple settings at once
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Reset all settings to defaults
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
....
===== Clipboard Operations
....
# Place text into clipboard
echo "command clipboard This text will be copied to clipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command clipboard This text will be copied to clipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Export clipboard to file
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
....
===== Application Control
....
# Quit Fenrir
echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
....
==== Command Reference
@@ -1548,8 +1551,12 @@ enabled=True
Values: on=`+True+`, off=`+False+`
# Select speech driver, options are speechdDriver (default),
genericDriver or espeakDriver: driver=speechdDriver #driver=espeakDriver
genericDriver, dectalkDriver, litetalkDriver, doubletalkDriver or tripletalkDriver: driver=speechdDriver
#driver=genericDriver
#driver=dectalkDriver
#driver=litetalkDriver
#driver=doubletalkDriver
#driver=tripletalkDriver
This Selects the driver used to generate speech output.
@@ -1677,6 +1684,26 @@ the pico module:
language=de-DE
....
Hardware speech drivers use a serial device. Set an explicit path.
....
hardware_device=/dev/ttyACM0
hardware_device=/dev/ttyUSB0
hardware_device=/dev/ttyS0
....
Hardware speech drivers use 9600 baud by default.
....
hardware_baud_rate=9600
....
The `+doubletalkDriver+` targets DoubleTalk LT-style serial devices. It does
not support the internal DoubleTalk PC ISA card.
USB hardware speech synthesizers are supported only when Linux exposes them as
a serial tty such as `+/dev/ttyACM0+` or `+/dev/ttyUSB0+`. USB-only TripleTalk
models with no tty device need a separate driver.
Read new text as it occurs auto_read_incoming=True Values: on=`+True+`,
off=`+False+`
@@ -2249,13 +2276,13 @@ that shows the issue.
==== How-to create a debug file
. Delete old debug stuff +
`+sudo rm /var/log/fenrir.log+`
. Start fenrir in debug mode +
`+sudo fenrir -d+`
. Do your stuff to reproduce the problem
. Stop fenrir (`+fenrirKey + q+`)
the debug file is located in `+/var/log/fenrir.log+`
the debug file is located in `+/tmp/fenrir.log+`. If another Fenrir debug
instance is already using it, check `+/tmp/fenrir2.log+`,
`+/tmp/fenrir3.log+`, etc.
Please be as precise as possible to make it easy to solve the problem.
+44 -22
View File
@@ -38,6 +38,7 @@ Navigate the screen without moving the text cursor. Essential for examining cont
### Navigation (Desktop Layout)
- `Ctrl` - Stop speech (shut up)
- `Fenrir + H` - Tutorial mode
- `Fenrir + Ctrl + H` - Speech history
- `Fenrir + Q` - Quit Fenrir
- `Fenrir + Keypad 5` - Read current screen
- `Keypad 8` - Read current line
@@ -100,6 +101,9 @@ driver=speechdDriver
rate=0.5
pitch=0.5
volume=1.0
hardware_device=/dev/ttyS0
hardware_baud_rate=9600
history_size=50
[sound]
enabled=True
@@ -141,68 +145,70 @@ enable_command_remote=True # allow command execution
#### Instance Discovery
```bash
# List registered Fenrir instances and their socket paths
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "ls" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
In X terminal mode (`fenrir -x`), multiple Fenrir instances can run at the same
time. Each instance has a private socket at `/tmp/fenrirscreenreader-<pid>.sock`,
and one instance may also own the standard control socket. Use `ls` or
`command ls` on the standard socket to find the private socket for a specific
instance.
instance. Commands sent to the standard socket are handled by its owner when
possible; otherwise they are forwarded to a registered private socket,
preferring the sender's Fenrir ancestor when one can be found.
#### Speech Control
```bash
# Interrupt current speech
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command interrupt" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Speak custom text
echo "command say Hello, this is a test" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command say Hello, this is a test" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Temporarily disable speech
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command tempdisablespeech" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Settings Control
```bash
# Enable highlight tracking
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set focus#highlight=True" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change speech parameters
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#pitch=0.6" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#volume=0.9" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#pitch=0.6" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#volume=0.9" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Change punctuation level (none/some/most/all)
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set general#punctuation_level=all" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Voice and TTS control
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#module=espeak-ng" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#voice=en-us+f3" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting set speech#module=espeak-ng" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Multiple settings at once
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting set speech#rate=0.8;sound#volume=0.7;general#punctuation_level=most" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Reset all settings
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting reset" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Save settings
echo "setting save" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting saveas /tmp/my-settings.conf" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "setting save" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
echo "setting saveas /tmp/my-settings.conf" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Clipboard Operations
```bash
# Add text to clipboard
echo "command clipboard Text to copy" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command clipboard Text to copy" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Export clipboard to file
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command exportclipboard" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
#### Application Control
```bash
# Quit Fenrir
echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
```
### Command Reference
@@ -238,7 +244,7 @@ echo "command quitapplication" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-dea
#### Bash Helper Function
```bash
fenrir_say() {
echo "command say $1" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo "command say $1" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
}
# Usage
@@ -251,7 +257,7 @@ import socket
import os
def send_fenrir_command(command):
socket_path = "/tmp/fenrirscreenreader-deamon.sock"
socket_path = "/tmp/fenrirscreenreader-daemon.sock"
if os.path.exists(socket_path):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
@@ -306,6 +312,9 @@ Fenrir automatically detects and provides audio feedback for progress indicators
- **Automatic**: Works with downloads, compilations, installations
- **Remote control**: Enable via socket commands
Fenrir detects stable progress structures rather than application-specific
status formats, which change too frequently to support reliably.
### Spell Checking
- `Fenrir + S` - Spell check current word
- `Fenrir + S S` - Add word to dictionary
@@ -330,6 +339,19 @@ Fenrir automatically detects and provides audio feedback for progress indicators
### Speech Drivers
- **speechdDriver** - Speech-dispatcher (recommended)
- **genericDriver** - Command-line TTS (espeak, etc.)
- **dectalkDriver** - Serial DECtalk-compatible hardware speech
- **litetalkDriver** - Serial LiteTalk-compatible hardware speech
- **doubletalkDriver** - Serial DoubleTalk LT-compatible hardware speech
- **tripletalkDriver** - Serial TripleTalk-compatible hardware speech
For hardware speech, set `speech#hardware_device` to an explicit serial path.
RPITalk gadget mode usually appears as `/dev/ttyACM0`; USB serial adapters
usually appear as `/dev/ttyUSB0`; built-in serial ports may be `/dev/ttyS0`.
The default baud rate is `9600`. `doubletalkDriver` targets
DoubleTalk LT-style serial devices, not the internal DoubleTalk PC ISA card.
USB TripleTalk devices work only if Linux exposes them as a serial tty such as
`/dev/ttyACM0` or `/dev/ttyUSB0`; USB-only models with no tty device need a
separate driver.
### Sound Drivers
- **genericDriver** - Sox-based (default)
@@ -409,7 +431,7 @@ For a dedicated PTY/terminal screen reader, see TDSR: https://github.com/tspivey
### Debug Mode
```bash
sudo fenrir -f -d
# Debug output goes to /var/log/fenrir.log
# Debug output goes to /tmp/fenrir.log
```
## Getting Help
+23 -5
View File
@@ -878,10 +878,13 @@ Turn speech on or off:
enabled=True
Values: on=''True'', off=''False''
# Select speech driver, options are speechdDriver (default), genericDriver or espeakDriver:
# Select speech driver, options are speechdDriver (default), genericDriver, dectalkDriver, litetalkDriver, doubletalkDriver or tripletalkDriver:
driver=speechdDriver
#driver=espeakDriver
#driver=genericDriver
#driver=dectalkDriver
#driver=litetalkDriver
#driver=doubletalkDriver
#driver=tripletalkDriver
Select the driver used to generate speech output.
@@ -890,7 +893,10 @@ Select the driver used to generate speech output.
Available Drivers:
* ''genericDriver'' using the generic driver, for Fenrir <1.5 this is not available
* ''speechdDriver'' using speech-dispatcher, for Fenrir <1.5 just use ''speechd''
* ''espeakDriver'' using the espeak directly, for Fenrir <1.5 just use ''espeak''
* ''dectalkDriver'' using DECtalk-compatible serial hardware or RPITalk
* ''litetalkDriver'' using LiteTalk-compatible serial hardware or RPITalk
* ''doubletalkDriver'' using DoubleTalk LT-compatible serial hardware
* ''tripletalkDriver'' using TripleTalk-compatible serial hardware
The rate selects how fast Fenrir will speak.
rate=0.65
@@ -921,6 +927,17 @@ Select the language you want Fenrir to use.
language=english-us
Values: Text, see your TTS synths documentation what is available.
Hardware speech drivers use a serial device. Set an explicit path.
hardware_device=/dev/ttyACM0
hardware_device=/dev/ttyUSB0
hardware_device=/dev/ttyS0
Hardware speech drivers use 9600 baud by default.
hardware_baud_rate=9600
The doubletalkDriver targets DoubleTalk LT-style serial devices. It does not support the internal DoubleTalk PC ISA card.
USB hardware speech synthesizers are supported only when Linux exposes them as a serial tty such as /dev/ttyACM0 or /dev/ttyUSB0. USB-only TripleTalk models with no tty device need a separate driver.
Read new text as it occurs
auto_read_incoming=True
Values: on=''True'', off=''False''
@@ -1298,10 +1315,11 @@ Please report Bugs and feature requests to:
for bugs please provide a [[#Howto create a debug file|debug]] file that shows the issue.
==== How-to create a debug file ====
- Delete old debug stuff\\ ''sudo rm /var/log/fenrir.log''
- Start fenrir in debug mode\\ ''sudo fenrir -d''
- Do your stuff to reproduce the problem
- Stop fenrir (''fenrirKey + q'')
the debug file is located in ''/var/log/fenrir.log''
the debug file is located in ''/tmp/fenrir.log''. If another Fenrir debug
instance is already using it, check ''/tmp/fenrir2.log'',
''/tmp/fenrir3.log'', etc.
Please be as precise as possible to make it easy to solve the problem.
+5 -2
View File
@@ -106,14 +106,17 @@ def run_fenrir():
fenrirApp.proceed()
except Exception as e:
print(f"Error starting Fenrir: {e}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print("Interrupted", file=sys.stderr)
sys.exit(1)
finally:
if fenrirApp and hasattr(fenrirApp, 'cleanup_on_error'):
try:
fenrirApp.cleanup_on_error()
except Exception as cleanup_error:
print(
f"Error during cleanup: {cleanup_error}", file=sys.stderr)
sys.exit(1)
finally:
if fenrirApp:
del fenrirApp
# Clean up PID file if it exists
@@ -112,9 +112,9 @@ class command:
"review", "end_of_screen"
):
self.env["runtime"]["OutputManager"].present_text(
_("end of screen"),
_("start of screen"),
interrupt=True,
sound_icon="EndOfScreen",
sound_icon="StartOfScreen",
)
if line_break:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
@@ -49,9 +49,9 @@ class command:
"review", "end_of_screen"
):
self.env["runtime"]["OutputManager"].present_text(
_("end of screen"),
_("start of screen"),
interrupt=True,
sound_icon="EndOfScreen",
sound_icon="StartOfScreen",
)
if line_break:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
@@ -50,9 +50,9 @@ class command:
"review", "end_of_screen"
):
self.env["runtime"]["OutputManager"].present_text(
_("end of screen"),
_("start of screen"),
interrupt=True,
sound_icon="EndOfScreen",
sound_icon="StartOfScreen",
)
def set_callback(self, callback):
@@ -95,9 +95,9 @@ class command:
"review", "end_of_screen"
):
self.env["runtime"]["OutputManager"].present_text(
_("end of screen"),
_("start of screen"),
interrupt=False,
sound_icon="EndOfScreen",
sound_icon="StartOfScreen",
)
if line_break:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
@@ -60,9 +60,9 @@ class command:
"review", "end_of_screen"
):
self.env["runtime"]["OutputManager"].present_text(
_("end of screen"),
_("start of screen"),
interrupt=True,
sound_icon="EndOfScreen",
sound_icon="StartOfScreen",
)
if line_break:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
@@ -50,9 +50,9 @@ class command:
"review", "end_of_screen"
):
self.env["runtime"]["OutputManager"].present_text(
_("end of screen"),
_("start of screen"),
interrupt=True,
sound_icon="EndOfScreen",
sound_icon="StartOfScreen",
)
if line_break:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("opens speech history")
def run(self):
self.env["runtime"]["SpeechHistoryManager"].open_history()
def set_callback(self, callback):
pass
@@ -81,12 +81,13 @@ class command:
delta_length = len(delta_text)
if (
delta_length > 200
): # Allow longer progress lines like Claude Code's status
self.env["runtime"]["DebugManager"].write_debug_out(
f"Progress filter: delta too long ({delta_length})",
debug.DebugLevel.INFO,
)
return False
): # Allow longer progress lines such as terminal status output
if not self.is_explicit_progress_delta(delta_text):
self.env["runtime"]["DebugManager"].write_debug_out(
f"Progress filter: delta too long ({delta_length})",
debug.DebugLevel.INFO,
)
return False
# If delta contains newlines and is substantial, let incoming handler
# deal with it to avoid interfering with multi-line text output
@@ -107,6 +108,25 @@ class command:
return True
def is_explicit_progress_delta(self, text):
"""Allow long single-line deltas that still look like progress output."""
import re
if "\n" in text or self.contains_url(text):
return False
has_percentage = re.search(r"(^|\s)\d+(?:\.\d+)?\s*%", text)
if not has_percentage:
return False
return bool(
re.search(
r"[|\[\]#=*>█▉▊▋▌▍▎▏▒▓░]"
r"|\b\d+(?:\.\d+)?\s*[kKmMgGtT](?:i?B)?/s\b",
text,
)
)
def reset_progress_state(self):
"""Reset progress state when a prompt is detected, allowing new progress operations to start fresh"""
self.env["runtime"]["DebugManager"].write_debug_out(
@@ -306,43 +326,27 @@ class command:
self.env["commandBuffer"]["lastProgressTime"] = current_time
return
# Pattern 6: Claude Code working indicators (various symbols + activity text + "esc/ctrl+c to interrupt")
# Matches any: [symbol] [Task description] (... to interrupt ...)
# Pattern 6: Interruptible terminal activity indicators
# Matches any: [symbol] [Task description][…] (... to interrupt ...)
# Symbols include: * ✢ ✽ ✶ ✻ · • ◦ ○ ● ◆ and similar decorative characters
# Example: ✽ Reviewing script for issues… (ctrl+c to interrupt · 33s · ↑ 1.6k tokens · thought for 4s)
claude_progress_match = re.search(
r'[*✢✽✶✻·•◦○●◆]\s+\w+.*?…\s*\(.*(?:esc|ctrl\+c) to interrupt.*\)',
# Keep this structural rather than adding application-specific formats,
# which change too frequently to support reliably.
interruptible_activity_match = re.search(
r'[*✢✽✶✻·•◦○●◆]\s+\w+.*?(?:…\s*)?\(.*(?:esc|ctrl\+c) to interrupt.*\)',
text,
re.IGNORECASE,
)
if claude_progress_match:
if interruptible_activity_match:
if current_time - self.env["commandBuffer"]["lastProgressTime"] >= 1.0:
self.env["runtime"]["DebugManager"].write_debug_out(
"Playing Claude Code activity beep",
"Playing interruptible activity beep",
debug.DebugLevel.INFO,
)
self.play_activity_beep()
self.env["commandBuffer"]["lastProgressTime"] = current_time
return
# Pattern 6b: Claude Code tool invocation indicators (● Tool Name(...))
# Example: ● Web Search("query here")
tool_invocation_match = re.search(
r'[●○◉•◦]\s+(?:Web\s*Search|Read|Write|Edit|Bash|Glob|Grep|Task|WebFetch)\s*\(',
text,
re.IGNORECASE,
)
if tool_invocation_match:
if current_time - self.env["commandBuffer"]["lastProgressTime"] >= 1.0:
self.env["runtime"]["DebugManager"].write_debug_out(
"Playing Claude Code tool invocation beep",
debug.DebugLevel.INFO,
)
self.play_activity_beep()
self.env["commandBuffer"]["lastProgressTime"] = current_time
return
# Pattern 6c: Bullet/white bullet activity lines (•/◦ ...)
# Pattern 6b: Bullet/white bullet activity lines (•/◦ ...)
bullet_activity_match = re.search(
(
r'^\s*[•◦]\s+.*(?:…|\.{3,}|\b(?:thinking|working|processing|'
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("closes speech history")
def run(self):
self.env["runtime"]["SpeechHistoryManager"].close_history()
def set_callback(self, callback):
pass
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("copies current speech history item to the clipboard")
def run(self):
self.env["runtime"]["SpeechHistoryManager"].copy_current_to_clipboard()
def set_callback(self, callback):
pass
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("speaks current speech history item")
def run(self):
self.env["runtime"]["SpeechHistoryManager"].present_current()
def set_callback(self, callback):
pass
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("selects the next speech history item")
def run(self):
self.env["runtime"]["SpeechHistoryManager"].next_entry()
def set_callback(self, callback):
pass
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class command:
def __init__(self):
pass
def initialize(self, environment):
self.env = environment
def shutdown(self):
pass
def get_description(self):
return _("selects the previous speech history item")
def run(self):
self.env["runtime"]["SpeechHistoryManager"].prev_entry()
def set_callback(self, callback):
pass
@@ -127,6 +127,8 @@ class config_command:
self.config.set("speech", "rate", "0.75")
self.config.set("speech", "pitch", "0.5")
self.config.set("speech", "volume", "1.0")
self.config.set("speech", "hardware_device", "/dev/ttyS0")
self.config.set("speech", "hardware_baud_rate", "9600")
self.config.add_section("sound")
self.config.set("sound", "driver", "genericDriver")
@@ -108,6 +108,8 @@ class command(config_command):
"rate": "0.5",
"pitch": "0.5",
"volume": "1.0",
"hardware_device": "/dev/ttyS0",
"hardware_baud_rate": "9600",
"auto_read_incoming": "True",
}
+49 -9
View File
@@ -3,24 +3,22 @@
import os
import pathlib
import fcntl
from datetime import datetime
from fenrirscreenreader.core import debug
class DebugManager:
DEFAULT_LOG_DIR = "/tmp"
DEFAULT_LOG_BASENAME = "fenrir"
DEFAULT_LOG_EXTENSION = ".log"
def __init__(self, file_name=""):
self._file = None
self._fileOpened = False
self._fileName = (
"/tmp/fenrir_"
+ str(os.getpid())
+ "_"
+ str(datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S"))
+ ".log"
)
if file_name != "":
self._fileName = file_name
self._fileName = file_name
self._useDefaultLogName = file_name == ""
def initialize(self, environment):
self.env = environment
@@ -39,6 +37,10 @@ class DebugManager:
self._fileOpened = False
if file_name != "":
self._fileName = file_name
self._useDefaultLogName = False
if self._useDefaultLogName:
self._open_default_debug_file()
return
if self._fileName != "":
directory = os.path.dirname(self._fileName)
if not os.path.exists(directory):
@@ -51,6 +53,43 @@ class DebugManager:
except Exception as e:
print(e)
def _open_default_debug_file(self):
pathlib.Path(self.DEFAULT_LOG_DIR).mkdir(parents=True, exist_ok=True)
log_number = 1
while True:
log_file = self._default_log_file_name(log_number)
try:
fd = os.open(
log_file,
os.O_CREAT | os.O_RDWR | os.O_NOFOLLOW,
0o644,
)
file_obj = os.fdopen(fd, "a")
fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
file_obj.seek(0)
file_obj.truncate()
os.chmod(log_file, 0o644)
self._file = file_obj
self._fileName = log_file
self._fileOpened = True
return
except BlockingIOError:
try:
file_obj.close()
except Exception:
pass
log_number += 1
except OSError as e:
print(e)
return
def _default_log_file_name(self, log_number):
suffix = "" if log_number == 1 else str(log_number)
return os.path.join(
self.DEFAULT_LOG_DIR,
self.DEFAULT_LOG_BASENAME + suffix + self.DEFAULT_LOG_EXTENSION,
)
def write_debug_out(
self, text, level=debug.DebugLevel.DEACTIVE, on_any_level=False
):
@@ -120,3 +159,4 @@ class DebugManager:
def set_debug_file(self, file_name):
self.close_debug_file()
self._fileName = file_name
self._useDefaultLogName = file_name == ""
+51 -18
View File
@@ -5,6 +5,7 @@
# By Chrys, Storm Dragon, and contributors.
import os
import socket
import signal
import sys
import time
@@ -40,6 +41,7 @@ class FenrirManager:
# Set signal handlers after successful initialization
signal.signal(signal.SIGINT, self.capture_signal)
signal.signal(signal.SIGTERM, self.capture_signal)
signal.signal(signal.SIGHUP, self.capture_signal)
self.signal_handlers_set = True
self.is_initialized = True
@@ -101,6 +103,10 @@ class FenrirManager:
self.environment["runtime"][
"InputManager"
].clear_event_buffer()
if self.environment["runtime"]["SpeechHistoryManager"].is_active():
self.environment["runtime"][
"InputManager"
].clear_event_buffer()
self.detect_shortcut_command()
@@ -157,6 +163,14 @@ class FenrirManager:
current_command, "vmenu-navigation"
)
return
elif self.environment["runtime"]["SpeechHistoryManager"].is_active():
if self.environment["runtime"]["CommandManager"].command_exists(
current_command, "speech-history"
):
self.environment["runtime"]["CommandManager"].execute_command(
current_command, "speech-history"
)
return
# default
self.environment["runtime"]["CommandManager"].execute_command(
@@ -296,12 +310,21 @@ class FenrirManager:
if self.command != "":
self.singleKeyCommand = True
elif (
self.environment["runtime"]["DiffReviewManager"].is_active()
(
self.environment["runtime"]["VmenuManager"].get_active()
or self.environment["runtime"][
"DiffReviewManager"
].is_active()
or self.environment["runtime"][
"SpeechHistoryManager"
].is_active()
)
and self.command != ""
):
# Diff mode uses non-Fenrir modified bindings (Shift/Ctrl).
# Modal modes use non-Fenrir modified bindings.
# Promote resolved shortcuts to executable commands so
# combinations like Shift+H and Ctrl+Right are dispatched.
# combinations like Shift+H, Ctrl+Right, and plain arrows
# are dispatched.
self.singleKeyCommand = True
if not (self.singleKeyCommand or self.modifierInput):
@@ -381,9 +404,15 @@ class FenrirManager:
time.sleep(0.6)
for currentManager in self.environment["general"]["managerList"]:
if self.environment["runtime"][currentManager]:
self.environment["runtime"][currentManager].shutdown()
del self.environment["runtime"][currentManager]
try:
if (
currentManager in self.environment["runtime"]
and self.environment["runtime"][currentManager]
):
self.environment["runtime"][currentManager].shutdown()
del self.environment["runtime"][currentManager]
except Exception:
pass # Ignore errors during individual manager shutdown
self.environment = None
@@ -394,6 +423,7 @@ class FenrirManager:
if self.signal_handlers_set:
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
self.signal_handlers_set = False
# Clean up any initialized managers
@@ -454,7 +484,6 @@ class FenrirManager:
# Clean up socket files that might not be removed by the driver
try:
socket_file = None
screen_driver = None
if (
"runtime" in self.environment
and "SettingsManager" in self.environment["runtime"]
@@ -465,19 +494,9 @@ class FenrirManager:
].get_setting("remote", "socket_file")
except Exception:
pass # Use default socket file path
try:
screen_driver = self.environment["runtime"][
"SettingsManager"
].get_setting("screen", "driver")
except Exception:
pass
if not socket_file:
if screen_driver == "vcsaDriver":
socket_file = "/tmp/fenrirscreenreader-deamon.sock"
if os.path.exists(socket_file):
os.unlink(socket_file)
# Always clean up PID-specific socket
pid_socket_file = (
"/tmp/fenrirscreenreader-"
+ str(os.getpid())
@@ -485,6 +504,20 @@ class FenrirManager:
)
if os.path.exists(pid_socket_file):
os.unlink(pid_socket_file)
# Clean up main socket only if it is stale (not active)
main_socket_file = "/tmp/fenrirscreenreader-daemon.sock"
if os.path.exists(main_socket_file):
try:
test_sock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM
)
test_sock.settimeout(0.2)
test_sock.connect(main_socket_file)
except OSError:
os.unlink(main_socket_file)
finally:
test_sock.close()
elif os.path.exists(socket_file):
os.unlink(socket_file)
remoteInstanceRegistry.remove_instance()
@@ -20,6 +20,7 @@ general_data = {
"ScreenManager",
"InputManager",
"OutputManager",
"SpeechHistoryManager",
"HelpManager",
"MemoryManager",
"EventManager",
@@ -48,5 +49,6 @@ general_data = {
"onSwitchApplicationProfile",
"help",
"vmenu-navigation",
"speech-history",
],
}
+145 -112
View File
@@ -22,6 +22,8 @@ class OutputManager:
self.interrupt_thread = None
self.interrupt_done = None
self.interrupt_wait_timeout = 0.1
self.speech_driver_lock = threading.Lock()
self.speech_driver_lock_timeout = 0.05
def initialize(self, environment):
self.env = environment
@@ -73,6 +75,14 @@ class OutputManager:
return
if (len(text) > 1) and (text.strip(string.whitespace) == ""):
return
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"speech", "enabled"
):
speech_history_manager = self.env["runtime"].get(
"SpeechHistoryManager"
)
if speech_history_manager:
speech_history_manager.add_text(text)
is_capital = self._should_announce_capital(text, announce_capital)
use_pitch_for_capital = False
@@ -159,132 +169,144 @@ class OutputManager:
return
if interrupt or flush:
self.interrupt_output()
if not self.speech_driver_lock.acquire(
timeout=self.speech_driver_lock_timeout
):
self.env["runtime"]["DebugManager"].write_debug_out(
"OutputManager.speak_text: Speech driver busy, dropping speech",
debug.DebugLevel.WARNING,
)
return
try:
self.env["runtime"]["SpeechDriver"].set_language(
self.env["runtime"]["SettingsManager"].get_setting(
"speech", "language"
try:
self.env["runtime"]["SpeechDriver"].set_language(
self.env["runtime"]["SettingsManager"].get_setting(
"speech", "language"
)
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech language in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_voice(
self.env["runtime"]["SettingsManager"].get_setting(
"speech", "voice"
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech language in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"Error while setting speech voice in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
if announce_capital:
self.env["runtime"]["SpeechDriver"].set_pitch(
try:
self.env["runtime"]["SpeechDriver"].set_voice(
self.env["runtime"]["SettingsManager"].get_setting(
"speech", "voice"
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"Error while setting speech voice in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
if announce_capital:
self.env["runtime"]["SpeechDriver"].set_pitch(
self.env["runtime"][
"SettingsManager"
].get_setting_as_float("speech", "capital_pitch")
)
else:
self.env["runtime"]["SpeechDriver"].set_pitch(
self.env["runtime"][
"SettingsManager"
].get_setting_as_float("speech", "pitch")
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech pitch in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_rate(
self.env["runtime"][
"SettingsManager"
].get_setting_as_float("speech", "capital_pitch")
].get_setting_as_float("speech", "rate")
)
else:
self.env["runtime"]["SpeechDriver"].set_pitch(
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech rate in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_module(
self.env["runtime"]["SettingsManager"].get_setting(
"speech", "module"
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech module in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_volume(
self.env["runtime"][
"SettingsManager"
].get_setting_as_float("speech", "pitch")
].get_setting_as_float("speech", "volume")
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech pitch in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_rate(
self.env["runtime"]["SettingsManager"].get_setting_as_float(
"speech", "rate"
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech volume in OutputManager.speak_text ",
debug.DebugLevel.ERROR,
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech rate in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_module(
self.env["runtime"]["SettingsManager"].get_setting(
"speech", "module"
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech module in OutputManager.speak_text",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
self.env["runtime"]["SpeechDriver"].set_volume(
self.env["runtime"]["SettingsManager"].get_setting_as_float(
"speech", "volume"
try:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "new_line_pause"
):
clean_text = text.replace("\n", " , ")
else:
clean_text = text.replace("\n", " ")
clean_text = self.env["runtime"][
"TextManager"
].replace_head_lines(clean_text)
clean_text = self.process_mid_word_punctuation(clean_text)
clean_text = self.env["runtime"][
"PunctuationManager"
].proceed_punctuation(clean_text, ignore_punctuation)
clean_text = re.sub(" +$", " ", clean_text)
self.env["runtime"]["SpeechDriver"].speak(
clean_text, True, ignore_punctuation
)
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"setting speech volume in OutputManager.speak_text ",
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
try:
if self.env["runtime"]["SettingsManager"].get_setting_as_bool(
"general", "new_line_pause"
):
clean_text = text.replace("\n", " , ")
else:
clean_text = text.replace("\n", " ")
clean_text = self.env["runtime"]["TextManager"].replace_head_lines(
clean_text
)
clean_text = self.process_mid_word_punctuation(clean_text)
clean_text = self.env["runtime"][
"PunctuationManager"
].proceed_punctuation(clean_text, ignore_punctuation)
clean_text = re.sub(" +$", " ", clean_text)
self.env["runtime"]["SpeechDriver"].speak(
clean_text, True, ignore_punctuation
)
self.env["runtime"]["DebugManager"].write_debug_out(
"Speak: " + clean_text, debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
'"speak" in OutputManager.speak_text ', debug.DebugLevel.ERROR
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
self.env["runtime"]["DebugManager"].write_debug_out(
"Speak: " + clean_text, debug.DebugLevel.INFO
)
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
'"speak" in OutputManager.speak_text ',
debug.DebugLevel.ERROR,
)
self.env["runtime"]["DebugManager"].write_debug_out(
str(e), debug.DebugLevel.ERROR
)
finally:
self.speech_driver_lock.release()
def interrupt_output(self, wait=True):
interrupt_done, started = self.start_interrupt_output()
@@ -320,6 +342,15 @@ class OutputManager:
self.interrupt_running = False
def cancel_speech(self):
if not self.speech_driver_lock.acquire(
timeout=self.speech_driver_lock_timeout
):
self.env["runtime"]["DebugManager"].write_debug_out(
"OutputManager interrupt_output: Speech driver busy, "
"skipping interrupt",
debug.DebugLevel.WARNING,
)
return
try:
self.env["runtime"]["SpeechDriver"].cancel()
self.env["runtime"]["DebugManager"].write_debug_out(
@@ -331,6 +362,8 @@ class OutputManager:
+ str(e),
debug.DebugLevel.ERROR,
)
finally:
self.speech_driver_lock.release()
def play_sound_icon(self, sound_icon="", interrupt=True):
if sound_icon == "":
@@ -28,6 +28,7 @@ def write_instance(instance_data):
registry_dir = get_registry_dir()
os.makedirs(registry_dir, mode=0o755, exist_ok=True)
os.chmod(registry_dir, 0o755)
prune_stale_instances()
instance_data["updated_at"] = time.time()
instance_path = get_instance_file(instance_data.get("pid"))
with open(instance_path, "w", encoding="utf-8") as instance_file:
@@ -53,6 +54,39 @@ def process_exists(pid):
return False
def prune_stale_instances():
registry_dir = get_registry_dir()
try:
instance_files = os.listdir(registry_dir)
except FileNotFoundError:
return
now = time.time()
for instance_name in instance_files:
instance_path = os.path.join(registry_dir, instance_name)
try:
with open(instance_path, "r", encoding="utf-8") as instance_file:
instance_data = json.load(instance_file)
pid = int(instance_data.get("pid", 0))
updated_at = float(instance_data.get("updated_at", 0))
except (OSError, ValueError, TypeError, json.JSONDecodeError):
try:
os.unlink(instance_path)
except OSError:
pass
continue
if (
not pid
or not process_exists(pid)
or now - updated_at > INSTANCE_TIMEOUT_SEC
):
try:
os.unlink(instance_path)
except OSError:
pass
def list_instances():
registry_dir = get_registry_dir()
instances = []
@@ -72,7 +106,11 @@ def list_instances():
except (OSError, ValueError, TypeError, json.JSONDecodeError):
continue
if not pid or not process_exists(pid) or now - updated_at > INSTANCE_TIMEOUT_SEC:
if (
not pid
or not process_exists(pid)
or now - updated_at > INSTANCE_TIMEOUT_SEC
):
try:
os.unlink(instance_path)
except OSError:
@@ -16,6 +16,7 @@ runtime_data = {
"CommandManager": None,
"ScreenManager": None,
"OutputManager": None,
"SpeechHistoryManager": None,
"DebugManager": None,
"SettingsManager": None,
"FenrirManager": None,
@@ -28,8 +28,11 @@ settings_data = {
"module": "",
"voice": "en-us",
"language": "",
"hardware_device": "/dev/ttyS0",
"hardware_baud_rate": 9600,
"auto_read_incoming": True,
"read_numbers_as_digits": False,
"history_size": 50,
"rapid_update_threshold": 5,
"rapid_update_window": 0.3,
"batch_flush_interval": 0.5,
@@ -6,6 +6,7 @@
import inspect
import os
from argparse import Namespace
from configparser import ConfigParser
from fenrirscreenreader.core import applicationManager
@@ -29,6 +30,7 @@ from fenrirscreenreader.core import readAllManager
from fenrirscreenreader.core import remoteManager
from fenrirscreenreader.core import sayAllManager
from fenrirscreenreader.core import screenManager
from fenrirscreenreader.core import speechHistoryManager
from fenrirscreenreader.core import tableManager
from fenrirscreenreader.core import textManager
from fenrirscreenreader.core import vmenuManager
@@ -66,6 +68,15 @@ class SettingsManager:
def shutdown(self):
pass
def format_cli_args(self, cliArgs):
if cliArgs is None:
return "{}"
if isinstance(cliArgs, Namespace):
args = vars(cliArgs)
else:
args = vars(cliArgs) if hasattr(cliArgs, "__dict__") else {}
return str({key: args[key] for key in sorted(args)})
def get_binding_backup(self):
return self.bindingsBackup.copy()
@@ -508,6 +519,10 @@ class SettingsManager:
valid_drivers = [
"speechdDriver",
"genericDriver",
"dectalkDriver",
"doubletalkDriver",
"litetalkDriver",
"tripletalkDriver",
"dummyDriver",
]
if value not in valid_drivers:
@@ -639,6 +654,11 @@ class SettingsManager:
)
)
environment["runtime"]["DebugManager"].initialize(environment)
environment["runtime"]["DebugManager"].write_debug_out(
"Fenrir startup CLI arguments: " + self.format_cli_args(cliArgs),
debug.DebugLevel.INFO,
on_any_level=True,
)
if cliArgs.force_all_screens:
environment["runtime"]["force_all_screens"] = True
@@ -734,6 +754,11 @@ class SettingsManager:
environment["runtime"]["OutputManager"] = outputManager.OutputManager()
environment["runtime"]["OutputManager"].initialize(environment)
environment["runtime"][
"SpeechHistoryManager"
] = speechHistoryManager.SpeechHistoryManager()
environment["runtime"]["SpeechHistoryManager"].initialize(environment)
environment["runtime"]["InputManager"] = inputManager.InputManager()
environment["runtime"]["InputManager"].initialize(environment)
self.load_keyboard_layout(environment)
@@ -0,0 +1,203 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.core.i18n import _
class SpeechHistoryManager:
def __init__(self):
self.env = None
self.history = []
self.curr_index = -1
self.active = False
self.bindings_backup = None
self.raw_bindings_backup = None
def initialize(self, environment):
self.env = environment
def shutdown(self):
self.set_active(False)
def is_active(self):
return self.active
def add_text(self, text):
if self.active:
return False
if not isinstance(text, str):
return False
if text == "":
return False
text_key = self._get_history_key(text)
if text_key == "":
return False
if text_key in [self._get_history_key(item) for item in self.history]:
return False
history_size = self._get_history_size()
if history_size <= 0:
return False
self.history.insert(0, text)
del self.history[history_size:]
if self.curr_index >= len(self.history):
self.curr_index = len(self.history) - 1
return True
def open_history(self):
if not self.history:
self.env["runtime"]["OutputManager"].present_text(
_("speech history empty"), interrupt=True
)
return False
self.curr_index = -1
self.set_active(True)
self.env["runtime"]["OutputManager"].present_text(
_("Speech history"), interrupt=True
)
return True
def close_history(self, announce=True):
if announce:
self.env["runtime"]["OutputManager"].present_text(
_("speech history closed"), interrupt=True
)
self.set_active(False)
def next_entry(self):
if not self._has_history():
return
if self.curr_index == -1:
self.curr_index = 0
self.present_current()
return
if self.curr_index <= 0:
self.curr_index = 0
self.env["runtime"]["OutputManager"].present_text(
_("First speech history item"), interrupt=True
)
self.present_current(interrupt=False)
return
self.curr_index -= 1
self.present_current()
def prev_entry(self):
if not self._has_history():
return
if self.curr_index == -1:
self.curr_index = 0
self.present_current()
return
if self.curr_index >= len(self.history) - 1:
self.curr_index = len(self.history) - 1
self.env["runtime"]["OutputManager"].present_text(
_("Last speech history item"), interrupt=True
)
self.present_current(interrupt=False)
return
self.curr_index += 1
self.present_current()
def present_current(self, interrupt=True):
if not self._has_history():
self.env["runtime"]["OutputManager"].present_text(
_("speech history empty"), interrupt=True
)
return
self.env["runtime"]["OutputManager"].present_text(
self.history[self.curr_index], interrupt=interrupt
)
def copy_current_to_clipboard(self):
if not self._has_history():
self.close_history()
return
text = self.history[self.curr_index]
self.env["runtime"]["MemoryManager"].add_value_to_first_index(
"clipboardHistory", text
)
self.env["runtime"]["OutputManager"].present_text(
_("copied to clipboard"),
sound_icon="CopyToClipboard",
interrupt=True,
)
self.set_active(False)
def set_active(self, active):
if active == self.active:
return
self.active = active
if self.active:
self._install_bindings()
else:
self._restore_bindings()
def _has_history(self):
if not self.history:
self.curr_index = -1
return False
if self.curr_index >= len(self.history):
self.curr_index = len(self.history) - 1
return True
def _get_history_size(self):
try:
return self.env["runtime"]["SettingsManager"].get_setting_as_int(
"speech", "history_size"
)
except Exception:
return 50
def _get_history_key(self, text):
return " ".join(text.split())
def _install_bindings(self):
self.bindings_backup = self.env["bindings"].copy()
self.raw_bindings_backup = self.env["rawBindings"].copy()
self.env["bindings"] = {
str([1, ["KEY_UP"]]): "SPEECH_HISTORY_PREV",
str([1, ["KEY_DOWN"]]): "SPEECH_HISTORY_NEXT",
str([1, ["KEY_SPACE"]]): "SPEECH_HISTORY_CURRENT",
str([1, ["KEY_ENTER"]]): "SPEECH_HISTORY_COPY",
str([1, ["KEY_KPENTER"]]): "SPEECH_HISTORY_COPY",
str([1, ["KEY_ESC"]]): "SPEECH_HISTORY_CLOSE",
}
modal_raw_bindings = {
str([1, ["KEY_UP"]]): [1, ["KEY_UP"]],
str([1, ["KEY_DOWN"]]): [1, ["KEY_DOWN"]],
str([1, ["KEY_SPACE"]]): [1, ["KEY_SPACE"]],
str([1, ["KEY_ENTER"]]): [1, ["KEY_ENTER"]],
str([1, ["KEY_KPENTER"]]): [1, ["KEY_KPENTER"]],
str([1, ["KEY_ESC"]]): [1, ["KEY_ESC"]],
}
self.env["rawBindings"] = self.raw_bindings_backup.copy()
self.env["rawBindings"].update(modal_raw_bindings)
self._refresh_input_bindings()
def _restore_bindings(self):
if self.bindings_backup is not None:
self.env["bindings"] = self.bindings_backup
if self.raw_bindings_backup is not None:
self.env["rawBindings"] = self.raw_bindings_backup
self.bindings_backup = None
self.raw_bindings_backup = None
self._reset_input_state()
self._refresh_input_bindings()
def _reset_input_state(self):
try:
self.env["runtime"]["InputManager"].reset_input_state()
except Exception:
pass
def _refresh_input_bindings(self):
try:
refresh_grabs = getattr(
self.env["runtime"]["InputDriver"], "refresh_grabs", None
)
if refresh_grabs:
refresh_grabs(force=True)
except Exception:
pass
@@ -132,13 +132,6 @@ class TabCompletionManager:
if candidate_text:
return self._clean_text(candidate_text)
delta_text = self.env["screen"]["new_delta"]
if (
delta_text
and not self.env["screen"].get("new_delta_is_typing", False)
):
return self._clean_text(delta_text)
return ""
def _get_cursor_line_inserted_text(
@@ -184,26 +177,19 @@ class TabCompletionManager:
return "".join(inserted_parts)
def _get_candidate_text(self, old_lines, new_lines, cursor_y):
if len(old_lines) != len(new_lines):
return self._get_inserted_lines(old_lines, new_lines, cursor_y)
changed_lines = []
old_cursor_line = (
old_lines[cursor_y].strip() if cursor_y < len(old_lines) else ""
)
for index, old_line in enumerate(old_lines):
if index == cursor_y:
continue
if index < len(new_lines) and old_line != new_lines[index]:
if new_lines[index].strip() == old_cursor_line:
continue
changed_lines.append(new_lines[index])
return "\n".join(
line.rstrip() for line in changed_lines if line.strip()
return self._get_inserted_lines(
old_lines,
new_lines,
self.env["screen"]["new_cursor"]["y"],
old_cursor_line,
)
def _get_inserted_lines(self, old_lines, new_lines, cursor_y):
def _get_inserted_lines(
self, old_lines, new_lines, new_cursor_y, old_cursor_line
):
matcher = difflib.SequenceMatcher(
None, old_lines, new_lines, autojunk=False
)
@@ -217,10 +203,15 @@ class TabCompletionManager:
) in matcher.get_opcodes():
if tag not in ["insert", "replace"]:
continue
if new_end <= cursor_y:
if new_start > new_cursor_y:
continue
if tag == "replace" and any(
line.strip() for line in old_lines[old_start:old_end]
):
continue
for line in new_lines[new_start:new_end]:
if line.strip():
stripped_line = line.strip()
if stripped_line and stripped_line != old_cursor_line:
inserted_lines.append(line.rstrip())
return "\n".join(inserted_lines)
+2 -2
View File
@@ -4,5 +4,5 @@
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
version = "2026.05.14"
code_name = "testing"
version = "2026.06.01"
code_name = "master"
@@ -162,6 +162,7 @@ class driver(inputDriver):
self.fenrir_keys = set()
self.failed_grabs = 0
self.modifier_state = 0
self.modifier_interrupt_state = 0
def initialize(self, environment):
self.env = environment
@@ -194,6 +195,7 @@ class driver(inputDriver):
)
self.num_lock_mask = self.find_num_lock_mask()
self.refresh_modifier_state()
self.modifier_interrupt_state = self.modifier_state
self.refresh_interesting_keys()
self.refresh_grabs(force=True)
self.env["runtime"]["ProcessManager"].add_custom_event_thread(
@@ -274,6 +276,7 @@ class driver(inputDriver):
while active.value:
try:
self.refresh_grabs()
self.poll_modifier_interrupt_keys()
if not self.display.pending_events():
time.sleep(0.01)
continue
@@ -368,8 +371,59 @@ class driver(inputDriver):
"event_state": 1 if event.type == X.KeyPress else 0,
"event_type": event.type,
"event_raw_state": getattr(event, "state", 0),
"event_x_time": getattr(event, "time", X.CurrentTime),
}
def poll_modifier_interrupt_keys(self):
if not self.active or not self.should_poll_modifier_interrupt_keys():
return
try:
pointer = self.root.query_pointer()
current_state = getattr(pointer, "mask", 0)
except Exception:
return
previous_state = self.modifier_interrupt_state
self.modifier_interrupt_state = current_state
self.modifier_state = current_state
for key_name, modifier_mask in self.interrupt_modifier_masks():
if current_state & modifier_mask and not previous_state & modifier_mask:
self.interrupt_output_on_modifier_key(key_name)
def should_poll_modifier_interrupt_keys(self):
try:
settings_manager = self.env["runtime"]["SettingsManager"]
except Exception:
return False
if not settings_manager.get_setting_as_bool(
"keyboard", "interrupt_on_key_press"
):
return False
return (
settings_manager.get_setting(
"keyboard", "interrupt_on_key_press_filter"
).strip()
== ""
)
def interrupt_modifier_masks(self):
return [
("KEY_CTRL", X.ControlMask),
("KEY_SHIFT", X.ShiftMask),
("KEY_ALT", X.Mod1Mask),
]
def interrupt_output_on_modifier_key(self, key_name):
try:
self.env["runtime"]["OutputManager"].interrupt_output_async()
except Exception as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"x11Driver modifier interrupt failed for "
+ key_name
+ ": "
+ str(e),
debug.DebugLevel.ERROR,
)
def refresh_modifier_state(self):
try:
pointer = self.root.query_pointer()
@@ -650,8 +704,28 @@ class driver(inputDriver):
self.clear_event_buffer()
def write_event_buffer(self):
if self.display:
for event in self.env["input"]["event_buffer"]:
self.replay_key_event(event)
self.clear_event_buffer()
def replay_key_event(self, event):
if not isinstance(event, dict):
return
if event.get("event_type") != X.KeyPress:
return
try:
self.display.allow_events(
X.ReplayKeyboard,
event.get("event_x_time", X.CurrentTime),
)
self.display.flush()
except Exception as e:
self.write_debug(
"x11Driver replay key event failed: " + str(e),
debug.DebugLevel.ERROR,
)
def clear_event_buffer(self):
if not self._initialized:
return
@@ -8,6 +8,7 @@ import os
import os.path
import select
import socket
import struct
import time
from fenrirscreenreader.core import debug
@@ -16,7 +17,7 @@ from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.remoteDriver import RemoteDriver as remoteDriver
MAIN_SOCKET_FILE = "/tmp/fenrirscreenreader-deamon.sock"
MAIN_SOCKET_FILE = "/tmp/fenrirscreenreader-daemon.sock"
class driver(remoteDriver):
@@ -132,6 +133,131 @@ class driver(remoteDriver):
return False
return False
def _socket_file_for_socket(self, fenrir_sock):
for bound_sock, socket_file in self.bound_sockets:
if bound_sock == fenrir_sock:
return socket_file
return ""
def _get_peer_pid(self, client_sock):
so_peercred = getattr(socket, "SO_PEERCRED", 17)
try:
creds = client_sock.getsockopt(
socket.SOL_SOCKET, so_peercred, struct.calcsize("3i")
)
pid, _uid, _gid = struct.unpack("3i", creds)
return pid
except OSError:
return 0
def _get_parent_pid(self, pid):
try:
with open(f"/proc/{pid}/stat", "r", encoding="utf-8") as proc_file:
stat_text = proc_file.read()
except OSError:
return 0
end_command = stat_text.rfind(")")
if end_command == -1:
return 0
stat_fields = stat_text[end_command + 2 :].split()
if len(stat_fields) < 2:
return 0
try:
return int(stat_fields[1])
except ValueError:
return 0
def _find_ancestor_private_socket(self, pid):
seen_pids = set()
while pid > 1 and pid not in seen_pids:
seen_pids.add(pid)
socket_file = f"/tmp/fenrirscreenreader-{pid}.sock"
if self._is_registered_private_socket(socket_file):
return socket_file
pid = self._get_parent_pid(pid)
return ""
def _is_registered_private_socket(self, socket_file):
for instance in remoteInstanceRegistry.list_instances():
if socket_file in instance.get("socket_files", []):
return True
return False
def _get_registered_private_sockets(self):
socket_files = []
for instance in remoteInstanceRegistry.list_instances():
for socket_file in instance.get("socket_files", []):
if socket_file == MAIN_SOCKET_FILE:
continue
if socket_file in socket_files:
continue
socket_files.append(socket_file)
return socket_files
def _find_available_private_socket(self, preferred_socket=""):
socket_files = self._get_registered_private_sockets()
if preferred_socket and preferred_socket in socket_files:
socket_files.remove(preferred_socket)
socket_files.insert(0, preferred_socket)
for socket_file in socket_files:
if self._is_own_socket_file(socket_file):
return socket_file
if self._is_socket_active(socket_file):
return socket_file
return ""
def _is_own_socket_file(self, socket_file):
return any(
socket_file == bound_socket_file
for _bound_sock, bound_socket_file in self.bound_sockets
)
def _has_own_private_socket(self):
return any(
bound_socket_file != MAIN_SOCKET_FILE
for _bound_sock, bound_socket_file in self.bound_sockets
)
def _forward_remote_to_socket(self, data, socket_file):
forward_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
forward_sock.settimeout(0.2)
forward_sock.connect(socket_file)
forward_sock.sendall((data + "\n").encode("utf-8"))
return True
except OSError as e:
self.env["runtime"]["DebugManager"].write_debug_out(
"unixDriver watch_dog: Error forwarding remote data to "
+ socket_file
+ ": "
+ str(e),
debug.DebugLevel.ERROR,
)
return False
finally:
forward_sock.close()
def _route_main_socket_command(self, data, client_sock, socket_file):
if socket_file != MAIN_SOCKET_FILE:
return False
if not self._has_own_private_socket():
return False
peer_pid = self._get_peer_pid(client_sock)
ancestor_socket = ""
if peer_pid > 1:
ancestor_socket = self._find_ancestor_private_socket(peer_pid)
target_socket = self._find_available_private_socket(ancestor_socket)
if not target_socket:
return False
if self._is_own_socket_file(target_socket):
return False
return self._forward_remote_to_socket(data, target_socket)
def _cleanup(self):
for fenrir_sock in self.fenrirSocks:
try:
@@ -152,7 +278,7 @@ class driver(remoteDriver):
self.bound_sockets = []
remoteInstanceRegistry.remove_instance()
def _handle_client(self, client_sock, event_queue):
def _handle_client(self, client_sock, event_queue, socket_file=""):
try:
rawdata = client_sock.recv(8129)
except Exception as e:
@@ -173,6 +299,9 @@ class driver(remoteDriver):
client_sock.sendall((response["message"] + "\n").encode("utf-8"))
return
if self._route_main_socket_command(data, client_sock, socket_file):
return
event_queue.put(
{
"Type": FenrirEventType.remote_incomming,
@@ -188,7 +317,7 @@ class driver(remoteDriver):
def watch_dog(self, active, event_queue):
# echo "command say this is a test" | socat -
# UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
# UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
for socket_file, optional in self._get_socket_candidates():
fenrir_sock = self._bind_socket(socket_file, optional)
if fenrir_sock is None:
@@ -215,10 +344,11 @@ class driver(remoteDriver):
continue
for fenrir_sock in r:
client_sock, client_addr = fenrir_sock.accept()
socket_file = self._socket_file_for_socket(fenrir_sock)
# Ensure client socket is always closed to prevent resource
# leaks
try:
self._handle_client(client_sock, event_queue)
self._handle_client(client_sock, event_queue, socket_file)
finally:
# Always close client socket, even if data processing fails
try:
@@ -333,10 +333,79 @@ class driver(screenDriver):
def handle_stdin_input(self, msg_bytes, event_queue):
if self.synthesize_backspace_shortcut(msg_bytes, event_queue):
return
if self.handle_vmenu_stdin_input(msg_bytes, event_queue):
return
self.record_stdin_keypress(msg_bytes)
self.interrupt_output_on_stdin_input(msg_bytes)
self.inject_text_to_screen(msg_bytes)
def handle_vmenu_stdin_input(self, msg_bytes, event_queue):
if not self.is_vmenu_active():
return False
key_name = self.vmenu_stdin_key_name(msg_bytes)
if key_name and not self.vmenu_key_already_handled(key_name):
self.queue_keypress(key_name, event_queue)
return True
def is_vmenu_active(self):
try:
return self.env["runtime"]["VmenuManager"].get_active()
except Exception:
return False
def vmenu_stdin_key_name(self, msg_bytes):
key_map = {
b"\x1b": "KEY_ESC",
b"\x1b[A": "KEY_UP",
b"\x1b[B": "KEY_DOWN",
b"\x1b[C": "KEY_RIGHT",
b"\x1b[D": "KEY_LEFT",
b"\x1b[5~": "KEY_PAGEUP",
b"\x1b[6~": "KEY_PAGEDOWN",
b"\r": "KEY_ENTER",
b"\n": "KEY_ENTER",
b" ": "KEY_SPACE",
}
if msg_bytes in key_map:
return key_map[msg_bytes]
if len(msg_bytes) != 1:
return None
char = chr(msg_bytes[0])
if "a" <= char <= "z" or "A" <= char <= "Z":
return "KEY_" + char.upper()
return None
def vmenu_key_already_handled(self, key_name):
try:
return key_name in self.env["input"]["curr_input"]
except Exception:
return False
def queue_keypress(self, key_name, event_queue):
event_time = time.time()
for event_state in [1, 0]:
try:
event_queue.put(
{
"Type": FenrirEventType.keyboard_input,
"data": {
"event_name": key_name,
"event_value": 0,
"event_sec": int(event_time),
"event_usec": int((event_time % 1) * 1000000),
"event_state": event_state,
"event_type": 0,
},
},
block=False,
)
except Full:
self.env["runtime"]["DebugManager"].write_debug_out(
"ptyDriver queue_keypress: Event queue full, dropping "
+ key_name,
debug.DebugLevel.WARNING,
)
def record_stdin_keypress(self, msg_bytes):
if msg_bytes != b"\t":
return
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
hardware_serial_driver,
)
class driver(hardware_serial_driver):
cancel_command = b"\x18"
def _speak_bytes(self, text):
return self._clean_text(text).encode("ascii", errors="replace") + b"\x01"
def _rate_command(self, rate):
return self._setting_command("ra", self._scale(rate, 75, 650))
def _pitch_command(self, pitch):
return self._setting_command("dv ap", self._scale(pitch, 50, 180))
def _volume_command(self, volume):
return self._setting_command("vo", self._scale(volume, 0, 100))
def _setting_command(self, command, value):
return f"[:{command} {value}]".encode("ascii")
@@ -0,0 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.speechDriver.litetalkDriver import driver
@@ -0,0 +1,300 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import os
import termios
import threading
import tty
from queue import Empty
from queue import Queue
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.speechDriver import speech_driver
class SpeakQueue(Queue):
def clear(self):
try:
while True:
self.get_nowait()
except Empty:
pass
class hardware_serial_driver(speech_driver):
cancel_command = b""
default_baud_rate = 9600
def __init__(self):
speech_driver.__init__(self)
self.device = ""
self.baud_rate = self.default_baud_rate
self.serial_port = None
self.text_queue = SpeakQueue()
self.lock = threading.Lock()
self.worker_thread = None
self._stop_worker = False
def initialize(self, environment):
self.env = environment
self._is_initialized = False
settings_manager = self.env["runtime"]["SettingsManager"]
self.device = self._clean_device_setting(
settings_manager.get_setting("speech", "hardware_device")
)
self.baud_rate = settings_manager.get_setting_as_int(
"speech", "hardware_baud_rate"
)
self._debug(
"Hardware speech initialize: "
f"requested_device={self.device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
self._open_serial_port()
self._is_initialized = self.serial_port is not None
if not self._is_initialized:
raise RuntimeError("hardware speech device is not available")
if self._is_initialized:
self._stop_worker = False
self.worker_thread = threading.Thread(
target=self._worker, daemon=True
)
self.worker_thread.start()
def _clean_device_setting(self, device):
if not isinstance(device, str):
return ""
device = device.split("#", 1)[0].split(";", 1)[0].strip()
return device
def shutdown(self):
if not self._is_initialized:
return
self._stop_worker = True
self.clear_buffer()
self.text_queue.put(None)
if self.worker_thread:
self.worker_thread.join(timeout=0.5)
self._close_serial_port()
self._is_initialized = False
def speak(self, text, queueable=True, ignore_punctuation=False):
if not self._is_initialized:
return
if not queueable:
self.cancel()
if not isinstance(text, str) or text == "":
return
self._debug(
"Hardware speech queued text: "
f"{len(text)} chars, queue_size={self.text_queue.qsize()}",
debug.DebugLevel.INFO,
on_any_level=True,
)
self.text_queue.put(text)
def cancel(self):
if not self._is_initialized:
return
self.clear_buffer()
if self.cancel_command:
self._write_bytes(self.cancel_command, "cancel")
def clear_buffer(self):
if not self._is_initialized:
return
self.text_queue.clear()
def set_rate(self, rate):
if not self._is_initialized:
return
if not isinstance(rate, float):
return
self._write_bytes(self._rate_command(rate), "rate")
def set_pitch(self, pitch):
if not self._is_initialized:
return
if not isinstance(pitch, float):
return
self._write_bytes(self._pitch_command(pitch), "pitch")
def set_volume(self, volume):
if not self._is_initialized:
return
if not isinstance(volume, float):
return
self._write_bytes(self._volume_command(volume), "volume")
def _worker(self):
while not self._stop_worker:
text = self.text_queue.get()
if text is None:
return
try:
data = self._speak_bytes(text)
self._debug(
"Hardware speech worker prepared speech bytes: "
f"{len(data)} bytes",
debug.DebugLevel.INFO,
on_any_level=True,
)
self._write_bytes(data, "speech")
except Exception as error:
self._debug(
f"Hardware speech worker failed: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
def _open_serial_port(self):
if not self.device or self.device == "auto":
self._debug(
"Hardware speech requires an explicit serial device",
debug.DebugLevel.ERROR,
on_any_level=True,
)
return
port = self._open_configured_serial_port(self.device)
if port is not None:
self._activate_serial_port(self.device, port)
def _open_configured_serial_port(self, device):
port = None
try:
port = os.open(device, os.O_RDWR | os.O_NOCTTY)
tty.setraw(port)
attrs = termios.tcgetattr(port)
attrs[2] |= termios.CLOCAL | termios.CREAD
baud_rate = self._termios_baud_rate(self.baud_rate)
attrs[4] = baud_rate
attrs[5] = baud_rate
attrs[6][termios.VMIN] = 0
attrs[6][termios.VTIME] = 0
attrs[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
termios.tcsetattr(port, termios.TCSANOW, attrs)
return port
except (OSError, termios.error) as error:
self._close_port(port)
self._debug(
f"Hardware speech device open failed: {device}: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
return None
def _activate_serial_port(self, device, port):
self.serial_port = port
self.device = device
self._debug(
"Hardware speech device opened: "
f"{device}, baud_rate={self.baud_rate}",
debug.DebugLevel.INFO,
on_any_level=True,
)
def _close_serial_port(self):
with self.lock:
if self.serial_port is None:
return
self._close_port(self.serial_port)
self.serial_port = None
def _close_port(self, port):
if port is None:
return
try:
os.close(port)
except OSError as error:
self._debug(
f"Hardware speech device close failed: {error}",
debug.DebugLevel.WARNING,
)
def _write_bytes(self, data, description="data"):
if not data:
return
with self.lock:
if self.serial_port is None:
return
try:
total_written = 0
while total_written < len(data):
bytes_written = os.write(
self.serial_port, data[total_written:]
)
if bytes_written == 0:
raise OSError("serial write returned 0 bytes")
total_written += bytes_written
preview = self._format_bytes_preview(data)
self._debug(
"Hardware speech wrote "
f"{total_written} {description} bytes: {preview}",
debug.DebugLevel.INFO,
on_any_level=True,
)
except OSError as error:
self._debug(
f"Hardware speech write failed: {error}",
debug.DebugLevel.ERROR,
on_any_level=True,
)
def _termios_baud_rate(self, baud_rate):
baud_name = f"B{baud_rate}"
if hasattr(termios, baud_name):
return getattr(termios, baud_name)
self._debug(
f"Unsupported hardware speech baud rate {baud_rate}; using 9600",
debug.DebugLevel.WARNING,
)
return termios.B9600
def _clean_text(self, text):
text = text.replace("\r", " ").replace("\n", " ")
return "".join(
char if 0x20 <= ord(char) <= 0x7E else " "
for char in text
)
def _scale(self, value, minimum, maximum):
value = max(0.0, min(1.0, value))
return int(round(minimum + value * (maximum - minimum)))
def _format_bytes_preview(self, data, limit=32):
preview = data[:limit]
hex_preview = " ".join(f"{byte:02x}" for byte in preview)
ascii_preview = "".join(
chr(byte) if 0x20 <= byte <= 0x7E else "."
for byte in preview
)
suffix = "" if len(data) <= limit else " ..."
return (
f"hex=[{hex_preview}{suffix}] "
f"ascii=[{ascii_preview}{suffix}]"
)
def _debug(self, message, level, on_any_level=False):
try:
self.env["runtime"]["DebugManager"].write_debug_out(
message, level, on_any_level=on_any_level
)
except Exception:
pass
def _speak_bytes(self, text):
raise NotImplementedError
def _rate_command(self, rate):
return b""
def _pitch_command(self, pitch):
return b""
def _volume_command(self, volume):
return b""
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.speechDriver.hardwareSerialDriver import (
hardware_serial_driver,
)
class driver(hardware_serial_driver):
cancel_command = b"\x18"
def _speak_bytes(self, text):
return self._clean_text(text).encode("ascii", errors="replace") + b"\r"
def _rate_command(self, rate):
return self._setting_command(self._scale(rate, 0, 9), b"S")
def _pitch_command(self, pitch):
return self._setting_command(self._scale(pitch, 0, 99), b"P")
def _volume_command(self, volume):
return self._setting_command(self._scale(volume, 0, 9), b"V")
def _setting_command(self, value, command):
return b"\x01" + str(value).encode("ascii") + command
@@ -0,0 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
from fenrirscreenreader.speechDriver.litetalkDriver import driver
@@ -47,7 +47,6 @@ def get_up_char(curr_x, curr_y, curr_text):
curr_y -= 1
if curr_y < 0:
curr_y = 0
else:
end_of_screen = True
curr_char = ""
if not end_of_screen:
@@ -63,7 +62,6 @@ def get_down_char(curr_x, curr_y, curr_text):
curr_y += 1
if curr_y >= len(wrapped_lines):
curr_y = len(wrapped_lines) - 1
else:
end_of_screen = True
curr_char = ""
if not end_of_screen:
+1 -1
View File
@@ -232,7 +232,7 @@ class TestRemoteDataFormat:
"x11_window_id": "0x123",
"socket_files": [
"/tmp/fenrirscreenreader-123.sock",
"/tmp/fenrirscreenreader-deamon.sock",
"/tmp/fenrirscreenreader-daemon.sock",
],
}
],
+59
View File
@@ -0,0 +1,59 @@
from fenrirscreenreader.core.debugManager import DebugManager
def test_default_debug_file_uses_flat_name(tmp_path, monkeypatch):
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
manager = DebugManager()
try:
manager.open_debug_file()
assert manager.get_debug_file() == str(tmp_path / "fenrir.log")
assert (tmp_path / "fenrir.log").exists()
finally:
manager.close_debug_file()
def test_default_debug_file_uses_next_number_when_locked(
tmp_path, monkeypatch
):
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
first_manager = DebugManager()
second_manager = DebugManager()
try:
first_manager.open_debug_file()
second_manager.open_debug_file()
assert first_manager.get_debug_file() == str(tmp_path / "fenrir.log")
assert second_manager.get_debug_file() == str(
tmp_path / "fenrir2.log"
)
assert (tmp_path / "fenrir2.log").exists()
finally:
second_manager.close_debug_file()
first_manager.close_debug_file()
def test_default_debug_file_reuses_unlocked_flat_name(tmp_path, monkeypatch):
monkeypatch.setattr(DebugManager, "DEFAULT_LOG_DIR", str(tmp_path))
first_manager = DebugManager()
second_manager = DebugManager()
try:
first_manager.open_debug_file()
first_manager.close_debug_file()
second_manager.open_debug_file()
assert second_manager.get_debug_file() == str(tmp_path / "fenrir.log")
finally:
second_manager.close_debug_file()
def test_explicit_debug_file_uses_exact_path(tmp_path):
debug_file = tmp_path / "custom.log"
manager = DebugManager(str(debug_file))
try:
manager.open_debug_file()
assert manager.get_debug_file() == str(debug_file)
assert debug_file.exists()
finally:
manager.close_debug_file()
@@ -0,0 +1,86 @@
from unittest.mock import Mock
import pytest
from fenrirscreenreader.core.eventData import FenrirEventType
from fenrirscreenreader.core.fenrirManager import FenrirManager
@pytest.mark.unit
def test_speech_history_plain_key_modal_command_is_dispatched():
manager = FenrirManager.__new__(FenrirManager)
manager.modifierInput = False
manager.singleKeyCommand = False
manager.command = ""
event_manager = Mock(put_to_event_queue=Mock())
input_manager = Mock(
is_key_press=Mock(return_value=False),
no_key_pressed=Mock(return_value=False),
get_curr_shortcut=Mock(return_value=str([1, ["KEY_UP"]])),
get_command_for_shortcut=Mock(return_value="SPEECH_HISTORY_PREV"),
)
speech_history_manager = Mock(is_active=Mock(return_value=True))
diff_review_manager = Mock(is_active=Mock(return_value=False))
vmenu_manager = Mock(get_active=Mock(return_value=False))
manager.environment = {
"input": {
"key_forward": 0,
"prev_input": ["KEY_UP"],
"curr_input": ["KEY_UP"],
},
"runtime": {
"InputManager": input_manager,
"EventManager": event_manager,
"VmenuManager": vmenu_manager,
"DiffReviewManager": diff_review_manager,
"SpeechHistoryManager": speech_history_manager,
},
}
manager.detect_shortcut_command()
event_manager.put_to_event_queue.assert_called_once_with(
FenrirEventType.execute_command, "SPEECH_HISTORY_PREV"
)
@pytest.mark.unit
def test_vmenu_plain_key_modal_command_is_dispatched():
manager = FenrirManager.__new__(FenrirManager)
manager.modifierInput = False
manager.singleKeyCommand = False
manager.command = ""
event_manager = Mock(put_to_event_queue=Mock())
input_manager = Mock(
is_key_press=Mock(return_value=False),
no_key_pressed=Mock(return_value=False),
get_curr_shortcut=Mock(return_value=str([1, ["KEY_UP"]])),
get_command_for_shortcut=Mock(return_value="PREV_VMENU_ENTRY"),
)
vmenu_manager = Mock(get_active=Mock(return_value=True))
speech_history_manager = Mock(is_active=Mock(return_value=False))
diff_review_manager = Mock(is_active=Mock(return_value=False))
manager.environment = {
"input": {
"key_forward": 0,
"prev_input": ["KEY_UP"],
"curr_input": ["KEY_UP"],
},
"runtime": {
"InputManager": input_manager,
"EventManager": event_manager,
"VmenuManager": vmenu_manager,
"DiffReviewManager": diff_review_manager,
"SpeechHistoryManager": speech_history_manager,
},
}
manager.detect_shortcut_command()
event_manager.put_to_event_queue.assert_called_once_with(
FenrirEventType.execute_command, "PREV_VMENU_ENTRY"
)
+186
View File
@@ -0,0 +1,186 @@
import os
import select
import time
from unittest.mock import ANY
from unittest.mock import Mock
import pytest
from fenrirscreenreader.speechDriver import dectalkDriver
from fenrirscreenreader.speechDriver import doubletalkDriver
from fenrirscreenreader.speechDriver import litetalkDriver
from fenrirscreenreader.speechDriver import tripletalkDriver
def build_environment(device):
settings_manager = Mock()
settings_manager.get_setting.side_effect = (
lambda section, setting: device
if (section, setting) == ("speech", "hardware_device")
else ""
)
settings_manager.get_setting_as_int.side_effect = (
lambda section, setting: 9600
if (section, setting) == ("speech", "hardware_baud_rate")
else 0
)
return {
"runtime": {
"SettingsManager": settings_manager,
"DebugManager": Mock(),
}
}
def read_available(fd, expected_length, timeout=1.0):
deadline = time.monotonic() + timeout
data = b""
while len(data) < expected_length and time.monotonic() < deadline:
readable, _, _ = select.select([fd], [], [], 0.05)
if readable:
data += os.read(fd, 1024)
return data
@pytest.fixture
def serial_pair():
master_fd, slave_fd = os.openpty()
try:
yield master_fd, os.ttyname(slave_fd)
finally:
os.close(master_fd)
os.close(slave_fd)
def initialized_driver(driver_class, serial_pair):
master_fd, slave_name = serial_pair
speech_driver = driver_class.driver()
speech_driver.initialize(build_environment(slave_name))
assert speech_driver._is_initialized
return speech_driver, master_fd
def test_dectalk_driver_speaks_printable_text(serial_pair):
speech_driver, master_fd = initialized_driver(dectalkDriver, serial_pair)
try:
speech_driver.speak("Hello\nworld ☃")
assert read_available(master_fd, 13) == b"Hello world \x01"
finally:
speech_driver.shutdown()
def test_dectalk_driver_writes_settings_and_cancel(serial_pair):
speech_driver, master_fd = initialized_driver(dectalkDriver, serial_pair)
try:
speech_driver.set_rate(1.0)
speech_driver.set_pitch(0.0)
speech_driver.set_volume(0.5)
speech_driver.cancel()
assert read_available(master_fd, 33) == (
b"[:ra 650][:dv ap 50][:vo 50]\x18"
)
finally:
speech_driver.shutdown()
def test_litetalk_driver_speaks_printable_text(serial_pair):
speech_driver, master_fd = initialized_driver(litetalkDriver, serial_pair)
try:
speech_driver.speak("Ready")
assert read_available(master_fd, 6) == b"Ready\r"
finally:
speech_driver.shutdown()
def test_litetalk_driver_writes_settings_and_cancel(serial_pair):
speech_driver, master_fd = initialized_driver(litetalkDriver, serial_pair)
try:
speech_driver.set_rate(1.0)
speech_driver.set_pitch(0.0)
speech_driver.set_volume(0.5)
speech_driver.cancel()
assert read_available(master_fd, 9) == b"\x019S\x010P\x014V\x18"
finally:
speech_driver.shutdown()
def test_configured_device_supports_classic_serial(serial_pair):
master_fd, slave_name = serial_pair
speech_driver = litetalkDriver.driver()
speech_driver.initialize(build_environment(slave_name))
try:
assert speech_driver.device == slave_name
speech_driver.speak("Serial")
assert read_available(master_fd, 7) == b"Serial\r"
finally:
speech_driver.shutdown()
def test_configured_device_strips_inline_comment(serial_pair):
master_fd, slave_name = serial_pair
device_setting = f"{slave_name} # built-in serial port"
speech_driver = litetalkDriver.driver()
speech_driver.initialize(build_environment(device_setting))
try:
assert speech_driver.device == slave_name
speech_driver.speak("Specific")
assert read_available(master_fd, 9) == b"Specific\r"
finally:
speech_driver.shutdown()
def test_auto_device_is_rejected():
speech_driver = litetalkDriver.driver()
with pytest.raises(RuntimeError, match="hardware speech device"):
speech_driver.initialize(build_environment("auto"))
debug_manager = speech_driver.env["runtime"]["DebugManager"]
debug_manager.write_debug_out.assert_called_with(
"Hardware speech requires an explicit serial device",
ANY,
on_any_level=True,
)
def test_hardware_driver_retries_partial_serial_writes(monkeypatch):
speech_driver = litetalkDriver.driver()
speech_driver.env = build_environment("/dev/ttyUSB0")
speech_driver.serial_port = 12
written_chunks = []
def fake_write(port, data):
assert port == 12
chunk = data[:2]
written_chunks.append(chunk)
return len(chunk)
monkeypatch.setattr(
"fenrirscreenreader.speechDriver.hardwareSerialDriver.os.write",
fake_write,
)
speech_driver._write_bytes(b"abcdef", "speech")
assert written_chunks == [b"ab", b"cd", b"ef"]
@pytest.mark.parametrize("driver_class", [doubletalkDriver, tripletalkDriver])
def test_litetalk_compatible_alias_drivers(driver_class, serial_pair):
speech_driver, master_fd = initialized_driver(driver_class, serial_pair)
try:
speech_driver.speak("Alias")
speech_driver.set_rate(1.0)
assert read_available(master_fd, 10) == b"\x019SAlias\r"
finally:
speech_driver.shutdown()
def test_hardware_driver_ignores_empty_and_non_string_text(serial_pair):
speech_driver, master_fd = initialized_driver(dectalkDriver, serial_pair)
try:
speech_driver.speak("")
speech_driver.speak(None)
assert read_available(master_fd, 1, timeout=0.2) == b""
finally:
speech_driver.shutdown()
+79
View File
@@ -26,6 +26,14 @@ def build_output_manager():
"SoundDriver": sound_driver,
"SpeechDriver": speech_driver,
"DebugManager": Mock(write_debug_out=Mock()),
"TextManager": Mock(
replace_head_lines=Mock(side_effect=lambda text: text)
),
"PunctuationManager": Mock(
proceed_punctuation=Mock(
side_effect=lambda text, _ignore_punctuation: text
)
),
},
}
return output_manager, sound_driver, speech_driver
@@ -122,6 +130,77 @@ def test_interrupt_output_waits_only_briefly_for_slow_cancel():
output_manager.interrupt_thread.join(timeout=1.0)
@pytest.mark.unit
def test_cancel_speech_skips_when_speech_driver_is_busy():
output_manager, _sound_driver, speech_driver = build_output_manager()
output_manager.speech_driver_lock_timeout = 0.01
output_manager.speech_driver_lock.acquire()
try:
start_time = time.monotonic()
output_manager.cancel_speech()
elapsed = time.monotonic() - start_time
finally:
output_manager.speech_driver_lock.release()
assert elapsed < 0.2
speech_driver.cancel.assert_not_called()
@pytest.mark.unit
def test_speak_text_drops_speech_when_cancel_holds_driver_lock():
output_manager, _sound_driver, speech_driver = build_output_manager()
output_manager.speech_driver_lock_timeout = 0.01
output_manager.speech_driver_lock.acquire()
try:
start_time = time.monotonic()
output_manager.speak_text("hello", interrupt=False, flush=False)
elapsed = time.monotonic() - start_time
finally:
output_manager.speech_driver_lock.release()
assert elapsed < 0.2
speech_driver.speak.assert_not_called()
@pytest.mark.unit
def test_present_text_records_speech_history_when_enabled():
output_manager, _sound_driver, speech_driver = build_output_manager()
speech_history_manager = Mock(add_text=Mock())
output_manager.env["runtime"]["SpeechHistoryManager"] = (
speech_history_manager
)
output_manager.present_text("hello history", interrupt=False)
speech_history_manager.add_text.assert_called_once_with("hello history")
speech_driver.speak.assert_called_once()
@pytest.mark.unit
def test_present_text_does_not_record_when_speech_disabled():
output_manager, _sound_driver, speech_driver = build_output_manager()
speech_history_manager = Mock(add_text=Mock())
output_manager.env["runtime"]["SpeechHistoryManager"] = (
speech_history_manager
)
def _get_setting_as_bool(section, setting):
if (section, setting) == ("speech", "enabled"):
return False
return True
output_manager.env["runtime"][
"SettingsManager"
].get_setting_as_bool.side_effect = _get_setting_as_bool
output_manager.present_text("hello history", interrupt=False)
speech_history_manager.add_text.assert_not_called()
speech_driver.speak.assert_not_called()
@pytest.mark.unit
def test_key_interrupt_command_uses_nonblocking_interrupt():
module = load_key_interrupt_module()
+101
View File
@@ -39,3 +39,104 @@ def test_progress_detector_skips_typing_delta():
command.is_real_progress_update.assert_not_called()
command.detect_progress.assert_not_called()
@pytest.mark.unit
def test_progress_detector_allows_long_tqdm_transfer_delta():
progress_module = _load_progress_module()
command = progress_module.command()
sample = (
"88%|"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"█████████████████████████▊ "
"| 843M/954M [00:54<00:07, 15.2MB/s]"
)
command.env = {
"commandBuffer": {"progress_monitoring": True},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
},
"screen": {
"new_delta": sample,
"new_content_text": sample,
"old_cursor": {"x": 0, "y": 0},
"new_cursor": {"x": 0, "y": 0},
},
}
assert len(sample) > 200
assert command.is_real_progress_update()
@pytest.mark.unit
def test_progress_detector_beeps_for_long_tqdm_transfer_delta():
progress_module = _load_progress_module()
command = progress_module.command()
sample = (
"90%|"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"████████████████████████████████████████████████████████████████"
"█████████████████████████████████████▍ "
"| 856M/954M [00:56<00:14, 6.78MB/s]"
)
command.env = {
"commandBuffer": {
"progress_monitoring": True,
"lastProgressValue": -1,
"lastProgressTime": 0,
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
},
"screen": {
"new_delta": sample,
"new_delta_is_typing": False,
"new_content_text": sample,
"old_cursor": {"x": 0, "y": 0},
"new_cursor": {"x": 0, "y": 0},
},
}
command.play_progress_tone = Mock()
command.run()
command.play_progress_tone.assert_called_once_with(90.0)
assert command.env["commandBuffer"]["lastProgressValue"] == 90.0
@pytest.mark.unit
def test_progress_detector_beeps_for_interruptible_status_without_ellipsis():
progress_module = _load_progress_module()
command = progress_module.command()
sample = "◦ Files: (1m 04s • esc to interrupt)"
command.env = {
"commandBuffer": {
"progress_monitoring": True,
"lastProgressValue": -1,
"lastProgressTime": 0,
},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"ScreenManager": Mock(is_screen_change=Mock(return_value=False)),
"CursorManager": Mock(is_cursor_vertical_move=Mock(return_value=False)),
},
"screen": {
"new_delta": sample,
"new_delta_is_typing": False,
"new_content_text": sample,
"old_cursor": {"x": 0, "y": 0},
"new_cursor": {"x": 0, "y": 0},
},
}
command.play_activity_beep = Mock()
command.run()
command.play_activity_beep.assert_called_once_with()
+84
View File
@@ -160,6 +160,90 @@ def test_pty_plain_stdin_does_not_record_tab_keypress():
pty_driver.inject_text_to_screen.assert_called_once_with(b"a")
@pytest.mark.unit
@pytest.mark.parametrize(
("sequence", "key_name"),
[
(b"\x1b[A", "KEY_UP"),
(b"\x1b[B", "KEY_DOWN"),
(b"\x1b[C", "KEY_RIGHT"),
(b"\x1b[D", "KEY_LEFT"),
(b"\x1b[5~", "KEY_PAGEUP"),
(b"\x1b[6~", "KEY_PAGEDOWN"),
(b"\x1b", "KEY_ESC"),
(b"\r", "KEY_ENTER"),
(b" ", "KEY_SPACE"),
(b"a", "KEY_A"),
(b"Z", "KEY_Z"),
],
)
def test_pty_vmenu_stdin_is_consumed_and_synthesizes_key_events(
sequence,
key_name,
):
pty_driver = PtyDriver()
event_queue = Mock()
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = False
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"DebugManager": Mock(write_debug_out=Mock()),
"SettingsManager": settings_manager,
"VmenuManager": Mock(get_active=Mock(return_value=True)),
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(sequence, event_queue)
pty_driver.inject_text_to_screen.assert_not_called()
assert event_queue.put.call_count == 2
first_event = event_queue.put.call_args_list[0].args[0]
second_event = event_queue.put.call_args_list[1].args[0]
assert first_event["Type"] == FenrirEventType.keyboard_input
assert first_event["data"]["event_name"] == key_name
assert first_event["data"]["event_state"] == 1
assert second_event["data"]["event_name"] == key_name
assert second_event["data"]["event_state"] == 0
@pytest.mark.unit
def test_pty_vmenu_unknown_stdin_is_consumed_without_injection():
pty_driver = PtyDriver()
event_queue = Mock()
pty_driver.env = {
"input": {"curr_input": []},
"runtime": {
"VmenuManager": Mock(get_active=Mock(return_value=True)),
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[1;5A", event_queue)
pty_driver.inject_text_to_screen.assert_not_called()
event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_vmenu_stdin_does_not_duplicate_current_x11_key():
pty_driver = PtyDriver()
event_queue = Mock()
pty_driver.env = {
"input": {"curr_input": ["KEY_RIGHT"]},
"runtime": {
"VmenuManager": Mock(get_active=Mock(return_value=True)),
},
}
pty_driver.inject_text_to_screen = Mock()
pty_driver.handle_stdin_input(b"\x1b[C", event_queue)
pty_driver.inject_text_to_screen.assert_not_called()
event_queue.put.assert_not_called()
@pytest.mark.unit
def test_pty_stdin_input_honors_interrupt_disabled():
pty_driver = PtyDriver()
@@ -0,0 +1,35 @@
import json
import time
from fenrirscreenreader.core import remoteInstanceRegistry
def test_write_instance_prunes_stale_registry_files(tmp_path, monkeypatch):
monkeypatch.setattr(
remoteInstanceRegistry, "get_registry_dir", lambda: str(tmp_path)
)
monkeypatch.setattr(
remoteInstanceRegistry,
"process_exists",
lambda pid: pid == 456,
)
stale_instance = tmp_path / "123.json"
stale_instance.write_text(
json.dumps(
{
"pid": 123,
"updated_at": time.time(),
}
)
+ "\n",
encoding="utf-8",
)
invalid_instance = tmp_path / "invalid.json"
invalid_instance.write_text("not json\n", encoding="utf-8")
remoteInstanceRegistry.write_instance({"pid": 456})
assert not stale_instance.exists()
assert not invalid_instance.exists()
assert (tmp_path / "456.json").exists()
@@ -0,0 +1,126 @@
import importlib.util
from pathlib import Path
from unittest.mock import Mock
import pytest
from fenrirscreenreader.utils import char_utils
COMMANDS_DIR = (
Path(__file__).resolve().parents[2]
/ "src"
/ "fenrirscreenreader"
/ "commands"
/ "commands"
)
def load_command(name):
spec = importlib.util.spec_from_file_location(
f"fenrir_{name}", COMMANDS_DIR / f"{name}.py"
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.command()
def build_environment(cursor):
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
output_manager = Mock()
cursor_manager = Mock()
cursor_manager.enter_review_mode_curr_text_cursor.return_value = None
cursor_manager.get_review_or_text_cursor.return_value = cursor
return {
"punctuation": {"PUNCTDICT": {" ": "space"}},
"screen": {
"newCursorReview": cursor.copy(),
"new_cursor": cursor.copy(),
"new_content_text": "abc\ndef",
},
"runtime": {
"AttributeManager": Mock(has_attributes=Mock(return_value=False)),
"CursorManager": cursor_manager,
"OutputManager": output_manager,
"SettingsManager": settings_manager,
"TableManager": Mock(is_table_mode=Mock(return_value=False)),
},
}
def run_command(name, cursor):
env = build_environment(cursor)
command = load_command(name)
command.initialize(env)
command.run()
return env["runtime"]["OutputManager"]
def boundary_call(output_manager):
return output_manager.present_text.call_args_list[-1]
@pytest.mark.unit
def test_previous_line_uses_start_of_screen_sound_at_top():
output_manager = run_command("review_prev_line", {"x": 0, "y": 0})
call = boundary_call(output_manager)
assert call.args[0] == "start of screen"
assert call.kwargs["sound_icon"] == "StartOfScreen"
@pytest.mark.unit
def test_next_line_uses_end_of_screen_sound_at_bottom():
output_manager = run_command("review_next_line", {"x": 0, "y": 1})
call = boundary_call(output_manager)
assert call.args[0] == "end of screen"
assert call.kwargs["sound_icon"] == "EndOfScreen"
@pytest.mark.unit
def test_previous_character_uses_start_of_screen_sound_at_top_left():
output_manager = run_command("review_prev_char", {"x": 0, "y": 0})
call = boundary_call(output_manager)
assert call.args[0] == "start of screen"
assert call.kwargs["sound_icon"] == "StartOfScreen"
@pytest.mark.unit
def test_next_character_uses_end_of_screen_sound_at_bottom_right():
output_manager = run_command("review_next_char", {"x": 2, "y": 1})
call = boundary_call(output_manager)
assert call.args[0] == "end of screen"
assert call.kwargs["sound_icon"] == "EndOfScreen"
@pytest.mark.unit
def test_vertical_character_navigation_reports_boundaries_only_at_edges():
assert char_utils.get_up_char(0, 1, "abc\ndef") == (
0,
0,
"a",
False,
)
assert char_utils.get_up_char(0, 0, "abc\ndef") == (
0,
0,
"",
True,
)
assert char_utils.get_down_char(0, 0, "abc\ndef") == (
0,
1,
"d",
False,
)
assert char_utils.get_down_char(0, 1, "abc\ndef") == (
0,
1,
"",
True,
)
+30
View File
@@ -5,6 +5,8 @@ Tests the _validate_setting_value method to ensure proper input validation
for all configurable settings that could cause crashes or accessibility issues.
"""
from argparse import Namespace
import pytest
import sys
from pathlib import Path
@@ -71,6 +73,10 @@ class TestSpeechSettingsValidation:
# Valid drivers
self.manager._validate_setting_value("speech", "driver", "speechdDriver")
self.manager._validate_setting_value("speech", "driver", "genericDriver")
self.manager._validate_setting_value("speech", "driver", "dectalkDriver")
self.manager._validate_setting_value("speech", "driver", "doubletalkDriver")
self.manager._validate_setting_value("speech", "driver", "litetalkDriver")
self.manager._validate_setting_value("speech", "driver", "tripletalkDriver")
self.manager._validate_setting_value("speech", "driver", "dummyDriver")
# Invalid driver
@@ -202,6 +208,30 @@ def test_focus_settings_define_tui_toggle():
assert settings_data["focus"]["tui"] is False
@pytest.mark.unit
@pytest.mark.settings
def test_format_cli_args_reports_startup_flags_in_stable_order():
manager = SettingsManager()
cli_args = Namespace(
debug=True,
foreground=False,
force_all_screens=False,
ignore_screen=["7"],
options="speech#rate=1.2",
print=False,
setting="/tmp/settings.conf",
x11=True,
x11_window_id="0x123",
)
assert manager.format_cli_args(cli_args) == (
"{'debug': True, 'force_all_screens': False, 'foreground': False, "
"'ignore_screen': ['7'], 'options': 'speech#rate=1.2', "
"'print': False, 'setting': '/tmp/settings.conf', 'x11': True, "
"'x11_window_id': '0x123'}"
)
@pytest.mark.unit
@pytest.mark.settings
class TestSettingsPathSelection:
+175
View File
@@ -0,0 +1,175 @@
from unittest.mock import Mock
import pytest
from fenrirscreenreader.core.speechHistoryManager import SpeechHistoryManager
def build_speech_history_manager(history_size=3):
spoken_messages = []
output_manager = Mock()
def _capture_message(message, **_kwargs):
spoken_messages.append(message)
output_manager.present_text.side_effect = _capture_message
settings_manager = Mock()
settings_manager.get_setting_as_int.return_value = history_size
memory_manager = Mock(add_value_to_first_index=Mock())
input_manager = Mock(reset_input_state=Mock())
input_driver = Mock(refresh_grabs=Mock())
env = {
"runtime": {
"OutputManager": output_manager,
"SettingsManager": settings_manager,
"MemoryManager": memory_manager,
"InputManager": input_manager,
"InputDriver": input_driver,
},
"bindings": {"original": "COMMAND"},
"rawBindings": {"original": [1, ["KEY_FENRIR"]]},
}
manager = SpeechHistoryManager()
manager.initialize(env)
return manager, env, spoken_messages, memory_manager
@pytest.mark.unit
def test_speech_history_keeps_configured_number_of_items():
manager, _env, _spoken_messages, _memory_manager = (
build_speech_history_manager(history_size=2)
)
assert manager.add_text("one")
assert manager.add_text("two")
assert manager.add_text("three")
assert manager.history == ["three", "two"]
@pytest.mark.unit
def test_speech_history_suppresses_exact_duplicates_until_item_drops():
manager, _env, _spoken_messages, _memory_manager = (
build_speech_history_manager(history_size=2)
)
assert manager.add_text("hello world")
assert not manager.add_text("hello world")
assert manager.add_text("other")
assert manager.add_text("third")
assert manager.add_text("hello world")
assert manager.history == ["hello world", "third"]
@pytest.mark.unit
def test_speech_history_dedupe_keeps_case_and_suppresses_whitespace_variants():
manager, _env, _spoken_messages, _memory_manager = (
build_speech_history_manager()
)
assert manager.add_text("hello")
assert manager.add_text("Hello")
assert not manager.add_text("hello ")
assert not manager.add_text("hello ")
assert manager.history == ["Hello", "hello"]
@pytest.mark.unit
def test_open_empty_history_announces_empty_without_modal_bindings():
manager, env, spoken_messages, _memory_manager = (
build_speech_history_manager()
)
assert not manager.open_history()
assert not manager.is_active()
assert spoken_messages == ["speech history empty"]
assert env["bindings"] == {"original": "COMMAND"}
@pytest.mark.unit
def test_open_history_installs_modal_bindings_and_replay_is_not_recorded():
manager, env, spoken_messages, _memory_manager = (
build_speech_history_manager()
)
env["rawBindings"]["ctrl_shut_up"] = [1, ["KEY_CTRL"]]
manager.add_text("first")
manager.add_text("second")
assert manager.open_history()
manager.add_text("replayed")
assert manager.is_active()
assert spoken_messages == ["Speech history"]
assert manager.curr_index == -1
assert manager.history == ["second", "first"]
assert "original" not in env["bindings"]
assert env["rawBindings"]["original"] == [1, ["KEY_FENRIR"]]
assert env["rawBindings"]["ctrl_shut_up"] == [1, ["KEY_CTRL"]]
assert env["bindings"][str([1, ["KEY_UP"]])] == "SPEECH_HISTORY_PREV"
assert env["bindings"][str([1, ["KEY_ENTER"]])] == "SPEECH_HISTORY_COPY"
assert env["bindings"][str([1, ["KEY_ESC"]])] == "SPEECH_HISTORY_CLOSE"
assert env["rawBindings"][str([1, ["KEY_UP"]])] == [1, ["KEY_UP"]]
input_driver = env["runtime"]["InputDriver"]
input_driver.refresh_grabs.assert_called_once_with(force=True)
@pytest.mark.unit
def test_navigation_moves_between_newer_and_older_items():
manager, _env, spoken_messages, _memory_manager = (
build_speech_history_manager()
)
manager.add_text("oldest")
manager.add_text("middle")
manager.add_text("newest")
manager.open_history()
manager.prev_entry()
manager.prev_entry()
manager.prev_entry()
manager.next_entry()
assert spoken_messages[-4:] == ["newest", "middle", "oldest", "middle"]
@pytest.mark.unit
def test_copy_current_adds_clipboard_and_restores_bindings():
manager, env, spoken_messages, memory_manager = (
build_speech_history_manager()
)
manager.add_text("first")
manager.add_text("second")
manager.open_history()
manager.prev_entry()
manager.prev_entry()
manager.copy_current_to_clipboard()
memory_manager.add_value_to_first_index.assert_called_once_with(
"clipboardHistory", "first"
)
assert spoken_messages[-1] == "copied to clipboard"
assert not manager.is_active()
assert env["bindings"] == {"original": "COMMAND"}
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
@pytest.mark.unit
def test_close_history_restores_keyboard_state_and_grabs():
manager, env, _spoken_messages, _memory_manager = (
build_speech_history_manager()
)
manager.add_text("first")
manager.open_history()
manager.close_history()
assert not manager.is_active()
assert env["bindings"] == {"original": "COMMAND"}
assert env["rawBindings"] == {"original": [1, ["KEY_FENRIR"]]}
env["runtime"]["InputManager"].reset_input_state.assert_called_once_with()
assert env["runtime"]["InputDriver"].refresh_grabs.call_count == 2
+87
View File
@@ -140,6 +140,93 @@ def test_candidate_list_speaks_visible_list_without_cursor_advance():
assert manager.process_update() == "Documents/ Downloads/"
@pytest.mark.unit
def test_full_screen_scroll_speaks_only_inserted_candidate_list():
old_text = "\n".join(
[
"old top".ljust(30),
"old middle".ljust(30),
"old lower".ljust(30),
"old bottom".ljust(30),
"$ ./Toby".ljust(30),
]
)
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 4})
manager.capture_if_tab()
_set_screen_update(
env,
"\n".join(
[
"old middle".ljust(30),
"old lower".ljust(30),
"old bottom".ljust(30),
"TobyAccMod_V10-0.pk3 TobyConfig.ini".ljust(30),
"$ ./Toby".ljust(30),
]
),
{"x": 8, "y": 4},
delta="\n".join(
[
"old middle",
"old lower",
"old bottom",
"TobyAccMod_V10-0.pk3 TobyConfig.ini",
"$ ./Toby",
]
),
)
assert manager.process_update() == "TobyAccMod_V10-0.pk3 TobyConfig.ini"
@pytest.mark.unit
def test_same_height_repaint_without_inserted_candidates_stays_silent():
old_text = "\n".join(
[
"old top".ljust(30),
"old middle".ljust(30),
"old lower".ljust(30),
"$ ./Toby".ljust(30),
]
)
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 3})
manager.capture_if_tab()
_set_screen_update(
env,
"\n".join(
[
"status line".ljust(30),
"old prompt history".ljust(30),
"unrelated output".ljust(30),
"$ ./Toby".ljust(30),
]
),
{"x": 8, "y": 3},
delta="status line\nold prompt history\nunrelated output\n$ ./Toby",
)
assert manager.process_update() == ""
@pytest.mark.unit
def test_recent_tab_does_not_speak_delta_without_detected_completion():
old_text = "\n".join(["$ ./Toby".ljust(20), "".ljust(20)])
manager, env, _input_manager = _build_env(old_text, {"x": 8, "y": 0})
manager.capture_if_tab()
_set_screen_update(
env,
old_text,
{"x": 8, "y": 0},
delta="old unrelated screen content\n$ ./Toby",
)
assert manager.process_update() == ""
assert env["commandBuffer"]["tabCompletion"]["pending"] is not None
@pytest.mark.unit
def test_no_screen_change_stays_silent_and_keeps_pending_briefly():
manager, env, _input_manager = _build_env(
+223
View File
@@ -0,0 +1,223 @@
from unittest.mock import Mock, mock_open, patch
from fenrirscreenreader.remoteDriver import unixDriver
class FakeClientSocket:
def __init__(self, data):
self.data = data
self.sent = b""
def recv(self, _size):
return self.data
def sendall(self, data):
self.sent += data
def test_main_socket_routes_to_ancestor_private_socket(mock_environment):
driver = unixDriver.driver()
driver.env = mock_environment
driver.bound_sockets = [
(Mock(), "/tmp/fenrirscreenreader-999.sock"),
(Mock(), unixDriver.MAIN_SOCKET_FILE),
]
client_sock = FakeClientSocket(b"command say routed")
event_queue = Mock()
with patch.object(driver, "_get_peer_pid", return_value=1234), patch.object(
driver,
"_find_ancestor_private_socket",
return_value="/tmp/fenrirscreenreader-222.sock",
), patch.object(
driver,
"_find_available_private_socket",
return_value="/tmp/fenrirscreenreader-222.sock",
), patch.object(
driver, "_forward_remote_to_socket", return_value=True
) as forward:
driver._handle_client(
client_sock, event_queue, unixDriver.MAIN_SOCKET_FILE
)
forward.assert_called_once_with(
"command say routed", "/tmp/fenrirscreenreader-222.sock"
)
event_queue.put.assert_not_called()
def test_main_socket_routes_to_first_available_private_socket(
mock_environment,
):
driver = unixDriver.driver()
driver.env = mock_environment
driver.bound_sockets = [
(Mock(), "/tmp/fenrirscreenreader-999.sock"),
(Mock(), unixDriver.MAIN_SOCKET_FILE),
]
client_sock = FakeClientSocket(b"command say fallback")
event_queue = Mock()
with patch.object(driver, "_get_peer_pid", return_value=1234), patch.object(
driver,
"_find_ancestor_private_socket",
return_value="",
), patch.object(
driver,
"_find_available_private_socket",
return_value="/tmp/fenrirscreenreader-333.sock",
), patch.object(
driver, "_forward_remote_to_socket", return_value=True
) as forward:
driver._handle_client(
client_sock, event_queue, unixDriver.MAIN_SOCKET_FILE
)
forward.assert_called_once_with(
"command say fallback", "/tmp/fenrirscreenreader-333.sock"
)
event_queue.put.assert_not_called()
def test_main_socket_handles_command_locally_without_available_target(
mock_environment,
):
driver = unixDriver.driver()
driver.env = mock_environment
client_sock = FakeClientSocket(b"command say local")
event_queue = Mock()
with patch.object(driver, "_get_peer_pid", return_value=1234), patch.object(
driver,
"_find_available_private_socket",
return_value="",
):
driver._handle_client(
client_sock, event_queue, unixDriver.MAIN_SOCKET_FILE
)
event_queue.put.assert_called_once_with(
{
"Type": unixDriver.FenrirEventType.remote_incomming,
"data": "command say local",
}
)
def test_vcsa_main_socket_owner_handles_command_locally(mock_environment):
driver = unixDriver.driver()
driver.env = mock_environment
driver.bound_sockets = [(Mock(), unixDriver.MAIN_SOCKET_FILE)]
client_sock = FakeClientSocket(b"command say root")
event_queue = Mock()
with patch.object(driver, "_find_available_private_socket") as find_available:
driver._handle_client(
client_sock, event_queue, unixDriver.MAIN_SOCKET_FILE
)
find_available.assert_not_called()
event_queue.put.assert_called_once_with(
{
"Type": unixDriver.FenrirEventType.remote_incomming,
"data": "command say root",
}
)
def test_private_socket_handles_command_locally(mock_environment):
driver = unixDriver.driver()
driver.env = mock_environment
client_sock = FakeClientSocket(b"command say private")
event_queue = Mock()
driver._handle_client(
client_sock, event_queue, "/tmp/fenrirscreenreader-222.sock"
)
event_queue.put.assert_called_once_with(
{
"Type": unixDriver.FenrirEventType.remote_incomming,
"data": "command say private",
}
)
def test_list_command_still_returns_response_from_main_socket(mock_environment):
driver = unixDriver.driver()
driver.env = mock_environment
mock_environment["runtime"]["RemoteManager"] = Mock()
client_sock = FakeClientSocket(b"command list")
event_queue = Mock()
mock_environment["runtime"][
"RemoteManager"
].handle_remote_incomming_with_response.return_value = {
"success": True,
"message": "pid=1",
}
driver._handle_client(client_sock, event_queue, unixDriver.MAIN_SOCKET_FILE)
assert client_sock.sent == b"pid=1\n"
event_queue.put.assert_not_called()
def test_get_parent_pid_parses_process_names_with_spaces():
driver = unixDriver.driver()
stat_file = mock_open(
read_data="1234 (name with spaces) S 567 1 1 0 -1 0\n"
)
with patch("builtins.open", stat_file):
assert driver._get_parent_pid(1234) == 567
def test_find_available_private_socket_prefers_ancestor_socket(
mock_environment,
):
driver = unixDriver.driver()
driver.env = mock_environment
with patch(
"fenrirscreenreader.remoteDriver.unixDriver.remoteInstanceRegistry.list_instances",
return_value=[
{
"pid": 111,
"socket_files": ["/tmp/fenrirscreenreader-111.sock"],
},
{
"pid": 222,
"socket_files": ["/tmp/fenrirscreenreader-222.sock"],
},
],
), patch.object(driver, "_is_socket_active", return_value=True):
assert (
driver._find_available_private_socket(
"/tmp/fenrirscreenreader-222.sock"
)
== "/tmp/fenrirscreenreader-222.sock"
)
def test_find_available_private_socket_skips_main_socket(
mock_environment,
):
driver = unixDriver.driver()
driver.env = mock_environment
with patch(
"fenrirscreenreader.remoteDriver.unixDriver.remoteInstanceRegistry.list_instances",
return_value=[
{
"pid": 111,
"socket_files": [
unixDriver.MAIN_SOCKET_FILE,
"/tmp/fenrirscreenreader-111.sock",
],
}
],
), patch.object(driver, "_is_socket_active", return_value=True):
assert (
driver._find_available_private_socket()
== "/tmp/fenrirscreenreader-111.sock"
)
+91 -2
View File
@@ -217,6 +217,47 @@ def test_x11_build_passive_grabs_for_fenrir_keys_and_shortcuts():
assert ("KEY_BACKSPACE", X.Mod4Mask, True) in grabs
@pytest.mark.unit
@pytest.mark.parametrize(
"modifier_mask",
[X.ControlMask, X.ShiftMask, X.Mod1Mask],
)
def test_x11_poll_modifier_interrupt_keys_interrupts_without_input_events(
modifier_mask,
):
x11 = X11Driver()
x11.active = True
x11.modifier_interrupt_state = 0
x11.modifier_state = 0
x11.root = Mock()
x11.root.query_pointer.return_value = Mock(mask=modifier_mask)
settings_manager = Mock()
settings_manager.get_setting_as_bool.return_value = True
settings_manager.get_setting.return_value = ""
output_manager = Mock()
x11.env = {
"input": {"event_buffer": []},
"runtime": {
"SettingsManager": settings_manager,
"OutputManager": output_manager,
"DebugManager": Mock(),
},
}
x11.poll_modifier_interrupt_keys()
output_manager.interrupt_output_async.assert_called_once()
assert x11.env["input"]["event_buffer"] == []
output_manager.interrupt_output_async.reset_mock()
x11.root.query_pointer.return_value = Mock(mask=0)
x11.poll_modifier_interrupt_keys()
output_manager.interrupt_output_async.assert_not_called()
assert x11.env["input"]["event_buffer"] == []
@pytest.mark.unit
def test_x11_optional_modifier_masks_can_exclude_numlock():
x11 = X11Driver()
@@ -284,16 +325,64 @@ def test_x11_parse_window_id_accepts_decimal_and_hex():
@pytest.mark.unit
def test_x11_write_event_buffer_clears_buffer():
def test_x11_write_event_buffer_replays_grabbed_key_press():
x11 = X11Driver()
x11._initialized = True
x11.env = {"input": {"event_buffer": ["KEY_KP0"]}}
x11.display = Mock()
x11.env = {
"input": {
"event_buffer": [
{
"event_name": "KEY_LEFTMETA",
"event_state": 1,
"event_type": X.KeyPress,
"event_x_time": 1234,
}
]
}
}
x11.write_event_buffer()
x11.display.allow_events.assert_called_once_with(X.ReplayKeyboard, 1234)
assert x11.env["input"]["event_buffer"] == []
@pytest.mark.unit
def test_x11_write_event_buffer_does_not_replay_key_release():
x11 = X11Driver()
x11._initialized = True
x11.display = Mock()
x11.env = {
"input": {
"event_buffer": [
{
"event_name": "KEY_LEFTMETA",
"event_state": 0,
"event_type": X.KeyRelease,
"event_x_time": 1235,
}
]
}
}
x11.write_event_buffer()
x11.display.allow_events.assert_not_called()
assert x11.env["input"]["event_buffer"] == []
@pytest.mark.unit
def test_x11_map_event_keeps_x_event_time_for_replay():
x11 = X11Driver()
x11.keycode_to_key_name = Mock(return_value="KEY_LEFTMETA")
event = Mock(type=X.KeyPress, detail=133, state=X.Mod4Mask, time=5678)
input_event = x11.map_event(event)
assert input_event["event_x_time"] == 5678
@pytest.mark.unit
def test_x11_find_num_lock_mask_uses_modifier_mapping():
x11 = X11Driver()
+3 -2
View File
@@ -1,8 +1,9 @@
#!/bin/bash
# shellcheck disable=SC2329
cleanup() {
# Make sure Fenrir is restored on exit of this script
echo -n "setting set screen#suspendingScreen=" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo -n "setting set screen#suspendingScreen=" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
}
# Call the cleanup function on exit of this script
@@ -20,7 +21,7 @@ if ! [[ "$term" =~ ^[1-9]+$ ]]; then
fi
# Suspend the current terminal for Fenrir
echo -n "setting set screen#suspendingScreen=$term" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-deamon.sock
echo -n "setting set screen#suspendingScreen=$term" | socat - UNIX-CLIENT:/tmp/fenrirscreenreader-daemon.sock
# Start the x session
command startx