#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Fenrir TTY screen reader # By Chrys, Storm Dragon, and contributors. import signal import time from multiprocessing import Process from threading import Thread from fenrirscreenreader.core import debug from fenrirscreenreader.core.eventData import FenrirEventType # Standalone functions for multiprocessing (cannot be instance methods) def _heart_beat_timer(running): """ Standalone heartbeat timer function for multiprocessing. Returns current timestamp after a short sleep. """ try: time.sleep(0.5) except Exception as e: print(f"ProcessManager _heart_beat_timer: Error during sleep: {e}") return time.time() def _custom_event_worker_process( running, event_queue, function, pargs=None, run_once=False ): """ Standalone worker function for custom events in multiprocessing. Cannot use instance methods due to pickle limitations with self.env. """ if not callable(function): return while running.value: try: if pargs: function(running, event_queue, pargs) else: function(running, event_queue) except Exception as e: # Cannot use DebugManager in multiprocess context print( f"ProcessManager:_custom_event_worker_process:function(" f"{function}):{e}" ) if run_once: break def _simple_event_worker_process( running, event_queue, event, function, pargs=None, run_once=False ): """ Standalone worker function for simple events in multiprocessing. Cannot use instance methods due to pickle limitations with self.env. """ if not isinstance(event, FenrirEventType): return if not callable(function): return while running.value: data = None try: if pargs: data = function(running, pargs) else: data = function(running) except Exception as e: # Cannot use DebugManager in multiprocess context print( f"ProcessManager:_simple_event_worker_process:function(" f"{function}):{e}" ) try: event_queue.put({"Type": event, "data": data}, timeout=0.1) except Exception as e: print(f"ProcessManager: Failed to put event to queue: {e}") if run_once: break class ProcessManager: def __init__(self): self._Processes = [] self._Threads = [] def initialize(self, environment): self.env = environment self.running = self.env["runtime"]["EventManager"].get_running() self.add_simple_event_thread( FenrirEventType.heart_beat, _heart_beat_timer, multiprocess=True, ) def shutdown(self): self.terminate_all_processes() def terminate_all_processes(self): for proc in self._Processes: # try: # proc.terminate() # except KeyboardInterrupt: # pass # except: # pass proc.join(timeout=5.0) # Timeout to prevent hanging shutdown for t in self._Threads: t.join(timeout=5.0) # Timeout to prevent hanging shutdown def heart_beat_timer(self, active): try: time.sleep(0.5) except Exception as e: self.env["runtime"]["DebugManager"].write_debug_out( "ProcessManager heart_beat_timer: Error during sleep: " + str(e), debug.DebugLevel.ERROR, ) return time.time() def add_custom_event_thread( self, function, pargs=None, multiprocess=False, run_once=False ): event_queue = self.env["runtime"]["EventManager"].get_event_queue() original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) if multiprocess: t = Process( target=_custom_event_worker_process, args=(self.running, event_queue, function, pargs, run_once), ) self._Processes.append(t) else: # use thread instead of process t = Thread( target=self.custom_event_worker_thread, args=(event_queue, function, pargs, run_once), ) self._Threads.append(t) t.start() signal.signal(signal.SIGINT, original_sigint_handler) def add_simple_event_thread( self, event, function, pargs=None, multiprocess=False, run_once=False ): original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) if multiprocess: # Get event queue reference before creating process event_queue = self.env["runtime"]["EventManager"].get_event_queue() t = Process( target=_simple_event_worker_process, args=(self.running, event_queue, event, function, pargs, run_once), ) self._Processes.append(t) else: t = Thread( target=self.simple_event_worker_thread, args=(event, function, pargs, run_once), ) self._Threads.append(t) t.start() signal.signal(signal.SIGINT, original_sigint_handler) def custom_event_worker_thread( self, event_queue, function, pargs=None, run_once=False ): # if not isinstance(event_queue, Queue): # return if not callable(function): return while self.running.value: try: if pargs: function(self.running, event_queue, pargs) else: function(self.running, event_queue) except Exception as e: # Cannot use DebugManager in multiprocess context due to # pickle limitations with file handles print( "ProcessManager:custom_event_worker_thread:function(" + str(function) + "):" + str(e) ) if run_once: break def simple_event_worker_thread( self, event, function, pargs=None, run_once=False ): if not isinstance(event, FenrirEventType): return if not callable(function): return while self.running.value: data = None try: if pargs: data = function(self.running, pargs) else: data = function(self.running) except Exception as e: # Cannot use DebugManager in multiprocess context due to # pickle limitations with file handles print( "ProcessManager:simple_event_worker_thread:function(" + str(function) + "):" + str(e) ) self.env["runtime"]["EventManager"].put_to_event_queue(event, data) if run_once: break