mira-modules/papatutuwawa/mira/modules/pollen.py

272 lines
9.6 KiB
Python

'''
Copyright (C) 2021 Alexander "PapaTutuWawa"
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
# TODO: removing 'Alle' should remove every subscription
# TODO: Deal with weird casing and typos
# TODO: pollen hilfe <CMD> should print help per command
import asyncio
import datetime
import logging
import urllib.parse

import aioxmpp
import requests

from mira.module import BaseModule
NAME = 'pollen'
API_ENDPOINT = 'https://allergie.hexal.de/pollenflug/vorhersage/load_pollendaten.php'


def api_query(plz, date):
    '''
    Build the request URL for the pollen forecast of one postal code.

    plz is the postal code and date the day to query; both are converted
    to text and percent-encoded. The PLZ originates from a chat command,
    so escaping it keeps characters such as "&" or "#" from injecting
    additional query parameters into the request.

    Returns API_ENDPOINT followed by "?datum=<date>&plz=<plz>".
    '''
    query = urllib.parse.urlencode({'datum': date, 'plz': plz})
    return API_ENDPOINT + '?' + query
def intensity_str(intensity):
    '''
    Map the intensity value to a human readable string

    intensity must be an integer from 0 (no pollen) to 3 (strong).
    Raises IndexError for any value outside that range. Negative values
    are rejected explicitly: plain tuple indexing would silently wrap
    around and, for example, report -1 as "Stark".
    '''
    labels = ('Kein Pollenflug',
              'Schwach',
              'Mittelschwer',
              'Stark')
    if not 0 <= intensity < len(labels):
        raise IndexError('pollen intensity out of range: {!r}'.format(intensity))
    return labels[intensity]
class PollenModule(BaseModule):
    '''
    Singleton module that lets users subscribe to pollen forecasts per
    postal code (PLZ) and allergen, and periodically sends them the
    forecast data fetched from API_ENDPOINT.

    Subscriptions are read and written through self._sum and persistent
    data through self._stm; both, as well as send_message and get_option,
    are expected to be provided by BaseModule.
    '''
    __instance = None
    _subcommand_table = {}
    _pollen_data = {}  # PLZ -> Date -> Type -> Severity
    _dates_notified = {}  # JID -> [Dates already sent out]
    _sleep_duration = 100
    _http_timeout = 30  # Seconds to wait for the forecast API
    _notified_window = 7  # Max. number of remembered dates per JID

    @staticmethod
    def get_instance(base, **kwargs):
        '''
        Return the singleton instance, creating it on first use.
        '''
        if PollenModule.__instance is None:
            PollenModule(base, **kwargs)
        return PollenModule.__instance

    def __init__(self, base, **kwargs):
        '''
        Set up the command table, load the persisted notification state
        and start the periodic refresh task.

        Raises Exception when an instance already exists.
        '''
        if PollenModule.__instance is not None:
            raise Exception('Trying to init singleton twice')
        super().__init__(base, **kwargs)

        self._subcommand_table = {
            'subscribe': self._subscribe,
            'unsubscribe': self._unsubscribe,
            'hilfe': self._help,
            '*': self._any
        }
        self._sleep_duration = self.get_option('sleep_duration', 12 * 3600)

        # Load data
        # NOTE: Since we request the pollen_data at start anyway, we won't
        #       need to save it. We only save dates_notified to prevent
        #       sending one notification too many.
        self._pollen_data = {}
        # Fall back to an empty mapping in case nothing has been stored yet
        # (presumably get_data returns a falsy value then; verify).
        self._dates_notified = self._stm.get_data('dates_notified') or {}

        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage collected
        # while it is still running.
        loop = asyncio.get_event_loop()
        self._refresh_task = loop.create_task(self._request_pollen_data(True))

        # Register the singleton last so that a failed initialisation does
        # not leave a half-constructed instance behind.
        PollenModule.__instance = self

    def notification_body(self, plz, date, allergies):
        '''
        Format a message that will be sent to the user

        allergies is a list of {'type': ..., 'level': ...} entries. An
        entry of type 'Alle' stands for every pollen type known for that
        day, using the level of the 'Alle' entry as threshold.

        Returns the empty string when there is nothing to report, i.e.
        no data for plz/date or no pollen type reaching its threshold.
        '''
        day = self._pollen_data.get(plz, {}).get(date, {})

        for allergy in allergies:
            if allergy['type'] == 'Alle':
                allergies = [{'type': type_, 'level': allergy['level']}
                             for type_ in day]
                break

        lines = []
        for allergy in allergies:
            intensity = day.get(allergy['type'])
            if intensity is None:
                # Unknown pollen type (e.g. a typo in the subscription)
                continue
            if intensity >= allergy['level']:
                lines.append('{}: {}'.format(allergy['type'],
                                             intensity_str(intensity)))

        if not lines:
            return ''
        return '{} (*{}*)\n'.format(date, plz) + '\n'.join(lines)

    def _is_notified(self, jid, date):
        '''
        Returns True, when the pollen data for the date of date has
        already been sent to jid.
        '''
        return date in self._dates_notified.get(jid, ())

    def _set_dates_notified(self, jid, new_dates):
        '''
        Marks the dates in new_dates as being sent to a user and persists
        the result. Ensures that at most _notified_window (7) dates are
        remembered per user and that no date is stored twice; dates that
        were sent again move to the end of the list.
        '''
        known = self._dates_notified.get(jid, [])
        merged = [date for date in known if date not in new_dates]
        merged.extend(new_dates)
        self._dates_notified[jid] = merged[-self._notified_window:]
        self._stm.set_data('dates_notified', self._dates_notified)

    def _broadcast_jid(self, jid, filter_allergies=None):
        '''
        Sends the pollen data to jid. If filter_allergies is set, then
        only the data for the pollen types in filter_allergies will be
        sent, and dates that were already sent are not skipped.
        '''
        # NOTE(review): sent dates are tracked per JID, not per PLZ, so a
        # JID with several PLZs will have later PLZs skipped for dates the
        # first one already marked. Fixing this needs a new storage layout.
        for plz, data in self._sum.get_subscriptions_for(jid).items():
            allergies = filter_allergies or data['data']

            notified = []
            # A PLZ may have no data yet, e.g. when the request failed
            for date in self._pollen_data.get(plz, {}):
                if not filter_allergies and self._is_notified(jid, date):
                    continue

                notified.append(date)
                msg = self.notification_body(plz, date, allergies)
                if msg:
                    self.send_message(aioxmpp.JID.fromstr(jid), msg)

            if notified:
                self._set_dates_notified(jid, notified)

    def _broadcast_all(self):
        '''
        Sends every not yet sent forecast date to all subscribers of all
        PLZs we have data for. Dates without anything to report are marked
        as sent without sending a message.
        '''
        for plz, dates in self._pollen_data.items():
            for subscription in self._sum.get_subscriptions_for_keyword(plz):
                jid = subscription[0]
                data = subscription[1]
                # NOTE(review): _broadcast_jid reads the allergen list from
                # data['data'] while this method used data directly; accept
                # both shapes until the subscription manager is checked.
                allergies = data['data'] if isinstance(data, dict) else data

                notified = []
                for date in dates:
                    if self._is_notified(jid, date):
                        continue

                    notified.append(date)
                    body = self.notification_body(plz, date, allergies)
                    if body:
                        self.send_message(aioxmpp.JID.fromstr(jid), body)

                if notified:
                    self._set_dates_notified(jid, notified)

    def _fetch_pollen_content(self, plz, date):
        '''
        Blocking HTTP request for the forecast of one PLZ. Returns the
        'content' member of the JSON response. Meant to be run in an
        executor so that it does not stall the event loop.
        '''
        response = requests.get(api_query(plz, date),
                                timeout=self._http_timeout)
        response.raise_for_status()
        return response.json()['content']

    @staticmethod
    def _parse_pollen_content(content):
        '''
        Convert the 'content' member of an API response into a
        Date -> Type -> Severity mapping.
        '''
        pollen = content['pollen']
        values_by_date = content['values']
        parsed = {}
        for date in values_by_date:
            parsed[date] = {type_: int(value)
                            for type_, value in zip(pollen, values_by_date[date])}
        return parsed

    async def _request_pollen_data(self, loop):
        '''
        Refresh the pollen data of every subscribed PLZ.

        If loop is True, this runs forever: after each refresh all
        subscribers are notified and the task sleeps for _sleep_duration
        seconds. If loop is False, the data is refreshed once and nothing
        is sent.
        '''
        event_loop = asyncio.get_event_loop()
        while True:
            today = datetime.date.today().strftime('%Y-%m-%d')
            # Iterate over a snapshot: subscriptions may change while we
            # are waiting for a response.
            for plz in list(self._sum.get_subscription_keywords()):
                try:
                    content = await event_loop.run_in_executor(
                        None, self._fetch_pollen_content, plz, today)
                    fresh = self._parse_pollen_content(content)
                except (requests.RequestException,
                        KeyError, TypeError, ValueError):
                    # A single failing request must not stop the refresh
                    # of the other PLZs or end the periodic task.
                    logging.getLogger(__name__).exception(
                        'Could not refresh pollen data for %s', plz)
                    continue

                # Replace instead of merge: merged data would keep growing,
                # and once it holds more dates than _set_dates_notified
                # remembers, old dates would be sent again and again.
                if fresh:
                    self._pollen_data[plz] = fresh

            # For when we want to just refresh data as someone just subscribed
            if not loop:
                break

            try:
                self._broadcast_all()
            except Exception:
                # Boundary of the background task: log and keep running,
                # an unhandled error would silently end all notifications.
                logging.getLogger(__name__).exception(
                    'Could not broadcast pollen data')
            await asyncio.sleep(self._sleep_duration)

    async def _subscribe(self, cmd, msg):
        '''
        Handle "pollen subscribe <PLZ> <Allergen> [<Level>]": store the
        subscription and immediately send the current data for it.
        '''
        if not 3 <= len(cmd) <= 4:
            self.send_message(msg.from_, 'Verwendung: pollen subscribe <PLZ> <Allergen> [<Level>]')
            return

        plz = cmd[1]
        allergy = cmd[2]
        jid = str(msg.from_.bare())

        level = 0
        if len(cmd) == 4:
            try:
                level = int(cmd[3])
            except ValueError:
                # Not a number at all; report it like an out-of-range level
                level = None
        if level is None or not 0 <= level <= 3:
            self.send_message(msg.from_, 'Das Level muss zwischen 0 und 3 liegen')
            return

        # NOTE(review): subscriptions are stored as {'type', 'level'} dicts,
        # so they are matched by type with the same predicate-based check
        # _unsubscribe uses; the original passed the bare type string to
        # is_subscribed_to_data. Confirm against the subscription manager.
        if self._sum.is_subscribed_to_data_one(jid, plz, lambda x: x['type'] == 'Alle'):
            self.send_message(msg.from_, 'Du hast schon alle Allergene abonniert')
            return
        if self._sum.is_subscribed_to_data_one(jid, plz, lambda x: x['type'] == allergy):
            self.send_message(msg.from_, 'Du hast das schon abonniert')
            return

        subscription = {
            'type': allergy,
            'level': level
        }
        self._sum.append_data_for_subscription(jid, plz, subscription)
        self.send_message(msg.from_, 'Du erhälst nun Pollenmeldungen zu %s' % allergy)

        # Just some bandwidth saving measure
        if plz not in self._pollen_data:
            await self._request_pollen_data(False)

        self._broadcast_jid(jid, [dict(subscription)])

    async def _unsubscribe(self, cmd, msg):
        '''
        Handle "pollen unsubscribe <PLZ> <Allergen>". The allergen 'Alle'
        removes every allergen stored for that PLZ.
        '''
        if len(cmd) != 3:
            self.send_message(msg.from_, 'Verwendung: pollen unsubscribe <PLZ> <Allergen>')
            return

        plz = cmd[1]
        allergy = cmd[2]
        jid = str(msg.from_.bare())

        if allergy == 'Alle':
            # NOTE(review): the original called filter_items_for_subscriptions
            # (plural) here while the other call site uses the singular name;
            # unified on the singular. Confirm against the subscription manager.
            self._sum.filter_items_for_subscription(jid, plz, lambda x: False)
            self.send_message(msg.from_, 'Du erhälst keine Pollenmeldungen mehr')
            return

        if not self._sum.is_subscribed_to_data_one(jid, plz, lambda x: x['type'] == allergy):
            self.send_message(msg.from_, 'Du hast %s nicht abonniert' % (allergy))
            return

        self._sum.filter_items_for_subscription(jid,
                                                plz,
                                                lambda x: x['type'] != allergy)
        self.send_message(msg.from_, 'Du erhälst nun keine Pollenmeldungen zu %s mehr' % (allergy))

    async def _help(self, cmd, msg):
        '''
        Send the list of available commands, pollen types and levels.
        '''
        body = '''Verfügbare Befehle:
pollen subscribe <PLZ> <Typ> [<Min. Level>]
pollen unsubscribe <PLZ> <Typ>
pollen hilfe
Pollentypen: Ambrosia, Ampfer, Beifuß, Birke, Buche, Eiche, Erle, Esche, Gräser, Hasel, Pappel, Roggen, Ulme, Wegerich, Weide, Alle (Alias für alle Pollentypen)
Level: 0 (Kein Pollenflug) - 3 (Starker Pollenflug)
'''
        self.send_message(msg.from_, body)

    async def _any(self, cmd, msg):
        '''
        Fallback for unknown subcommands: report the error and show the help.
        '''
        self.send_message(msg.from_, 'Unbekannter Befehl')
        await self._help(cmd, msg)
def get_instance(base, **kwargs):
    '''
    Module-level entry point: return the PollenModule singleton, passing
    base and all keyword options on to PollenModule.get_instance.
    '''
    module = PollenModule.get_instance(base, **kwargs)
    return module