Speed updatabase generation. Generate and use a certificate by default.
This commit is contained in:
117
util.py
117
util.py
@@ -27,6 +27,108 @@ YT_PKG_NAME = 'yt-dlp'
|
||||
|
||||
log = logging.getLogger("bot")
|
||||
|
||||
# Default certificate filename for auto-generation
|
||||
DEFAULT_CERT_NAME = "bragi.pem"
|
||||
|
||||
|
||||
def generate_certificate(cert_path):
|
||||
"""Generate a self-signed certificate for Mumble authentication.
|
||||
|
||||
Args:
|
||||
cert_path: Path where the certificate file will be saved
|
||||
|
||||
Returns:
|
||||
True if certificate was generated successfully, False otherwise
|
||||
"""
|
||||
try:
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
import datetime
|
||||
|
||||
log.info(f"certificate: generating new self-signed certificate at {cert_path}")
|
||||
|
||||
# Generate RSA private key (2048 bits is standard for this use)
|
||||
privateKey = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
)
|
||||
|
||||
# Create certificate subject/issuer
|
||||
subject = issuer = x509.Name([
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, "Bragi Music Bot"),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Bragi"),
|
||||
])
|
||||
|
||||
# Build and sign certificate (valid for 10 years)
|
||||
cert = (
|
||||
x509.CertificateBuilder()
|
||||
.subject_name(subject)
|
||||
.issuer_name(issuer)
|
||||
.public_key(privateKey.public_key())
|
||||
.serial_number(x509.random_serial_number())
|
||||
.not_valid_before(datetime.datetime.now(datetime.timezone.utc))
|
||||
.not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=3650))
|
||||
.sign(privateKey, hashes.SHA256())
|
||||
)
|
||||
|
||||
# Write both private key and certificate to same PEM file
|
||||
# (this is the format Mumble expects)
|
||||
with open(cert_path, "wb") as f:
|
||||
f.write(privateKey.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
))
|
||||
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
log.info("certificate: successfully generated new certificate")
|
||||
return True
|
||||
|
||||
except ImportError:
|
||||
log.warning("certificate: cryptography library not installed, cannot generate certificate")
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error(f"certificate: failed to generate certificate: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def get_or_create_certificate(config_cert_path):
|
||||
"""Get existing certificate or create a new one if needed.
|
||||
|
||||
Args:
|
||||
config_cert_path: Certificate path from config (may be empty)
|
||||
|
||||
Returns:
|
||||
Path to certificate file, or empty string if none available
|
||||
"""
|
||||
# If user specified a certificate in config, use that
|
||||
if config_cert_path:
|
||||
resolved = solve_filepath(config_cert_path)
|
||||
if resolved and os.path.exists(resolved):
|
||||
log.debug(f"certificate: using configured certificate: {resolved}")
|
||||
return resolved
|
||||
elif config_cert_path:
|
||||
log.warning(f"certificate: configured certificate not found: {config_cert_path}")
|
||||
# Fall through to auto-generation
|
||||
|
||||
# Check for existing auto-generated certificate
|
||||
scriptDir = os.path.dirname(os.path.realpath(__file__))
|
||||
defaultCertPath = os.path.join(scriptDir, DEFAULT_CERT_NAME)
|
||||
|
||||
if os.path.exists(defaultCertPath):
|
||||
log.debug(f"certificate: using existing auto-generated certificate: {defaultCertPath}")
|
||||
return defaultCertPath
|
||||
|
||||
# Generate new certificate
|
||||
if generate_certificate(defaultCertPath):
|
||||
return defaultCertPath
|
||||
|
||||
# No certificate available
|
||||
log.warning("certificate: no certificate available, connecting without one")
|
||||
return ""
|
||||
|
||||
|
||||
def solve_filepath(path):
|
||||
if not path:
|
||||
@@ -42,6 +144,12 @@ def solve_filepath(path):
|
||||
|
||||
|
||||
def get_recursive_file_list_sorted(path):
|
||||
# Audio file extensions to include (fast check before expensive magic call)
|
||||
AUDIO_EXTENSIONS = {
|
||||
'.mp3', '.flac', '.ogg', '.opus', '.m4a', '.m4b', '.mp4', '.m4p',
|
||||
'.wav', '.aac', '.wma', '.aiff', '.aif', '.ape', '.mka', '.webm'
|
||||
}
|
||||
|
||||
filelist = []
|
||||
for root, dirs, files in os.walk(path, topdown=True, onerror=None, followlinks=True):
|
||||
relroot = root.replace(path, '', 1)
|
||||
@@ -55,9 +163,16 @@ def get_recursive_file_list_sorted(path):
|
||||
if not os.access(fullpath, os.R_OK):
|
||||
continue
|
||||
|
||||
# Fast path: check extension first (covers 99% of cases)
|
||||
ext = os.path.splitext(file)[1].lower()
|
||||
if ext in AUDIO_EXTENSIONS:
|
||||
filelist.append(os.path.join(relroot, file))
|
||||
continue
|
||||
|
||||
# Slow path: use magic for files without recognized extensions
|
||||
try:
|
||||
mime = magic.from_file(fullpath, mime=True)
|
||||
if 'audio' in mime or 'audio' in magic.from_file(fullpath).lower() or 'video' in mime:
|
||||
if 'audio' in mime or 'video' in mime:
|
||||
filelist.append(os.path.join(relroot, file))
|
||||
except:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user