From 9bba576acf18611a5dd65f6d7a9693f43efa7984 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Thu, 4 Jun 2026 20:32:59 -0400 Subject: [PATCH] Initial commit. --- sas.py | 582 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 582 insertions(+) create mode 100755 sas.py diff --git a/sas.py b/sas.py new file mode 100755 index 0000000..4b289d5 --- /dev/null +++ b/sas.py @@ -0,0 +1,582 @@ +#!/usr/bin/env python3 + +import os +import re +import secrets +import signal +import string +import subprocess +import sys +import tempfile +import time +import pwd +import threading + + +stormuxAdmin = ("storm",) +ircServer = "irc.stormux.org" +ircPort = 6667 +ircChannel = "#stormux" +remoteHost = "billysballoons.com" +sasUser = "sas" +pingIntervalSeconds = 180 +pingCount = 5 +maxWormholeFailures = 3 + +sudoKeepaliveThread = None +sudoKeepaliveStop = threading.Event() + + +def speak_message(message): + try: + subprocess.run(["spd-say", message], check=False) + except FileNotFoundError: + print(message, flush=True) + + +def say_or_print(message, useSpeech): + if useSpeech: + speak_message(message) + else: + print(message, flush=True) + + +def run_command(command, inputText=None, check=False, env=None): + return subprocess.run( + command, + input=inputText, + text=True, + capture_output=True, + check=check, + env=env, + ) + + +def ensure_sudo(useSpeech): + if os.geteuid() == 0: + return True + if useSpeech: + speak_message("Sudo password required. Please enter your password now.") + result = run_command(["sudo", "-v"]) + if result.returncode == 0: + start_sudo_keepalive() + return True + return False + + +def start_sudo_keepalive(): + global sudoKeepaliveThread + if sudoKeepaliveThread and sudoKeepaliveThread.is_alive(): + return + + def keepalive_loop(): + while not sudoKeepaliveStop.wait(240): + run_command(["sudo", "-n", "-v"]) + + sudoKeepaliveThread = threading.Thread(target=keepalive_loop, daemon=True) + sudoKeepaliveThread.start() + + +def run_privileged(command, useSpeech, inputText=None, check=True): + if os.geteuid() == 0: + fullCommand = command + else: + if not ensure_sudo(useSpeech): + raise RuntimeError("sudo authentication failed") + fullCommand = ["sudo"] + command + return run_command(fullCommand, inputText=inputText, check=check) + + +def run_as_user(userName, command, useSpeech, check=True): + if os.geteuid() == 0: + fullCommand = ["sudo", "-u", userName, "-H"] + command + else: + if not ensure_sudo(useSpeech): + raise RuntimeError("sudo authentication failed") + fullCommand = ["sudo", "-u", userName, "-H"] + command + return run_command(fullCommand, check=check) + + +def user_exists(userName): + result = run_command(["getent", "passwd", userName]) + return result.returncode == 0 + + +def ensure_wheel(userName, useSpeech): + result = run_command(["id", "-nG", userName]) + groups = result.stdout.strip().split() + if "wheel" not in groups: + run_privileged(["usermod", "-a", "-G", "wheel", userName], useSpeech) + + +def generate_password(): + allowedChars = string.ascii_letters + string.digits + length = secrets.randbelow(5) + 6 + return "".join(secrets.choice(allowedChars) for _ in range(length)) + + +def get_user_home(userName): + return pwd.getpwnam(userName).pw_dir + + +def set_password(userName, password, useSpeech): + run_privileged(["chpasswd"], useSpeech, inputText=f"{userName}:{password}\n") + + +def generate_ssh_key(userName, useSpeech): + userHome = get_user_home(userName) + sshDir = os.path.join(userHome, ".ssh") + privateKeyPath = os.path.join(sshDir, "id_ed25519") + publicKeyPath = f"{privateKeyPath}.pub" + + run_privileged(["mkdir", "-p", sshDir], useSpeech) + run_privileged(["chown", f"{userName}:{userName}", sshDir], useSpeech) + run_privileged(["chmod", "700", sshDir], useSpeech) + for entry in list_subdirs(sshDir): + if os.path.isfile(entry) or os.path.islink(entry): + run_privileged(["rm", "-f", entry], useSpeech, check=False) + + run_as_user( + userName, + ["ssh-keygen", "-t", "ed25519", "-N", "", "-f", privateKeyPath], + useSpeech, + ) + + run_privileged(["chmod", "600", privateKeyPath], useSpeech) + run_privileged(["chmod", "644", publicKeyPath], useSpeech) + run_privileged(["chown", f"{userName}:{userName}", privateKeyPath, publicKeyPath], useSpeech) + + return privateKeyPath, publicKeyPath + + +def path_exists_for_user(path, userName, useSpeech): + result = run_as_user(userName, ["stat", path], useSpeech, check=False) + return result.returncode == 0 + + +def extract_message_text(line): + if "> " in line: + return line.split("> ", 1)[1].strip() + if ": " in line: + return line.split(": ", 1)[1].strip() + return line.strip() + + +def parse_sender(line): + match = re.search(r"<([^>]+)>", line) + if match: + return match.group(1) + return None + + +def find_wormhole_code(message): + lowered = message.strip().lower() + if lowered in ("yes", "accept"): + return None + match = re.search(r"\b\d+-[A-Za-z0-9-]+\b", message) + if match: + return match.group(0) + return None + + +class IrcSession: + def __init__(self, server, port, nick, channel, baseDir): + self.server = server + self.port = port + self.nick = nick + self.channel = channel + self.baseDir = baseDir + self.serverDir = None + self.serverInPath = None + self.channelInPath = None + self.iiProcess = None + self.pmOffsets = {} + + def start(self): + if not shutil_which("ii"): + raise RuntimeError("ii is not installed") + supportsI = ii_supports_i() + processEnv = os.environ.copy() + iiCommand = ["ii", "-s", self.server, "-p", str(self.port), "-n", self.nick] + if supportsI: + iiCommand += ["-i", self.baseDir] + else: + processEnv["HOME"] = self.baseDir + self.iiProcess = subprocess.Popen( + iiCommand, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=processEnv, + ) + self.serverDir = self.wait_for_server_dir() + self.serverInPath = os.path.join(self.serverDir, "in") + + def stop(self): + if self.channelInPath and os.path.exists(self.channelInPath): + try: + self.write_line(self.channelInPath, f"/part {self.channel}") + except OSError: + pass + if self.serverInPath and os.path.exists(self.serverInPath): + try: + self.write_line(self.serverInPath, "/quit") + except OSError: + pass + if self.iiProcess and self.iiProcess.poll() is None: + self.iiProcess.terminate() + try: + self.iiProcess.wait(timeout=5) + except subprocess.TimeoutExpired: + self.iiProcess.kill() + + def join_channel(self): + joinMessage = f"/join {self.channel}" + channelDir = os.path.join(self.serverDir, self.channel) + channelAltDir = os.path.join(self.serverDir, self.channel.lstrip("#")) + startTime = time.monotonic() + nextJoinTime = startTime + while time.monotonic() - startTime < 60: + if time.monotonic() >= nextJoinTime: + self.write_line(self.serverInPath, joinMessage) + nextJoinTime = time.monotonic() + 5 + for candidate in (channelDir, channelAltDir): + inPath = os.path.join(candidate, "in") + if os.path.exists(inPath): + self.channelInPath = inPath + return + time.sleep(0.5) + self.channelInPath = None + + def send_channel_message(self, message): + if not self.channelInPath: + self.refresh_channel_in_path() + if self.channelInPath and os.path.exists(self.channelInPath): + self.write_line(self.channelInPath, message) + else: + self.write_line(self.serverInPath, f"/msg {self.channel} {message}") + + def send_private_message(self, nick, message): + nickDir = os.path.join(self.serverDir, nick) + inPath = os.path.join(nickDir, "in") + if os.path.exists(inPath): + self.write_line(inPath, message) + else: + self.write_line(self.serverInPath, f"/msg {nick} {message}") + + def get_private_messages(self, allowedUsers): + messages = [] + for nick in allowedUsers: + nickDir = os.path.join(self.serverDir, nick) + outPath = os.path.join(nickDir, "out") + if not os.path.exists(outPath): + continue + lastPos = self.pmOffsets.get(outPath, 0) + with open(outPath, "r", encoding="utf-8", errors="ignore") as fileHandle: + fileHandle.seek(lastPos) + for line in fileHandle: + sender = parse_sender(line) + if sender and sender == self.nick: + continue + if sender and sender != nick: + continue + messageText = extract_message_text(line) + if messageText: + messages.append((nick, messageText)) + self.pmOffsets[outPath] = fileHandle.tell() + return messages + + def refresh_channel_in_path(self): + channelDir = os.path.join(self.serverDir, self.channel) + channelAltDir = os.path.join(self.serverDir, self.channel.lstrip("#")) + for candidate in (channelDir, channelAltDir): + inPath = os.path.join(candidate, "in") + if os.path.exists(inPath): + self.channelInPath = inPath + return + + def wait_for_server_dir(self): + for _ in range(120): + for rootDir in [self.baseDir] + list_subdirs(self.baseDir): + if not os.path.isdir(rootDir): + continue + for entry in os.listdir(rootDir): + path = os.path.join(rootDir, entry) + if os.path.isdir(path) and self.server in entry: + inPath = os.path.join(path, "in") + if os.path.exists(inPath): + return path + time.sleep(0.5) + raise RuntimeError("ii server directory not found") + + @staticmethod + def write_line(path, message): + with open(path, "w", encoding="utf-8", errors="ignore") as fileHandle: + fileHandle.write(message + "\n") + fileHandle.flush() + + +def ii_supports_i(): + result = run_command(["ii", "-h"]) + output = (result.stdout or "") + (result.stderr or "") + return "-i" in output + + +def list_subdirs(path): + try: + return [os.path.join(path, entry) for entry in os.listdir(path)] + except OSError: + return [] + + +def shutil_which(command): + for path in os.environ.get("PATH", "").split(os.pathsep): + candidate = os.path.join(path, command) + if os.path.isfile(candidate) and os.access(candidate, os.X_OK): + return candidate + return None + + +def build_nick(): + baseUser = os.environ.get("SUDO_USER") or os.environ.get("USER") or "sas" + return f"{baseUser}-{int(time.time())}" + + +def main(): + say_or_print("Checking accessibility. Is your screen reader working? (y/n)", True) + answer = input().strip().lower() + useSpeech = answer in ("n", "no") + + shouldRemoveUser = False + cleanupDone = False + tempDir = tempfile.mkdtemp(prefix="sas-ii-") + ircSession = None + sshProcess = None + + def cleanup(exitMessage=None): + nonlocal cleanupDone + if cleanupDone: + return + cleanupDone = True + nonlocal sshProcess + if exitMessage: + say_or_print(exitMessage, useSpeech) + + if sshProcess and sshProcess.poll() is None: + sshProcess.terminate() + try: + sshProcess.wait(timeout=5) + except subprocess.TimeoutExpired: + sshProcess.kill() + + if ircSession: + ircSession.stop() + + if shouldRemoveUser: + try: + run_privileged(["pkill", "-u", sasUser], useSpeech, check=False) + time.sleep(1) + result = run_privileged(["userdel", "-r", sasUser], useSpeech, check=False) + run_privileged(["rm", "-rf", f"/home/{sasUser}"], useSpeech, check=False) + if result.returncode != 0 and user_exists(sasUser): + say_or_print( + "Cleanup warning: failed to remove sas user. Please remove it manually.", + useSpeech, + ) + except Exception: + pass + + sudoKeepaliveStop.set() + if sudoKeepaliveThread: + sudoKeepaliveThread.join(timeout=2) + + try: + remove_tree(tempDir) + except Exception: + pass + + def handle_signal(signum, frame): + cleanup("Interrupted. Cleaning up.") + sys.exit(1) + + signal.signal(signal.SIGINT, handle_signal) + signal.signal(signal.SIGTERM, handle_signal) + + try: + if not user_exists(sasUser): + run_privileged( + ["useradd", "-m", "-d", f"/home/{sasUser}", "-s", "/bin/bash", "-G", "wheel", sasUser], + useSpeech, + ) + shouldRemoveUser = True + else: + say_or_print( + "User 'sas' exists. Remove and recreate it? This will delete /home/sas. (y/n)", + useSpeech, + ) + response = input().strip().lower() + if response not in ("y", "yes"): + cleanup("The sas user is unavailable. Remove it manually and try again.") + return 1 + run_privileged(["pkill", "-u", sasUser], useSpeech, check=False) + run_privileged(["userdel", "-r", sasUser], useSpeech, check=False) + run_privileged(["rm", "-rf", f"/home/{sasUser}"], useSpeech, check=False) + run_privileged( + ["useradd", "-m", "-d", f"/home/{sasUser}", "-s", "/bin/bash", "-G", "wheel", sasUser], + useSpeech, + ) + shouldRemoveUser = True + + ensure_wheel(sasUser, useSpeech) + + password = generate_password() + set_password(sasUser, password, useSpeech) + + privateKeyPath, publicKeyPath = generate_ssh_key(sasUser, useSpeech) + sasHome = get_user_home(sasUser) + knownHostsPath = os.path.join(sasHome, ".ssh", "known_hosts_sas") + run_privileged(["touch", knownHostsPath], useSpeech) + run_privileged(["chmod", "600", knownHostsPath], useSpeech) + run_privileged(["chown", f"{sasUser}:{sasUser}", knownHostsPath], useSpeech) + + nick = build_nick() + ircSession = IrcSession(ircServer, ircPort, nick, ircChannel, tempDir) + ircSession.start() + ircSession.join_channel() + + say_or_print("Waiting for assistance on IRC.", useSpeech) + startTime = time.monotonic() + nextPingTime = startTime + pingsSent = 0 + confirmedAdmin = None + + while time.monotonic() - startTime < pingIntervalSeconds * pingCount: + now = time.monotonic() + if pingsSent < pingCount and now >= nextPingTime: + ircSession.send_channel_message(f"{nick} is requesting assistance.") + pingsSent += 1 + nextPingTime = startTime + (pingsSent * pingIntervalSeconds) + + for adminNick, messageText in ircSession.get_private_messages(stormuxAdmin): + if messageText.strip().lower() in ("yes", "accept"): + confirmedAdmin = adminNick + break + if confirmedAdmin: + break + time.sleep(1) + + if not confirmedAdmin: + cleanup("No one was available to help, please try again later.") + return 1 + + ircSession.send_private_message( + confirmedAdmin, + f'password: "{password}" please send wormhole ssh invite code', + ) + + failures = 0 + while failures < maxWormholeFailures: + inviteCode = None + while inviteCode is None: + for adminNick, messageText in ircSession.get_private_messages(stormuxAdmin): + inviteCode = find_wormhole_code(messageText) + if inviteCode: + break + if inviteCode: + break + time.sleep(1) + + if not path_exists_for_user(publicKeyPath, sasUser, useSpeech): + raise RuntimeError(f"Public key missing: {publicKeyPath}") + + wormholeCommand = [ + "wormhole", + "ssh", + "accept", + "--yes", + inviteCode, + ] + result = run_as_user(sasUser, wormholeCommand, useSpeech, check=False) + if result.returncode == 0: + say_or_print("Wormhole key transfer succeeded.", useSpeech) + break + + failures += 1 + errorTextFull = (result.stderr or result.stdout or "").strip() + if errorTextFull and not useSpeech: + print("Wormhole ssh accept error:", flush=True) + print(errorTextFull, flush=True) + errorText = errorTextFull + if errorText: + errorText = " ".join(errorText.split()) + if len(errorText) > 400: + errorText = errorText[:400] + "..." + ircSession.send_private_message( + confirmedAdmin, + f"Wormhole ssh accept failed: {errorText}", + ) + ircSession.send_private_message( + confirmedAdmin, + "Wormhole ssh accept failed. Please send a new invite code.", + ) + + if failures >= maxWormholeFailures: + cleanup("Wormhole failed too many times. Exiting.") + return 1 + + say_or_print("Starting reverse SSH tunnel. Press Ctrl+C to stop.", useSpeech) + sshCommand = [ + "ssh", + "-N", + "-R", + "localhost:2232:localhost:22", + "-o", + "ExitOnForwardFailure=yes", + "-o", + "BatchMode=yes", + "-o", + "ServerAliveInterval=30", + "-o", + "ServerAliveCountMax=3", + "-o", + "StrictHostKeyChecking=accept-new", + "-o", + f"UserKnownHostsFile={knownHostsPath}", + "-i", + privateKeyPath, + f"{sasUser}@{remoteHost}", + ] + sshCommand = ["sudo", "-u", sasUser, "-H"] + sshCommand + sshProcess = subprocess.Popen(sshCommand) + sshProcess.wait() + + except Exception as exc: + cleanup(f"Error: {exc}") + return 1 + finally: + cleanup() + + return 0 + + +def remove_tree(path): + if not os.path.exists(path): + return + for rootDir, dirNames, fileNames in os.walk(path, topdown=False): + for fileName in fileNames: + try: + os.unlink(os.path.join(rootDir, fileName)) + except OSError: + pass + for dirName in dirNames: + try: + os.rmdir(os.path.join(rootDir, dirName)) + except OSError: + pass + try: + os.rmdir(path) + except OSError: + pass + + +if __name__ == "__main__": + sys.exit(main())