From b6b08503f4d2141e68c0bde82e3295e3c19f31f4 Mon Sep 17 00:00:00 2001 From: Storm Dragon Date: Sun, 30 Mar 2025 13:24:36 -0400 Subject: [PATCH] Some minor code cleanup. --- Greet/plugin.py | 70 ++++++++++++------------- Greet/test.py | 6 +-- Weather/plugin.py | 128 +++++++++++++++++++++++----------------------- Weather/test.py | 10 ++-- 4 files changed, 107 insertions(+), 107 deletions(-) diff --git a/Greet/plugin.py b/Greet/plugin.py index 632b136..274b6e8 100644 --- a/Greet/plugin.py +++ b/Greet/plugin.py @@ -21,17 +21,17 @@ class Greet(callbacks.Plugin): A plugin to greet users when they join a channel. Features flood protection and timeout between greetings. """ - + def __init__(self, irc): self.__parent = super(Greet, self) self.__parent.__init__(irc) - + # Dictionary to store last greeting times per user self.lastGreetTime = {} - + # Default cooldown between greetings for the same user (in seconds) self.userCooldown = 300 # 5 minutes - + # Debug mode self.debugMode = False @@ -41,30 +41,30 @@ class Greet(callbacks.Plugin): """ channel = msg.args[0] nick = msg.nick - + # Log event for debugging if self.debugMode: self._logDebug(irc, f"JOIN event detected: {nick} joined {channel}") - + # Don't greet ourselves if nick == irc.nick: if self.debugMode: self._logDebug(irc, f"Not greeting self (bot)") return - + # Check if greetings are enabled for this channel if not self.registryValue('enabled', channel): if self.debugMode: self._logDebug(irc, f"Greetings disabled for {channel}") return - + # Get the greeting message for this channel greeting = self.registryValue('message', channel) if not greeting: if self.debugMode: self._logDebug(irc, f"No greeting message set for {channel}") return - + # Check if this user was greeted recently userKey = f"{nick.lower()}@{channel}" now = time.time() @@ -74,16 +74,16 @@ class Greet(callbacks.Plugin): if self.debugMode: self._logDebug(irc, f"User {nick} was greeted too recently") return - + # Get the timeout timeout = self.registryValue('timeout', channel) or 3 - + if self.debugMode: self._logDebug(irc, f"Will greet in {channel} after {timeout}s: {greeting}") - + # Update last greet time for this user self.lastGreetTime[userKey] = now - + # Use a closure to capture the current values def sendGreeting(channelToGreet=channel, greetingMsg=greeting): if self.debugMode: @@ -95,10 +95,10 @@ class Greet(callbacks.Plugin): except Exception as e: if self.debugMode: self._logDebug(irc, f"Error sending greeting: {str(e)}") - + # Schedule the greeting schedule.addEvent(sendGreeting, time.time() + timeout) - + def _logDebug(self, irc, message): """Send debug messages to the first available channel""" try: @@ -106,7 +106,7 @@ class Greet(callbacks.Plugin): for owner in self.registryValue('owners') or []: irc.queueMsg(ircmsgs.privmsg(owner, f"[GREET-DEBUG] {message}")) return - + # Fallback: send to the first available channel for channel in list(irc.state.channels): try: @@ -121,7 +121,7 @@ class Greet(callbacks.Plugin): @wrap(['channel']) def greettest(self, irc, msg, args, channel): """[] - + Test the greeting by sending it immediately. """ greeting = self.registryValue('message', channel) @@ -134,29 +134,29 @@ class Greet(callbacks.Plugin): @wrap(['owner', 'channel']) def greettrigger(self, irc, msg, args, channel): """[] - + Simulates a join event with timeout (owner only). """ greeting = self.registryValue('message', channel) if not greeting: irc.error(f"No greeting message is set for {channel}") return - + # Get the timeout timeout = self.registryValue('timeout', channel) or 3 - + irc.replySuccess(f"Simulating join event with {timeout}s delay") - + # Schedule the greeting def sendGreeting(): irc.queueMsg(ircmsgs.privmsg(channel, greeting)) - + schedule.addEvent(sendGreeting, time.time() + timeout) @wrap(['owner', 'boolean']) def greetdebug(self, irc, msg, args, enable): """ - + Enables or disables debug mode (owner only). """ self.debugMode = enable @@ -164,11 +164,11 @@ class Greet(callbacks.Plugin): irc.replySuccess("Debug mode enabled") else: irc.replySuccess("Debug mode disabled") - + @wrap(['channel']) def greeton(self, irc, msg, args, channel): """[] - + Enables greetings for a channel. """ self.setRegistryValue('enabled', True, channel) @@ -181,7 +181,7 @@ class Greet(callbacks.Plugin): @wrap(['channel']) def greetoff(self, irc, msg, args, channel): """[] - + Disables greetings for a channel. """ self.setRegistryValue('enabled', False, channel) @@ -190,34 +190,34 @@ class Greet(callbacks.Plugin): @wrap(['channel']) def greetclear(self, irc, msg, args, channel): """[] - + Removes the greeting message for a channel. """ self.setRegistryValue('message', '', channel) self.setRegistryValue('enabled', False, channel) irc.replySuccess(f"Greeting message cleared and disabled for {channel}.") - + @wrap(['channel']) def greetstatus(self, irc, msg, args, channel): """[] - + Shows the current greeting status and message for a channel. """ enabled = self.registryValue('enabled', channel) message = self.registryValue('message', channel) timeout = self.registryValue('timeout', channel) or 3 - + if enabled and message: irc.reply(f"Greetings are enabled for {channel}. Timeout: {timeout} seconds. Message: \"{message}\"") elif enabled and not message: irc.reply(f"Greetings are enabled for {channel}, but no message is set.") else: irc.reply(f"Greetings are disabled for {channel}. Message: \"{message or 'None'}\"") - + @wrap(['channel', 'positiveInt']) def greettimeout(self, irc, msg, args, channel, seconds): """[] - + Sets the timeout in seconds between joins before sending a greeting. Default is 3 seconds. """ @@ -227,11 +227,11 @@ class Greet(callbacks.Plugin): @wrap(['channel', 'text']) def greet(self, irc, msg, args, channel, text): """[] - + Sets the greeting message for a channel. The greeting will be sent when users join the channel, with flood protection. - + To manage greetings, use the other commands: greeton - Enables greetings for the channel greetoff - Disables greetings for the channel @@ -241,7 +241,7 @@ protection. greettrigger - Simulates a join event with timeout (owner only) greetstatus - Shows current status and message greettimeout - Sets delay before greeting - + Examples: greet #channel Welcome to our channel! """ diff --git a/Greet/test.py b/Greet/test.py index aef13f4..f50de4e 100644 --- a/Greet/test.py +++ b/Greet/test.py @@ -8,7 +8,7 @@ from supybot.test import * class GreetTestCase(PluginTestCase): plugins = ('Greet',) - + def testGreet(self): # Set a greeting message self.assertNotError('greet #test Hello, world!') @@ -26,13 +26,13 @@ class GreetTestCase(PluginTestCase): self.assertNotError('greetclear #test') # Check if it's cleared and disabled self.assertRegexp('greetstatus #test', 'disabled') - + def testTimeout(self): # Set a timeout self.assertNotError('greettimeout #test 5') # Check if it's set self.assertRegexp('greetstatus #test', '5 seconds') - + def testTestAndTrigger(self): # Set a greeting message self.assertNotError('greet #test Hello, world!') diff --git a/Weather/plugin.py b/Weather/plugin.py index 2ac5a98..1046c76 100644 --- a/Weather/plugin.py +++ b/Weather/plugin.py @@ -32,7 +32,7 @@ except ImportError: class WeatherDB: """Class to handle user location storage and retrieval""" - + pluginVersion = "0.1" def __init__(self, filename): self.filename = filename @@ -40,7 +40,7 @@ class WeatherDB: # Cache for geocoded locations self.location_cache = {} self.load() - + def load(self): """Load the database from file""" try: @@ -64,33 +64,33 @@ class WeatherDB: except IOError: # File doesn't exist or can't be read pass - + def save(self): """Save the database to file""" with open(self.filename, 'w') as f: # Save user locations for user, data in self.db.items(): f.write(f"{user}::{data['location']}::{data['lat']}::{data['lon']}\n") - + # Save cached locations for key, data in self.location_cache.items(): f.write(f"CACHE::{key}::{data['location']}||{data['lat']}||{data['lon']}\n") - + def set(self, user, location, lat, lon): """Set a user's location""" self.db[user] = {'location': location, 'lat': lat, 'lon': lon} self.save() - + def get(self, user): """Get a user's location""" return self.db.get(user, None) - + def remove(self, user): """Remove a user's location""" if user in self.db: del self.db[user] self.save() - + def add_to_cache(self, query, location, lat, lon): """Add a location to the cache""" query_key = query.lower() @@ -100,7 +100,7 @@ class WeatherDB: 'lon': lon } self.save() - + def get_from_cache(self, query): """Get a location from the cache""" query_key = query.lower() @@ -114,7 +114,7 @@ class Configure(registry.Group): def configure(self, advanced): from supybot.questions import expect, anything, something, yn conf.registerPlugin('Weather', True) - + if advanced: # User agent for Nominatim user_agent = something("used for Nominatim?", @@ -139,11 +139,11 @@ conf.registerChannelValue(Config, 'forecastDays', class Weather(callbacks.Plugin): """Get weather forecasts using the Open-Meteo API""" - + def __init__(self, irc): self.__parent = super(Weather, self) self.__parent.__init__(irc) - + # Initialize weather codes (mapping from Open-Meteo codes to conditions) self.weather_codes = { 0: "Clear sky", @@ -175,41 +175,41 @@ class Weather(callbacks.Plugin): 96: "Thunderstorm with slight hail", 99: "Thunderstorm with heavy hail" } - + # Initialize database self.db = WeatherDB(conf.supybot.directories.data.dirize('Weather.db')) - + # Initialize geocoder user_agent = self.registryValue('userAgent') self.geocoder = Nominatim(user_agent=user_agent) - + # Set up logging self.log = log.getPluginLogger('Weather') - + def _celsius_to_fahrenheit(self, celsius): """Convert Celsius to Fahrenheit""" if celsius is None: return None return (celsius * 9/5) + 32 - + def _format_temp(self, temp, unit='F'): """Format temperature with unit""" if temp is None: return "N/A" return f"{temp:.1f}°{unit}" - + def _kmh_to_mph(self, kmh): """Convert km/h to mph""" if kmh is None: return None return kmh * 0.621371 - + def _format_wind(self, wind_speed, unit='mph'): """Format wind speed with unit""" if wind_speed is None: return "N/A" return f"{wind_speed:.1f} {unit}" - + def _geocode(self, location): """Geocode a location string to get latitude and longitude""" cached_location = self.db.get_from_cache(location) @@ -220,7 +220,7 @@ class Weather(callbacks.Plugin): 'lon': cached_location['lon'], 'address': cached_location['location'] } - + # If not in cache, query the geocoding service try: self.log.info(f"Geocoding new location: {location}") @@ -233,7 +233,7 @@ class Weather(callbacks.Plugin): location_data.latitude, location_data.longitude ) - + return { 'lat': location_data.latitude, 'lon': location_data.longitude, @@ -243,7 +243,7 @@ class Weather(callbacks.Plugin): except (GeocoderTimedOut, GeocoderServiceError) as e: self.log.error(f"Geocoding error: {e}") return None - + def _get_weather(self, lat, lon, unit='F', forecast_days=3): """Get weather data from Open-Meteo API""" url = f"https://api.open-meteo.com/v1/forecast" @@ -255,28 +255,28 @@ class Weather(callbacks.Plugin): 'timezone': 'auto', 'forecast_days': forecast_days } - + try: response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() - + # Process current weather current = data.get('current', {}) temp_c = current.get('temperature_2m') temp_f = self._celsius_to_fahrenheit(temp_c) if temp_c is not None else None temp = temp_f if unit == 'F' else temp_c - + humidity = current.get('relative_humidity_2m') - + wind_kmh = current.get('wind_speed_10m') wind_mph = self._kmh_to_mph(wind_kmh) if wind_kmh is not None else None wind = wind_mph if unit == 'F' else wind_kmh wind_unit = 'mph' if unit == 'F' else 'km/h' - + weather_code = current.get('weather_code', 0) conditions = self.weather_codes.get(weather_code, "Unknown") - + # Process forecast forecast = [] daily = data.get('daily', {}) @@ -284,26 +284,26 @@ class Weather(callbacks.Plugin): min_temps_c = daily.get('temperature_2m_min', []) max_temps_c = daily.get('temperature_2m_max', []) codes = daily.get('weather_code', []) - + for i in range(min(len(dates), forecast_days)): date_str = dates[i] min_temp_c = min_temps_c[i] if i < len(min_temps_c) else None max_temp_c = max_temps_c[i] if i < len(max_temps_c) else None code = codes[i] if i < len(codes) else 0 - + min_temp_f = self._celsius_to_fahrenheit(min_temp_c) if min_temp_c is not None else None max_temp_f = self._celsius_to_fahrenheit(max_temp_c) if max_temp_c is not None else None - + min_temp = min_temp_f if unit == 'F' else min_temp_c max_temp = max_temp_f if unit == 'F' else max_temp_c - + # Format date to day of week try: date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d') day = date_obj.strftime('%A') except ValueError: day = date_str - + forecast.append({ 'day': day, 'date': date_str, @@ -311,7 +311,7 @@ class Weather(callbacks.Plugin): 'max_temp': max_temp, 'conditions': self.weather_codes.get(code, "Unknown") }) - + return { 'current': { 'temp': temp, @@ -323,14 +323,14 @@ class Weather(callbacks.Plugin): 'forecast': forecast, 'unit': unit } - + except requests.RequestException as e: self.log.error(f"Weather API error: {e}") return None - + def setweather(self, irc, msg, args, location): """ - + Set your location for weather lookups. Location can be a city name, ZIP code, or any location string. """ # Improved identification check @@ -339,25 +339,25 @@ class Weather(callbacks.Plugin): # User is identified if we got this far except KeyError: irc.error("You must be identified to use this command. Register with NickServ and identify yourself.", Raise=True) - + # Get the user's name username = msg.prefix.split('!')[0] - + # Geocode the location location_data = self._geocode(location) if not location_data: irc.error("Could not find that location. Please try a different search term.", Raise=True) - + # Store the user's location self.db.set(username, location_data['address'], location_data['lat'], location_data['lon']) - + irc.reply(f"Your location has been set to: {location_data['address']}") - + setweather = wrap(setweather, ['text']) - + def delweather(self, irc, msg, args): """takes no arguments - + Delete your saved location. """ # Improved identification check @@ -366,20 +366,20 @@ class Weather(callbacks.Plugin): # User is identified if we got this far except KeyError: irc.error("You must be identified to use this command. Register with NickServ and identify yourself.", Raise=True) - + # Get the user's name username = msg.prefix.split('!')[0] - + # Remove the user's location self.db.remove(username) - + irc.reply("Your location has been deleted.") - + delweather = wrap(delweather) - + def weather(self, irc, msg, args, location): """[] - + Show weather for your saved location, or for the specified location. """ channel = msg.args[0] if irc.isChannel(msg.args[0]) else None @@ -388,17 +388,17 @@ class Weather(callbacks.Plugin): show_wind = self.registryValue('showWind', channel) show_forecast = self.registryValue('showForecast', channel) forecast_days = self.registryValue('forecastDays', channel) - + # Get the user's name user = msg.prefix.split('!')[0] - + # Determine if showing weather for a location or the user's saved location if location: # Looking up weather for the specified location location_data = self._geocode(location) if not location_data: irc.error("Could not find that location. Please try a different search term.", Raise=True) - + lat = location_data['lat'] lon = location_data['lon'] location_name = location_data['address'] @@ -408,33 +408,33 @@ class Weather(callbacks.Plugin): user_data = self.db.get(user) if not user_data: irc.error("You have not set a location. Use '.weather ' to check the weather for a specific location, or '.setweather ' to save your location if you're registered.", Raise=True) - + lat = user_data['lat'] lon = user_data['lon'] location_name = user_data['location'] weather_for = f"your location ({location_name})" - + # Get the weather data weather_data = self._get_weather(lat, lon, unit, forecast_days) if not weather_data: irc.error("Could not retrieve weather data. Please try again later.", Raise=True) - + # Format the current weather current = weather_data['current'] temp = self._format_temp(current['temp'], unit) conditions = current['conditions'] - + reply = f"Weather for {weather_for}: {temp}, {conditions}" - + if show_humidity and current['humidity'] is not None: reply += f", Humidity: {current['humidity']}%" - + if show_wind and current['wind'] is not None: reply += f", Wind: {self._format_wind(current['wind'], current['wind_unit'])}" - + # Send the current weather irc.reply(reply) - + # Format and send the forecast if requested if show_forecast and weather_data['forecast']: forecast_parts = [] @@ -442,10 +442,10 @@ class Weather(callbacks.Plugin): min_temp = self._format_temp(day['min_temp'], unit) max_temp = self._format_temp(day['max_temp'], unit) forecast_parts.append(f"{day['day']}: {min_temp} to {max_temp}, {day['conditions']}") - + forecast_msg = f"Forecast: {' | '.join(forecast_parts)}" irc.reply(forecast_msg) - + weather = wrap(weather, [optional('text')]) diff --git a/Weather/test.py b/Weather/test.py index eeb7ce0..1ddf3ca 100644 --- a/Weather/test.py +++ b/Weather/test.py @@ -8,21 +8,21 @@ from supybot.test import * class WeatherTestCase(PluginTestCase): plugins = ('Weather',) - + def testWeatherCommands(self): # Since we can't actually test the API calls in a unit test, # we'll just test that the commands exist and respond - + # Test setweather command self.assertNotError('setweather Bluefield, WV') - + # Test weather command with no arguments # This should use the previously set location self.assertNotError('weather') - + # Test weather command with a location self.assertNotError('weather Salt Lake City, Utah') - + # Test delweather command self.assertNotError('delweather')