mira-modules/papatutuwawa/mira/modules/pollen.py

273 lines
9.3 KiB
Python

"""
Copyright (C) 2021 Alexander "PapaTutuWawa"
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
# TODO: remove Alle sollte jede subscription entfernen
# TODO: Deal with weird casing and typos
# TODO: pollen hilfe <CMD> should print help per command
import asyncio
import datetime
from mira.module import BaseModule
import requests
import aioxmpp
NAME = "pollen"
API_ENDPOINT = "https://allergie.hexal.de/pollenflug/vorhersage/load_pollendaten.php"
def api_query(plz, date):
return API_ENDPOINT + "?datum={}&plz={}".format(date, plz)
def intensity_str(intensity):
"""
Map the intensity value to a human readable string
"""
return ("Kein Pollenflug", "Schwach", "Mittelschwer", "Stark")[intensity]
class PollenModule(BaseModule):
__instance = None
_subcommand_table = {}
_pollen_data = {} # PLZ -> Date -> Type -> Severity
_dates_notified = {} # JID -> [Dates already sent out]
_sleep_duration = 100
@staticmethod
def get_instance(base, **kwargs):
if PollenModule.__instance == None:
PollenModule(base, **kwargs)
return PollenModule.__instance
def __init__(self, base, **kwargs):
if PollenModule.__instance != None:
raise Exception("Trying to init singleton twice")
super().__init__(base, **kwargs)
PollenModule.__instance = self
self._subcommand_table = {
"subscribe": self._subscribe,
"unsubscribe": self._unsubscribe,
"hilfe": self._help,
"*": self._any,
}
self._sleep_duration = self.get_option("sleep_duration", 12 * 3600)
# Load data
# NOTE: Since we request the pollen_data at start anyway, we won't
# need to save it. We only save dates_notified to prevent
# sending one notification too many.
self._dates_notified = self._stm.get_data("dates_notified")
loop = asyncio.get_event_loop()
periodic = loop.create_task(self._request_pollen_data(True))
def notification_body(self, plz, date, allergies):
"""
Format a message that will be sent to the user
"""
msg = "{} (*{}*)\n".format(date, plz)
if "Alle" in [x["type"] for x in allergies]:
allergies = list(self._pollen_data[plz][date].keys)
count = 0
for allergy in allergies:
intensity = self._pollen_data[plz][date][allergy["type"]]
if intensity >= allergy["level"]:
msg += "{}: {}".format(allergy["type"], intensity_str(intensity))
count += 1
if count == 0:
return ""
return msg
def _is_notified(self, jid, date):
"""
Returns True, when the pollen data for the date of date has
already been sent to jid.
"""
if not jid in self._dates_notified:
return False
return date in self._dates_notified[jid]
def _set_dates_notified(self, jid, new_dates):
"""
Marks a date as being sent to a user. Also ensures that
there are at maximum 7 days in this list
"""
if not jid in self._dates_notified:
self._dates_notified[jid] = new_dates
self._stm.set_data("dates_notified", self._dates_notified)
return
self._dates_notified[jid] += new_dates
self._dates_notified[jid] = self._dates_notified[jid][-7:]
self._stm.set_data("dates_notified", self._dates_notified)
def _broadcast_jid(self, jid, filter_allergies=[]):
"""
Sends the pollen data to jid. If filter_allergies is set, then
only the data for the pollen types in filter_allergies will be sent.
"""
for plz, data in self._sum.get_subscriptions_for_jid(jid).items():
if filter_allergies:
allergies = filter_allergies
else:
allergies = data["data"]
notified = []
for date in self._pollen_data[plz]:
if self._is_notified(jid, date) and not filter_allergies:
continue
notified.append(date)
msg = self.notification_body(plz, date, allergies)
if msg:
self.send_message(aioxmpp.JID.fromstr(jid), msg)
if notified:
self._set_dates_notified(jid, notified)
def _broadcast_all(self):
for plz in self._pollen_data:
for jid, data in self._sum.get_subscriptions_for_keyword(plz).items():
notified = []
for date in self._pollen_data[plz]:
if self._is_notified(jid, date):
continue
notified.append(date)
self.send_message(
aioxmpp.JID.fromstr(jid),
self.notification_body(plz, date, data),
)
if notified:
self._set_dates_notified(jid, notified)
async def _request_pollen_data(self, loop):
while True:
today = datetime.date.today().strftime("%Y-%m-%d")
for plz in self._sum.get_subscription_keywords():
req = requests.get(api_query(plz, today))
data = req.json()["content"]
pollen = data["pollen"]
for date in data["values"]:
if not plz in self._pollen_data:
self._pollen_data[plz] = {}
if not date in self._pollen_data[plz]:
self._pollen_data[plz][date] = {}
for i in range(len(pollen)):
type_ = pollen[i]
self._pollen_data[plz][date][type_] = int(
data["values"][date][i]
)
# For when we want to just refresh data as someone just subscribed
if not loop:
break
self._broadcast_all()
await asyncio.sleep(self._sleep_duration)
async def _subscribe(self, cmd, msg):
if len(cmd) < 3 or len(cmd) > 4:
self.send_message(
msg.from_, "Verwendung: pollen add <PLZ> <Allergen> [<Level>]"
)
return
plz = cmd[1]
allergy = cmd[2]
jid = str(msg.from_.bare())
level = int(cmd[3]) if len(cmd) == 4 else 0
if self._sum.is_subscribed_to_data(jid, plz, "Alle"):
self.send_message(msg.from_, "Du hast schon alle Allergene abonniert")
return
if self._sum.is_subscribed_to_data(jid, plz, allergy):
self.send_message(msg.from_, "Du hast das schon abonniert")
return
if level < 0 or level > 3:
self.send_message(msg.from_, "Das Level muss zwischen 0 und 3 liegen")
return
self._sum.append_subscription_data(
jid, plz, {"type": allergy, "level": level}
)
self.send_message(msg.from_, "Du erhälst nun Pollenmeldungen zu %s" % allergy)
# Just some bandwidth saving measure
if plz not in self._pollen_data:
await self._request_pollen_data(False)
self._broadcast_jid(jid, [{"type": allergy, "level": level}])
async def _unsubscribe(self, cmd, msg):
if len(cmd) != 3:
self.send_message(msg.from_, "Verwendung: pollen remove <PLZ> <Allergen>")
return
plz = cmd[1]
allergy = cmd[2]
jid = str(msg.from_.bare())
if allergy == "Alle":
self._sum.filter_subscription_data_items(jid, plz, lambda x: False)
self.send_message(msg.from_, "Du erhälst keine Pollenmeldungen mehr")
return
if not self._sum.is_subscribed_to_data_one(
jid, plz, lambda x: x["type"] == allergy
):
self.send_message(msg.from_, "Du hast %s nicht abonniert" % (allergy))
return
self._sum.filter_subscription_data_items(
str(msg.from_.bare()), plz, lambda x: x["type"] != allergy
)
self.send_message(
msg.from_, "Du erhälst nun keine Pollenmeldungen zu %s mehr" % (allergy)
)
async def _help(self, cmd, msg):
body = """Verfügbare Befehle:
pollen subscribe <PLZ> <Typ> [<Min. Level>]
pollen unsubscribe <PLZ> <Typ>
pollen hilfe
Pollentypen: Ambrosia, Ampfer, Beifuß, Birke, Buche, Eiche, Erle, Esche, Gräser, Hasel, Pappel, Roggen, Ulme, Wegerich, Weide, Alle (Alias für alle Pollentypen)
Level: 0 (Kein Pollenflug) - 3 (Starker Pollenflug)
"""
self.send_message(msg.from_, body)
async def _any(self, cmd, msg):
self.send_message(msg.from_, "Unbekannter Befehl")
await self._help(cmd, msg)
def get_instance(base, **kwargs):
return PollenModule.get_instance(base, **kwargs)