2025-03-30 13:24:36 -04:00

456 lines
16 KiB
Python

###
# Copyright (c) 2025, Stormux
#
# This plugin is licensed under the same terms as Limnoria itself.
###
import supybot.utils as utils
from supybot.commands import *
import supybot.plugins as plugins
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
import supybot.ircmsgs as ircmsgs
import supybot.conf as conf
import supybot.registry as registry
import supybot.ircdb as ircdb
import supybot.log as log
import requests
import datetime
import geopy.geocoders
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
# For sending the version with geo api request
from . import __version__
try:
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization('Weather')
except ImportError:
# Placeholder that allows to run the plugin on a bot without the i18n module
_ = lambda x: x
class WeatherDB:
    """Flat-file store for users' saved locations plus a geocoding cache.

    The file holds one record per line, encoded as UTF-8:

    * ``<user>::<location>::<lat>::<lon>`` -- a user's saved location
    * ``CACHE::<query>::<location>||<lat>||<lon>`` -- a cached geocoding
      result, keyed by the lower-cased query string

    Values read back from the file are strings; values handed to
    :meth:`set` and :meth:`add_to_cache` are kept in memory exactly as given.
    """
    pluginVersion = "0.1"

    # Marker that distinguishes cache records from user records in the file.
    _CACHE_PREFIX = 'CACHE::'

    def __init__(self, filename):
        """Create the store backed by *filename* and load any existing data."""
        self.filename = filename
        self.db = {}
        # Cache for geocoded locations
        self.location_cache = {}
        self.load()

    @staticmethod
    def _parse_cache_line(line):
        """Return ``(key, entry)`` for a cache record.

        Raises ValueError if *line* does not have the cache record shape.
        """
        _prefix, query, payload = line.split('::', 2)
        # lat/lon are always the last two fields, so split from the right
        # to tolerate a '||' inside the location text.
        location, lat, lon = payload.rsplit('||', 2)
        return query.lower(), {'location': location, 'lat': lat, 'lon': lon}

    @staticmethod
    def _parse_user_line(line):
        """Return ``(user, entry)`` for a user record.

        Raises ValueError if *line* does not have the user record shape.
        """
        user, payload = line.split('::', 1)
        # Split from the right so a '::' inside the location text does not
        # break the record.
        location, lat, lon = payload.rsplit('::', 2)
        return user, {'location': location, 'lat': lat, 'lon': lon}

    def _load_line(self, line):
        """Store one non-empty line from the file, ignoring it if malformed."""
        if line.startswith(self._CACHE_PREFIX):
            try:
                key, entry = self._parse_cache_line(line)
            except ValueError:
                # Not a valid cache record. It may be the saved location of
                # a user literally named "CACHE", so fall through and try
                # the user format before giving up.
                pass
            else:
                self.location_cache[key] = entry
                return
        try:
            user, entry = self._parse_user_line(line)
        except ValueError:
            # A corrupt line must not prevent the rest of the file (and the
            # plugin) from loading.
            return
        self.db[user] = entry

    def load(self):
        """Load the database from file.

        A missing or unreadable file leaves the store empty. Malformed
        lines are skipped.
        """
        try:
            # errors='replace' keeps a file written under a different
            # encoding readable instead of failing the whole load.
            with open(self.filename, 'r', encoding='utf-8',
                      errors='replace') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if line:
                        self._load_line(line)
        except OSError:
            # File doesn't exist or can't be read
            pass

    def save(self):
        """Save the database to file, overwriting its previous contents."""
        # Explicit UTF-8: place names are frequently non-ASCII and must not
        # depend on the process locale.
        with open(self.filename, 'w', encoding='utf-8') as f:
            # Save user locations
            for user, data in self.db.items():
                f.write(f"{user}::{data['location']}::{data['lat']}::{data['lon']}\n")
            # Save cached locations
            for key, data in self.location_cache.items():
                f.write(f"CACHE::{key}::{data['location']}||{data['lat']}||{data['lon']}\n")

    def set(self, user, location, lat, lon):
        """Set a user's location and persist the change."""
        self.db[user] = {'location': location, 'lat': lat, 'lon': lon}
        self.save()

    def get(self, user):
        """Return the user's saved location dict, or None if there is none."""
        return self.db.get(user, None)

    def remove(self, user):
        """Remove a user's location (if any) and persist the change."""
        if user in self.db:
            del self.db[user]
        self.save()

    def add_to_cache(self, query, location, lat, lon):
        """Cache a geocoding result under the lower-cased *query*."""
        query_key = query.lower()
        self.location_cache[query_key] = {
            'location': location,
            'lat': lat,
            'lon': lon,
        }
        self.save()

    def get_from_cache(self, query):
        """Return the cached entry for *query* (case-insensitive), or None."""
        query_key = query.lower()
        return self.location_cache.get(query_key, None)
# Registry value type whose accepted strings are limited to 'C' and 'F';
# it is used below as the type of the 'defaultUnit' setting.
class WeatherConfig(registry.OnlySomeStrings):
    validStrings = ('C', 'F')
# Interactive setup helper for the Weather plugin.
class Configure(registry.Group):
    def configure(self, advanced):
        """Register the plugin and, in advanced mode, ask for the user agent.

        When *advanced* is true the answer to the user-agent question is
        stored in this group's ``userAgent`` value; otherwise nothing beyond
        the plugin registration happens.
        """
        from supybot.questions import expect, anything, something, yn
        conf.registerPlugin('Weather', True)
        if not advanced:
            return
        # User agent for Nominatim
        default_agent = f"stormux-limnoria-weather-plugin/{__version__}"
        chosen_agent = something("used for Nominatim?", default=default_agent)
        self.userAgent.setValue(chosen_agent)
# Configuration group for the Weather plugin; every value registered below
# is attached to it.
Config = conf.registerPlugin('Weather')
# Global value: the user agent string handed to the Nominatim geocoder.
# The default embeds the plugin's __version__.
conf.registerGlobalValue(Config, 'userAgent',
    registry.String(f'stormux-limnoria-weather-plugin/{__version__}', _("""User agent for Nominatim""")))
# Channel value: temperature unit, restricted to 'C' or 'F' by WeatherConfig.
conf.registerChannelValue(Config, 'defaultUnit',
    WeatherConfig('F', _("""Default temperature unit (C or F)""")))
# Channel values: switches for the optional parts of a weather report.
conf.registerChannelValue(Config, 'showHumidity',
    registry.Boolean(True, _("""Show humidity in weather reports""")))
conf.registerChannelValue(Config, 'showWind',
    registry.Boolean(True, _("""Show wind speed in weather reports""")))
conf.registerChannelValue(Config, 'showForecast',
    registry.Boolean(True, _("""Show forecast in weather reports""")))
# Channel value: how many forecast days to request and display (default 3).
conf.registerChannelValue(Config, 'forecastDays',
    registry.PositiveInteger(3, _("""Number of forecast days to display""")))
class Weather(callbacks.Plugin):
    """Get weather forecasts using the Open-Meteo API"""

    # Open-Meteo forecast endpoint and the largest ``forecast_days`` value
    # the API accepts; larger requests are rejected, so we clamp to it.
    FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
    MAX_FORECAST_DAYS = 16

    def __init__(self, irc):
        self.__parent = super(Weather, self)
        self.__parent.__init__(irc)
        # Initialize weather codes (mapping from Open-Meteo codes to conditions)
        self.weather_codes = {
            0: "Clear sky",
            1: "Mainly clear",
            2: "Partly cloudy",
            3: "Overcast",
            45: "Fog",
            48: "Rime fog",
            51: "Light drizzle",
            53: "Moderate drizzle",
            55: "Dense drizzle",
            56: "Light freezing drizzle",
            57: "Dense freezing drizzle",
            61: "Slight rain",
            63: "Moderate rain",
            65: "Heavy rain",
            66: "Light freezing rain",
            67: "Heavy freezing rain",
            71: "Slight snow fall",
            73: "Moderate snow fall",
            75: "Heavy snow fall",
            77: "Snow flurries",
            80: "Slight rain showers",
            81: "Moderate rain showers",
            82: "Heavy rain showers",
            85: "Slight snow showers",
            86: "Heavy snow showers",
            95: "Thunderstorm",
            96: "Thunderstorm with slight hail",
            99: "Thunderstorm with heavy hail",
        }
        # Initialize database
        self.db = WeatherDB(conf.supybot.directories.data.dirize('Weather.db'))
        # Initialize geocoder
        user_agent = self.registryValue('userAgent')
        self.geocoder = Nominatim(user_agent=user_agent)
        # Set up logging
        self.log = log.getPluginLogger('Weather')

    def _celsius_to_fahrenheit(self, celsius):
        """Convert Celsius to Fahrenheit; None is passed through unchanged."""
        if celsius is None:
            return None
        return (celsius * 9/5) + 32

    def _format_temp(self, temp, unit='F'):
        """Format a temperature with one decimal and its unit, or "N/A"."""
        if temp is None:
            return "N/A"
        return f"{temp:.1f}°{unit}"

    def _kmh_to_mph(self, kmh):
        """Convert km/h to mph; None is passed through unchanged."""
        if kmh is None:
            return None
        return kmh * 0.621371

    def _format_wind(self, wind_speed, unit='mph'):
        """Format a wind speed with one decimal and its unit, or "N/A"."""
        if wind_speed is None:
            return "N/A"
        return f"{wind_speed:.1f} {unit}"

    def _describe_code(self, code):
        """Map a weather code to text; unknown or missing codes give "Unknown"."""
        return self.weather_codes.get(code, "Unknown")

    @staticmethod
    def _sender_nick(msg):
        """Return the nick portion of the message prefix."""
        return msg.prefix.split('!')[0]

    def _require_identified(self, irc, msg):
        """Abort the command with an error unless the sender is a known user.

        ``ircdb.users.getUser`` raises KeyError for an unrecognised prefix;
        in that case ``irc.error(..., Raise=True)`` ends the command.
        """
        try:
            ircdb.users.getUser(msg.prefix)
        except KeyError:
            irc.error("You must be identified to use this command. Register with NickServ and identify yourself.", Raise=True)

    def _geocode(self, location):
        """Geocode a location string to get latitude and longitude.

        Returns a dict with ``lat``, ``lon`` and ``address`` keys, or None
        when the location is not found or the geocoding service fails.
        Results are served from, and added to, the on-disk cache.
        """
        cached_location = self.db.get_from_cache(location)
        if cached_location:
            self.log.info(f"Using cached geocoding for: {location}")
            return {
                'lat': cached_location['lat'],
                'lon': cached_location['lon'],
                'address': cached_location['location'],
            }
        # If not in cache, query the geocoding service
        try:
            self.log.info(f"Geocoding new location: {location}")
            location_data = self.geocoder.geocode(location)
        except (GeocoderTimedOut, GeocoderServiceError) as e:
            self.log.error(f"Geocoding error: {e}")
            return None
        if not location_data:
            return None
        # Add to our cache for future use. The cache is an optimisation
        # only, so a failed write must not fail a successful lookup.
        try:
            self.db.add_to_cache(
                location,
                location_data.address,
                location_data.latitude,
                location_data.longitude,
            )
        except OSError as e:
            self.log.warning(f"Could not save geocoding cache: {e}")
        return {
            'lat': location_data.latitude,
            'lon': location_data.longitude,
            'address': location_data.address,
        }

    def _get_weather(self, lat, lon, unit='F', forecast_days=3):
        """Get weather data from Open-Meteo API.

        *unit* selects the output units: ``'F'`` gives Fahrenheit and mph,
        anything else gives Celsius and km/h. *forecast_days* is capped at
        ``MAX_FORECAST_DAYS``.

        Returns a dict with ``current`` (temp, humidity, wind, wind_unit,
        conditions), ``forecast`` (a list of per-day dicts) and ``unit``,
        or None if the request fails or the response is not valid JSON.
        """
        forecast_days = min(forecast_days, self.MAX_FORECAST_DAYS)
        params = {
            'latitude': lat,
            'longitude': lon,
            'current': 'temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m',
            'daily': 'weather_code,temperature_2m_max,temperature_2m_min',
            'timezone': 'auto',
            'forecast_days': forecast_days,
        }
        try:
            response = requests.get(self.FORECAST_URL, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            self.log.error(f"Weather API error: {e}")
            return None

        imperial = unit == 'F'

        # Process current weather (the API is queried in Celsius and km/h)
        current = data.get('current', {})
        temp = current.get('temperature_2m')
        wind = current.get('wind_speed_10m')
        if imperial:
            temp = self._celsius_to_fahrenheit(temp)
            wind = self._kmh_to_mph(wind)
        wind_unit = 'mph' if imperial else 'km/h'
        humidity = current.get('relative_humidity_2m')
        # A missing code is reported as "Unknown" rather than as clear sky.
        conditions = self._describe_code(current.get('weather_code'))

        # Process forecast
        daily = data.get('daily', {})
        dates = daily.get('time', [])
        min_temps_c = daily.get('temperature_2m_min', [])
        max_temps_c = daily.get('temperature_2m_max', [])
        codes = daily.get('weather_code', [])

        def value_at(values, index):
            # The per-day lists may be shorter than the date list.
            return values[index] if index < len(values) else None

        forecast = []
        for index, date_str in enumerate(dates[:forecast_days]):
            min_temp = value_at(min_temps_c, index)
            max_temp = value_at(max_temps_c, index)
            if imperial:
                min_temp = self._celsius_to_fahrenheit(min_temp)
                max_temp = self._celsius_to_fahrenheit(max_temp)
            # Format date to day of week
            try:
                date_obj = datetime.datetime.strptime(date_str, '%Y-%m-%d')
                day = date_obj.strftime('%A')
            except ValueError:
                day = date_str
            forecast.append({
                'day': day,
                'date': date_str,
                'min_temp': min_temp,
                'max_temp': max_temp,
                'conditions': self._describe_code(value_at(codes, index)),
            })

        return {
            'current': {
                'temp': temp,
                'humidity': humidity,
                'wind': wind,
                'wind_unit': wind_unit,
                'conditions': conditions,
            },
            'forecast': forecast,
            'unit': unit,
        }

    def setweather(self, irc, msg, args, location):
        """<location>

        Set your location for weather lookups. Location can be a city name, ZIP code, or any location string.
        """
        self._require_identified(irc, msg)
        # Saved locations are keyed by the sender's nick
        username = self._sender_nick(msg)
        # Geocode the location
        location_data = self._geocode(location)
        if not location_data:
            irc.error("Could not find that location. Please try a different search term.", Raise=True)
        # Store the user's location
        self.db.set(username, location_data['address'], location_data['lat'], location_data['lon'])
        irc.reply(f"Your location has been set to: {location_data['address']}")
    setweather = wrap(setweather, ['text'])

    def delweather(self, irc, msg, args):
        """takes no arguments

        Delete your saved location.
        """
        self._require_identified(irc, msg)
        # Remove the location saved under the sender's nick
        self.db.remove(self._sender_nick(msg))
        irc.reply("Your location has been deleted.")
    delweather = wrap(delweather)

    def weather(self, irc, msg, args, location):
        """[<location>]

        Show weather for your saved location, or for the specified location.
        """
        channel = msg.args[0] if irc.isChannel(msg.args[0]) else None
        unit = self.registryValue('defaultUnit', channel)
        show_humidity = self.registryValue('showHumidity', channel)
        show_wind = self.registryValue('showWind', channel)
        show_forecast = self.registryValue('showForecast', channel)
        forecast_days = self.registryValue('forecastDays', channel)
        # Determine if showing weather for a location or the user's saved location
        if location:
            # Looking up weather for the specified location
            location_data = self._geocode(location)
            if not location_data:
                irc.error("Could not find that location. Please try a different search term.", Raise=True)
            lat = location_data['lat']
            lon = location_data['lon']
            weather_for = location_data['address']
        else:
            # Looking up the sender's weather from their saved location
            user_data = self.db.get(self._sender_nick(msg))
            if not user_data:
                irc.error("You have not set a location. Use '.weather <location>' to check the weather for a specific location, or '.setweather <location>' to save your location if you're registered.", Raise=True)
            lat = user_data['lat']
            lon = user_data['lon']
            weather_for = f"your location ({user_data['location']})"
        # Get the weather data
        weather_data = self._get_weather(lat, lon, unit, forecast_days)
        if not weather_data:
            irc.error("Could not retrieve weather data. Please try again later.", Raise=True)
        # Format the current weather
        current = weather_data['current']
        temp = self._format_temp(current['temp'], unit)
        conditions = current['conditions']
        reply = f"Weather for {weather_for}: {temp}, {conditions}"
        if show_humidity and current['humidity'] is not None:
            reply += f", Humidity: {current['humidity']}%"
        if show_wind and current['wind'] is not None:
            reply += f", Wind: {self._format_wind(current['wind'], current['wind_unit'])}"
        # Send the current weather
        irc.reply(reply)
        # Format and send the forecast if requested
        if show_forecast and weather_data['forecast']:
            forecast_parts = []
            for day in weather_data['forecast']:
                min_temp = self._format_temp(day['min_temp'], unit)
                max_temp = self._format_temp(day['max_temp'], unit)
                forecast_parts.append(f"{day['day']}: {min_temp} to {max_temp}, {day['conditions']}")
            forecast_msg = f"Forecast: {' | '.join(forecast_parts)}"
            irc.reply(forecast_msg)
    weather = wrap(weather, [optional('text')])
# Module-level alias for the plugin class (presumably the name the Limnoria
# plugin loader looks up -- confirm against the loader).
Class = Weather
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: