#!/usr/bin/env python3
"""Temporary remote-assistance session helper.

The script creates a throw-away ``sas`` account (member of ``wheel``) with a
random password and a fresh ed25519 SSH key, announces a help request on IRC
through the ``ii`` client, waits for an administrator to confirm in a private
message, sends that administrator the password, accepts a ``wormhole ssh``
invite code, and finally keeps a reverse SSH tunnel open until interrupted.
On exit the account, the tunnel, the IRC session and the temporary directory
are cleaned up.
"""

import os
import pwd
import re
import secrets
import shutil
import signal
import string
import subprocess
import sys
import tempfile
import threading
import time

# Nicks whose private messages are honoured.
stormuxAdmin = ("storm",)
ircServer = "irc.stormux.org"
ircPort = 6667
ircChannel = "#stormux"
# Host the reverse SSH tunnel connects to.
remoteHost = "billysballoons.com"
# Name of the temporary local account.
sasUser = "sas"
# A help request is posted every pingIntervalSeconds, at most pingCount times.
pingIntervalSeconds = 180
pingCount = 5
# Number of failed "wormhole ssh accept" attempts tolerated before giving up.
maxWormholeFailures = 3

sudoKeepaliveThread = None
sudoKeepaliveStop = threading.Event()


def speak_message(message):
    """Speak *message* with ``spd-say``; print it if ``spd-say`` is missing."""
    try:
        subprocess.run(["spd-say", message], check=False)
    except FileNotFoundError:
        print(message, flush=True)


def say_or_print(message, useSpeech):
    """Deliver *message* by speech when *useSpeech* is true, else on stdout."""
    if useSpeech:
        speak_message(message)
    else:
        print(message, flush=True)


def run_command(command, inputText=None, check=False, env=None):
    """Run *command* (an argv list) and return the ``CompletedProcess``.

    stdout and stderr are captured as text. *inputText*, when given, is fed
    to the command's stdin. With ``check=True`` a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    return subprocess.run(
        command,
        input=inputText,
        text=True,
        capture_output=True,
        check=check,
        env=env,
    )


def ensure_sudo(useSpeech):
    """Make sure sudo credentials are available.

    Returns True when already running as root or when ``sudo -v`` succeeds
    (in which case the keepalive thread is started), False otherwise.
    """
    if os.geteuid() == 0:
        return True
    if useSpeech:
        speak_message("Sudo password required. Please enter your password now.")
    result = run_command(["sudo", "-v"])
    if result.returncode == 0:
        start_sudo_keepalive()
        return True
    return False


def start_sudo_keepalive():
    """Start (once) a daemon thread that refreshes sudo every 240 seconds."""
    global sudoKeepaliveThread
    if sudoKeepaliveThread and sudoKeepaliveThread.is_alive():
        return

    def keepalive_loop():
        # Event.wait returns True once the stop event is set, ending the loop.
        while not sudoKeepaliveStop.wait(240):
            run_command(["sudo", "-n", "-v"])

    sudoKeepaliveThread = threading.Thread(target=keepalive_loop, daemon=True)
    sudoKeepaliveThread.start()


def run_privileged(command, useSpeech, inputText=None, check=True):
    """Run *command* as root, prefixing ``sudo`` when not already root.

    Raises ``RuntimeError`` if sudo authentication fails, and
    ``subprocess.CalledProcessError`` on failure when *check* is true.
    """
    if os.geteuid() == 0:
        fullCommand = command
    else:
        if not ensure_sudo(useSpeech):
            raise RuntimeError("sudo authentication failed")
        fullCommand = ["sudo"] + command
    return run_command(fullCommand, inputText=inputText, check=check)


def run_as_user(userName, command, useSpeech, check=True):
    """Run *command* as *userName* via ``sudo -u <user> -H``.

    Raises ``RuntimeError`` if sudo authentication fails, and
    ``subprocess.CalledProcessError`` on failure when *check* is true.
    """
    # Root can use sudo without authenticating; everyone else must have
    # valid credentials first.
    if os.geteuid() != 0 and not ensure_sudo(useSpeech):
        raise RuntimeError("sudo authentication failed")
    fullCommand = ["sudo", "-u", userName, "-H"] + command
    return run_command(fullCommand, check=check)


def user_exists(userName):
    """Return True when ``getent passwd`` knows *userName*."""
    result = run_command(["getent", "passwd", userName])
    return result.returncode == 0


def ensure_wheel(userName, useSpeech):
    """Add *userName* to the ``wheel`` group if ``id -nG`` does not list it."""
    result = run_command(["id", "-nG", userName])
    groups = result.stdout.strip().split()
    if "wheel" not in groups:
        run_privileged(["usermod", "-a", "-G", "wheel", userName], useSpeech)


def generate_password():
    """Return a random alphanumeric password of 6 to 10 characters."""
    allowedChars = string.ascii_letters + string.digits
    length = secrets.randbelow(5) + 6
    return "".join(secrets.choice(allowedChars) for _ in range(length))


def get_user_home(userName):
    """Return the home directory recorded for *userName* in the passwd db."""
    return pwd.getpwnam(userName).pw_dir


def set_password(userName, password, useSpeech):
    """Set *userName*'s password by piping ``user:password`` to ``chpasswd``."""
    run_privileged(["chpasswd"], useSpeech, inputText=f"{userName}:{password}\n")


def generate_ssh_key(userName, useSpeech):
    """Create a fresh ed25519 key pair in *userName*'s ``~/.ssh``.

    Any regular files or symlinks already in the directory are removed first.
    Returns ``(privateKeyPath, publicKeyPath)``.
    """
    userHome = get_user_home(userName)
    sshDir = os.path.join(userHome, ".ssh")
    privateKeyPath = os.path.join(sshDir, "id_ed25519")
    publicKeyPath = f"{privateKeyPath}.pub"
    run_privileged(["mkdir", "-p", sshDir], useSpeech)
    run_privileged(["chown", f"{userName}:{userName}", sshDir], useSpeech)
    run_privileged(["chmod", "700", sshDir], useSpeech)
    # The directory is now mode 700 and owned by the target user, so the
    # invoking (non-root) user cannot list it. Remove stale files with a
    # single privileged command instead of listing the directory here.
    run_privileged(
        [
            "find", sshDir, "-mindepth", "1", "-maxdepth", "1",
            "(", "-type", "f", "-o", "-type", "l", ")", "-delete",
        ],
        useSpeech,
        check=False,
    )
    run_as_user(
        userName,
        ["ssh-keygen", "-t", "ed25519", "-N", "", "-f", privateKeyPath],
        useSpeech,
    )
    run_privileged(["chmod", "600", privateKeyPath], useSpeech)
    run_privileged(["chmod", "644", publicKeyPath], useSpeech)
    run_privileged(
        ["chown", f"{userName}:{userName}", privateKeyPath, publicKeyPath],
        useSpeech,
    )
    return privateKeyPath, publicKeyPath


def path_exists_for_user(path, userName, useSpeech):
    """Return True when *userName* can ``stat`` *path*."""
    result = run_as_user(userName, ["stat", path], useSpeech, check=False)
    return result.returncode == 0


def extract_message_text(line):
    """Return the message part of a log line.

    The text after the first ``"> "`` is preferred, then the text after the
    first ``": "``; otherwise the whole line. The result is stripped.
    """
    for separator in ("> ", ": "):
        if separator in line:
            return line.split(separator, 1)[1].strip()
    return line.strip()


def parse_sender(line):
    """Return the nick found between ``<`` and ``>`` in *line*, or None."""
    match = re.search(r"<([^>]+)>", line)
    return match.group(1) if match else None


def find_wormhole_code(message):
    """Return the first ``<digits>-<words>`` token in *message*, or None.

    A bare "yes"/"accept" is never treated as a code.
    """
    lowered = message.strip().lower()
    if lowered in ("yes", "accept"):
        return None
    match = re.search(r"\b\d+-[A-Za-z0-9-]+\b", message)
    return match.group(0) if match else None


class IrcSession:
    """Drive an ``ii`` IRC client through its filesystem interface.

    Commands are written to ``in`` files and incoming text is read from
    ``out`` files below the server directory that ``ii`` creates.
    """

    def __init__(self, server, port, nick, channel, baseDir):
        self.server = server
        self.port = port
        self.nick = nick
        self.channel = channel
        self.baseDir = baseDir
        self.serverDir = None
        self.serverInPath = None
        self.channelInPath = None
        self.iiProcess = None
        # Byte offset already consumed from each private-message "out" file.
        self.pmOffsets = {}

    def start(self):
        """Launch ``ii`` and wait for its server directory to appear.

        Raises ``RuntimeError`` if ``ii`` is not installed or the server
        directory never shows up.
        """
        if not shutil_which("ii"):
            raise RuntimeError("ii is not installed")
        supportsI = ii_supports_i()
        processEnv = os.environ.copy()
        iiCommand = ["ii", "-s", self.server, "-p", str(self.port), "-n", self.nick]
        if supportsI:
            iiCommand += ["-i", self.baseDir]
        else:
            # Without -i, point HOME at the temp dir instead (presumably ii
            # then keeps its tree under it; wait_for_server_dir looks one
            # level below baseDir as well).
            processEnv["HOME"] = self.baseDir
        self.iiProcess = subprocess.Popen(
            iiCommand,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=processEnv,
        )
        self.serverDir = self.wait_for_server_dir()
        self.serverInPath = os.path.join(self.serverDir, "in")

    def stop(self):
        """Leave the channel, quit IRC and terminate the ``ii`` process."""
        iiAlive = self.iiProcess is not None and self.iiProcess.poll() is None
        # Only talk to ii while it is running: its "in" files are expected to
        # be FIFOs, and opening a FIFO for writing blocks forever once the
        # reader is gone, which would hang cleanup.
        if iiAlive:
            farewells = (
                (self.channelInPath, f"/part {self.channel}"),
                (self.serverInPath, "/quit"),
            )
            for inPath, command in farewells:
                if inPath and os.path.exists(inPath):
                    try:
                        self.write_line(inPath, command)
                    except OSError:
                        pass  # best effort; the process is terminated below
        if self.iiProcess and self.iiProcess.poll() is None:
            self.iiProcess.terminate()
            try:
                self.iiProcess.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.iiProcess.kill()

    def _find_channel_in_path(self):
        """Return the channel's ``in`` path if it exists, else None.

        Both ``<serverDir>/<channel>`` and the same name without leading
        ``#`` are tried.
        """
        for dirName in (self.channel, self.channel.lstrip("#")):
            inPath = os.path.join(self.serverDir, dirName, "in")
            if os.path.exists(inPath):
                return inPath
        return None

    def join_channel(self):
        """Send ``/join`` every 5 seconds for up to 60 seconds.

        Sets ``channelInPath`` once the channel directory appears, or to
        None when the join did not succeed in time.
        """
        joinMessage = f"/join {self.channel}"
        startTime = time.monotonic()
        nextJoinTime = startTime
        while time.monotonic() - startTime < 60:
            if time.monotonic() >= nextJoinTime:
                self.write_line(self.serverInPath, joinMessage)
                nextJoinTime = time.monotonic() + 5
            inPath = self._find_channel_in_path()
            if inPath:
                self.channelInPath = inPath
                return
            time.sleep(0.5)
        self.channelInPath = None

    def send_channel_message(self, message):
        """Send *message* to the channel, falling back to ``/msg``."""
        if not self.channelInPath:
            self.refresh_channel_in_path()
        if self.channelInPath and os.path.exists(self.channelInPath):
            self.write_line(self.channelInPath, message)
        else:
            self.write_line(self.serverInPath, f"/msg {self.channel} {message}")

    def send_private_message(self, nick, message):
        """Send *message* to *nick*, falling back to ``/msg`` on the server."""
        inPath = os.path.join(self.serverDir, nick, "in")
        if os.path.exists(inPath):
            self.write_line(inPath, message)
        else:
            self.write_line(self.serverInPath, f"/msg {nick} {message}")

    def get_private_messages(self, allowedUsers):
        """Return new private messages as ``(nick, text)`` tuples.

        Only ``<serverDir>/<nick>/out`` for nicks in *allowedUsers* is read.
        Lines whose ``<sender>`` is this session's own nick or someone other
        than *nick* are skipped. Only complete (newline-terminated) lines are
        consumed, so a message still being written is picked up whole on a
        later call.
        """
        messages = []
        for nick in allowedUsers:
            outPath = os.path.join(self.serverDir, nick, "out")
            lastPos = self.pmOffsets.get(outPath, 0)
            try:
                with open(outPath, "rb") as fileHandle:
                    fileHandle.seek(lastPos)
                    chunk = fileHandle.read()
            except FileNotFoundError:
                continue
            # Length of the prefix that ends with the last newline.
            completeLen = chunk.rfind(b"\n") + 1
            if completeLen == 0:
                continue
            self.pmOffsets[outPath] = lastPos + completeLen
            text = chunk[:completeLen].decode("utf-8", errors="ignore")
            for line in text.split("\n"):
                sender = parse_sender(line)
                if sender and sender in (self.nick,):
                    continue
                if sender and sender != nick:
                    continue
                messageText = extract_message_text(line)
                if messageText:
                    messages.append((nick, messageText))
        return messages

    def refresh_channel_in_path(self):
        """Set ``channelInPath`` if the channel's ``in`` file now exists."""
        inPath = self._find_channel_in_path()
        if inPath:
            self.channelInPath = inPath

    def wait_for_server_dir(self):
        """Poll up to 120 times (0.5 s apart) for the server directory.

        Looks in ``baseDir`` and one level below it for a directory whose
        name contains the server name and that holds an ``in`` entry.
        Raises ``RuntimeError`` when none is found.
        """
        for _ in range(120):
            for rootDir in [self.baseDir] + list_subdirs(self.baseDir):
                if not os.path.isdir(rootDir):
                    continue
                # list_subdirs tolerates the directory vanishing mid-scan.
                for path in list_subdirs(rootDir):
                    if not os.path.isdir(path):
                        continue
                    if self.server not in os.path.basename(path):
                        continue
                    if os.path.exists(os.path.join(path, "in")):
                        return path
            time.sleep(0.5)
        raise RuntimeError("ii server directory not found")

    @staticmethod
    def write_line(path, message):
        """Write *message* plus a newline to *path* (opened in ``"w"`` mode)."""
        with open(path, "w", encoding="utf-8", errors="ignore") as fileHandle:
            fileHandle.write(message + "\n")
            fileHandle.flush()


def ii_supports_i():
    """Return True when the output of ``ii -h`` mentions ``-i``."""
    result = run_command(["ii", "-h"])
    output = (result.stdout or "") + (result.stderr or "")
    return "-i" in output


def list_subdirs(path):
    """Return the full paths of all entries in *path* (files included).

    Returns an empty list when *path* cannot be listed.
    """
    try:
        return [os.path.join(path, entry) for entry in os.listdir(path)]
    except OSError:
        return []


def shutil_which(command):
    """Return the full path of executable *command* on PATH, or None."""
    return shutil.which(command)


def build_nick():
    """Return ``<user>-<unix time>`` using SUDO_USER, USER or ``sas``."""
    baseUser = os.environ.get("SUDO_USER") or os.environ.get("USER") or "sas"
    return f"{baseUser}-{int(time.time())}"


def main():
    """Run one assistance session; return 0 on success and 1 on failure."""
    say_or_print("Checking accessibility. Is your screen reader working? (y/n)", True)
    answer = input().strip().lower()
    # Speech output is used only when the screen reader is NOT working.
    useSpeech = answer in ("n", "no")
    shouldRemoveUser = False
    cleanupDone = False
    tempDir = tempfile.mkdtemp(prefix="sas-ii-")
    ircSession = None
    sshProcess = None

    def cleanup(exitMessage=None):
        """Tear everything down once; later calls are no-ops."""
        nonlocal cleanupDone
        if cleanupDone:
            return
        cleanupDone = True
        if exitMessage:
            say_or_print(exitMessage, useSpeech)
        if sshProcess and sshProcess.poll() is None:
            sshProcess.terminate()
            try:
                sshProcess.wait(timeout=5)
            except subprocess.TimeoutExpired:
                sshProcess.kill()
        if ircSession:
            ircSession.stop()
        if shouldRemoveUser:
            removalWarning = (
                "Cleanup warning: failed to remove sas user. "
                "Please remove it manually."
            )
            try:
                run_privileged(["pkill", "-u", sasUser], useSpeech, check=False)
                time.sleep(1)
                result = run_privileged(
                    ["userdel", "-r", sasUser], useSpeech, check=False
                )
                run_privileged(
                    ["rm", "-rf", f"/home/{sasUser}"], useSpeech, check=False
                )
                if result.returncode != 0 and user_exists(sasUser):
                    say_or_print(removalWarning, useSpeech)
            except Exception:
                # Never leave a privileged temporary account behind silently.
                say_or_print(removalWarning, useSpeech)
        sudoKeepaliveStop.set()
        if sudoKeepaliveThread:
            sudoKeepaliveThread.join(timeout=2)
        try:
            remove_tree(tempDir)
        except Exception:
            pass  # the temp dir is disposable; nothing useful to report

    def handle_signal(signum, frame):
        cleanup("Interrupted. Cleaning up.")
        sys.exit(1)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    createUserCommand = [
        "useradd", "-m", "-d", f"/home/{sasUser}", "-s", "/bin/bash",
        "-G", "wheel", sasUser,
    ]

    try:
        if not user_exists(sasUser):
            run_privileged(createUserCommand, useSpeech)
            shouldRemoveUser = True
        else:
            say_or_print(
                "User 'sas' exists. Remove and recreate it? This will delete /home/sas. (y/n)",
                useSpeech,
            )
            response = input().strip().lower()
            if response not in ("y", "yes"):
                cleanup("The sas user is unavailable. Remove it manually and try again.")
                return 1
            run_privileged(["pkill", "-u", sasUser], useSpeech, check=False)
            run_privileged(["userdel", "-r", sasUser], useSpeech, check=False)
            run_privileged(["rm", "-rf", f"/home/{sasUser}"], useSpeech, check=False)
            run_privileged(createUserCommand, useSpeech)
            shouldRemoveUser = True

        ensure_wheel(sasUser, useSpeech)
        password = generate_password()
        set_password(sasUser, password, useSpeech)
        privateKeyPath, publicKeyPath = generate_ssh_key(sasUser, useSpeech)

        sasHome = get_user_home(sasUser)
        knownHostsPath = os.path.join(sasHome, ".ssh", "known_hosts_sas")
        run_privileged(["touch", knownHostsPath], useSpeech)
        run_privileged(["chmod", "600", knownHostsPath], useSpeech)
        run_privileged(["chown", f"{sasUser}:{sasUser}", knownHostsPath], useSpeech)

        nick = build_nick()
        ircSession = IrcSession(ircServer, ircPort, nick, ircChannel, tempDir)
        ircSession.start()
        ircSession.join_channel()
        say_or_print("Waiting for assistance on IRC.", useSpeech)

        # Phase 1: ping the channel until an admin answers "yes"/"accept".
        startTime = time.monotonic()
        nextPingTime = startTime
        pingsSent = 0
        confirmedAdmin = None
        while time.monotonic() - startTime < pingIntervalSeconds * pingCount:
            now = time.monotonic()
            if pingsSent < pingCount and now >= nextPingTime:
                ircSession.send_channel_message(f"{nick} is requesting assistance.")
                pingsSent += 1
                nextPingTime = startTime + (pingsSent * pingIntervalSeconds)
            for adminNick, messageText in ircSession.get_private_messages(stormuxAdmin):
                if messageText.strip().lower() in ("yes", "accept"):
                    confirmedAdmin = adminNick
                    break
            if confirmedAdmin:
                break
            time.sleep(1)

        if not confirmedAdmin:
            cleanup("No one was available to help, please try again later.")
            return 1

        ircSession.send_private_message(
            confirmedAdmin,
            f'password: "{password}" please send wormhole ssh invite code',
        )

        # Phase 2: accept a wormhole ssh invite, retrying with new codes.
        failures = 0
        while failures < maxWormholeFailures:
            inviteCode = None
            while inviteCode is None:
                for adminNick, messageText in ircSession.get_private_messages(stormuxAdmin):
                    inviteCode = find_wormhole_code(messageText)
                    if inviteCode:
                        break
                if inviteCode:
                    break
                time.sleep(1)

            if not path_exists_for_user(publicKeyPath, sasUser, useSpeech):
                raise RuntimeError(f"Public key missing: {publicKeyPath}")

            wormholeCommand = [
                "wormhole",
                "ssh",
                "accept",
                "--yes",
                inviteCode,
            ]
            result = run_as_user(sasUser, wormholeCommand, useSpeech, check=False)
            if result.returncode == 0:
                say_or_print("Wormhole key transfer succeeded.", useSpeech)
                break

            failures += 1
            errorTextFull = (result.stderr or result.stdout or "").strip()
            if errorTextFull and not useSpeech:
                print("Wormhole ssh accept error:", flush=True)
                print(errorTextFull, flush=True)
            errorText = errorTextFull
            if errorText:
                # Collapse whitespace and cap the length so the report fits
                # in a single IRC message.
                errorText = " ".join(errorText.split())
                if len(errorText) > 400:
                    errorText = errorText[:400] + "..."
                ircSession.send_private_message(
                    confirmedAdmin,
                    f"Wormhole ssh accept failed: {errorText}",
                )
            ircSession.send_private_message(
                confirmedAdmin,
                "Wormhole ssh accept failed. Please send a new invite code.",
            )

        if failures >= maxWormholeFailures:
            cleanup("Wormhole failed too many times. Exiting.")
            return 1

        # Phase 3: hold the reverse tunnel open until ssh exits or a signal
        # arrives.
        say_or_print("Starting reverse SSH tunnel. Press Ctrl+C to stop.", useSpeech)
        sshCommand = [
            "ssh",
            "-N",
            "-R",
            "localhost:2232:localhost:22",
            "-o",
            "ExitOnForwardFailure=yes",
            "-o",
            "BatchMode=yes",
            "-o",
            "ServerAliveInterval=30",
            "-o",
            "ServerAliveCountMax=3",
            "-o",
            "StrictHostKeyChecking=accept-new",
            "-o",
            f"UserKnownHostsFile={knownHostsPath}",
            "-i",
            privateKeyPath,
            f"{sasUser}@{remoteHost}",
        ]
        sshCommand = ["sudo", "-u", sasUser, "-H"] + sshCommand
        sshProcess = subprocess.Popen(sshCommand)
        sshProcess.wait()
    except Exception as exc:
        cleanup(f"Error: {exc}")
        return 1
    finally:
        cleanup()
    return 0


def remove_tree(path):
    """Best-effort recursive removal of *path*; errors are ignored."""
    shutil.rmtree(path, ignore_errors=True)


if __name__ == "__main__":
    sys.exit(main())