Files
fenrir/src/fenrirscreenreader/soundDriver/gstreamerDriver.py
2025-07-07 00:42:23 -04:00

134 lines
4.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Fenrir TTY screen reader
# By Chrys, Storm Dragon, and contributors.
import threading
import time
from fenrirscreenreader.core import debug
from fenrirscreenreader.core.soundDriver import sound_driver
_gstreamerAvailable = False
try:
import gi
from gi.repository import GLib
gi.require_version("Gst", "1.0")
from gi.repository import Gst
_gstreamerAvailable, args = Gst.init_check(None)
except Exception as e:
_gstreamerAvailable = False
_availableError = str(e)
class driver(sound_driver):
def __init__(self):
sound_driver.__init__(self)
self._source = None
self._sink = None
def initialize(self, environment):
self.env = environment
global _gstreamerAvailable
self._initialized = _gstreamerAvailable
if not self._initialized:
global _availableError
self.environment["runtime"]["DebugManager"].write_debug_out(
"Gstreamer not available " + _availableError,
debug.DebugLevel.ERROR,
)
return
self._player = Gst.ElementFactory.make("playbin", "player")
bus = self._player.get_bus()
bus.add_signal_watch()
bus.connect("message", self._on_player_message)
self._pipeline = Gst.Pipeline(name="fenrir-pipeline")
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self._on_pipeline_message)
self._source = Gst.ElementFactory.make("audiotestsrc", "src")
self._sink = Gst.ElementFactory.make("autoaudiosink", "output")
self._pipeline.add(self._source)
self._pipeline.add(self._sink)
self._source.link(self._sink)
self.mainloop = GLib.MainLoop()
self.thread = threading.Thread(target=self.mainloop.run)
self.thread.start()
def shutdown(self):
if not self._initialized:
return
self.cancel()
self.mainloop.quit()
# Wait for the GLib MainLoop thread to finish to prevent shutdown races
if hasattr(self, "thread") and self.thread.is_alive():
# 2 second timeout to prevent hanging
self.thread.join(timeout=2.0)
def _on_player_message(self, bus, message):
if not self._initialized:
return
if message.type == Gst.MessageType.EOS:
self._player.set_state(Gst.State.NULL)
elif message.type == Gst.MessageType.ERROR:
self._player.set_state(Gst.State.NULL)
error, info = message.parse_error()
self.env["runtime"]["DebugManager"].write_debug_out(
"GSTREAMER: _on_player_message" + str(error) + str(info),
debug.DebugLevel.WARNING,
)
def _on_pipeline_message(self, bus, message):
if not self._initialized:
return
if message.type == Gst.MessageType.EOS:
self._pipeline.set_state(Gst.State.NULL)
elif message.type == Gst.MessageType.ERROR:
self._pipeline.set_state(Gst.State.NULL)
error, info = message.parse_error()
self.env["runtime"]["DebugManager"].write_debug_out(
"GSTREAMER: _on_pipeline_message" + str(error) + str(info),
debug.DebugLevel.WARNING,
)
def _on_timeout(self, element):
if not self._initialized:
return
element.set_state(Gst.State.NULL)
def play_sound_file(self, file_name, interrupt=True):
if not self._initialized:
return
if interrupt:
self.cancel()
self._player.set_property("volume", self.volume)
self._player.set_property("uri", "file://%s" % file_name)
self._player.set_state(Gst.State.PLAYING)
def play_frequence(
self, frequence, duration, adjust_volume=0.0, interrupt=True
):
if not self._initialized:
return
if interrupt:
self.cancel()
duration = duration * 1000
self._source.set_property("volume", self.volume * adjust_volume)
self._source.set_property("freq", frequence)
self._pipeline.set_state(Gst.State.PLAYING)
GLib.timeout_add(duration, self._on_timeout, self._pipeline)
def cancel(self, element=None):
if not self._initialized:
return
if element:
element.set_state(Gst.State.NULL)
return
self._player.set_state(Gst.State.NULL)
self._pipeline.set_state(Gst.State.NULL)