From 1b36a139931a5099199ed138e3cb832dfcea338c Mon Sep 17 00:00:00 2001 From: Alexander PapaTutuWawa Date: Thu, 17 Sep 2020 15:32:12 +0200 Subject: [PATCH] feat: Allow subscribtion to Landkreise --- janine.example.conf | 1 - janine/janine.py | 178 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 139 insertions(+), 40 deletions(-) diff --git a/janine.example.conf b/janine.example.conf index 34cc998..3ac1af5 100644 --- a/janine.example.conf +++ b/janine.example.conf @@ -3,7 +3,6 @@ IHP = y # Hochwasserwarnungen DWD = y # Unwetterwarnungen MOWAS = y # Gefahrendurchsagen BIWAPP = y # Warnmeldungen -Landkreis = Stuttgart # Warnungen für diesen Landkreis weiterleiten Recipients = some.user@xmpp.server,other@xmpp.server # Empfänger Timeout=630 # Zeit in Sekunden nach welcher nach neuen Warnungen geschaut wird DataDir = /etc/janine/data # Verzeichnis für persistente Daten diff --git a/janine/janine.py b/janine/janine.py index ae3a60d..4736c3c 100644 --- a/janine/janine.py +++ b/janine/janine.py @@ -28,7 +28,8 @@ Warning = namedtuple('Warning', ['id', 'sender', 'headline', 'description', - 'instruction']) + 'instruction', + 'landkreise']) def to_warning(data): ''' @@ -46,31 +47,32 @@ def to_warning(data): sender=info.get('senderName', 'N/A'), headline=info['headline'], description=info['description'], - instruction=info.get('instruction', 'N/A')) + instruction=info.get('instruction', 'N/A'), + landkreise=get_landkreise(data)) -def landkreis_filter(kreis, item): +def get_landkreise(data): ''' - Returns True when the item is relevant to the Landkreis @kreis + Returns the list of Landkreise relevant to the warning in @data ''' - info = find_one(lambda e: 'area' in e.keys(), item['info']) + info = find_one(lambda e: 'area' in e.keys(), data['info']) geocode = find_one(lambda e: 'geocode' in e.keys(), info['area']) # Note: Some items may have multiple Landkreise - values = list(map(lambda e: e['valueName'], geocode['geocode'])) - return kreis in values + return list(map(lambda e: e['valueName'], geocode['geocode'])) -def parse_data(text, filter_): +def parse_data(text): ''' Reads the remote response, parses it and returns a list of warnings. ''' data = json.loads(text) - return [to_warning(raw) for raw in find_all(filter_, data)] + return [to_warning(el) for el in data] class WarningBot: def __init__(self): self._warnings0 = [] self._warnings1 = [] self._client = None + self._warn_clients = {} self._refresh_timeout = 630 # 15min # Configuration stuff @@ -87,30 +89,25 @@ class WarningBot: aioxmpp.make_security_layer(self._password)) # First we check if the entered Landkreis is valid + self._channels = [] channels = {} channel_file = os.path.join(self._data_dir, 'channels.json') if not os.path.exists(channel_file): log.debug('Requesting search channels') req = requests.get(MiscDataSources.channels()) channels = json.loads(req.text) + self._channels = list(map(lambda key: channels[key].get('NAME', ''), + channels.keys())) try: with open(channel_file, 'w') as f: - f.write(json.dumps(channels)) + f.write(json.dumps(self._channels)) except Exception as err: log.error('Failed to cache channel data:') log.error(str(err)) else: with open(channel_file, 'r') as f: - channels = json.loads(f.read()) - - for key in channels.keys(): - if channels[key].get('NAME', '') == self._landkreis: - log.info('Landkreis valid') - break - else: - log.error('Invalid Landkreis') - return + self._channels = json.loads(f.read()) async with self._client.connected() as stream: logging.info('Client connected to server') @@ -130,7 +127,14 @@ class WarningBot: aioxmpp.PresenceState(available=True, show=PresenceShow.DND), self._status) - + + # Register the message handler + self._client.stream.register_message_callback( + aioxmpp.MessageType.CHAT, + None, + self._handle_message) + logging.info('Message handler registered') + # Start our fetch-send loop # NOTE: Originally, I wanted to use a cronjob and # signal.signal(...) for this but you can't @@ -140,6 +144,100 @@ class WarningBot: logging.info('Periodic ticker started') await periodic + def __is_message_valid(self, msg): + if msg.type_ != aioxmpp.MessageType.CHAT: + return False + + if not msg.body: + return False + + # TODO: Make this configurable + if msg.from_.domain != self._jid.domain: + return False + + return True + + def __make_msg(self, to, body): + msg = aioxmpp.Message( + type_=aioxmpp.MessageType.CHAT, + to=to) + msg.body[None] = body + + return msg + + def _handle_message(self, msg): + # Handle cases we don't want to deal with + if not self.__is_message_valid(msg): + return + + cmd_parts = str(msg.body.any()).split(' ') + cmd = cmd_parts[0].lower() + + if cmd == 'subscribe': + # Do we have a landkreis? + if len(cmd_parts) < 2: + self._client.enqueue(self.__make_msg( + to=msg.from_, + body='Du hast keinen Landkreis angegeben')) + return + + # Check if the entered Landkreis is valid + landkreis = ' '.join(cmd_parts[1:]) + if not landkreis in self._channels: + self._client.enqueue(self.__make_msg( + to=msg.from_, + body='Der angegebene Landkreis ist ungültig')) + return + + if not landkreis in self._warn_clients.keys(): + self._warn_clients[landkreis] = [] + self._warn_clients[landkreis].append(str(msg.from_)) + + # TODO: Flush self._warn_clients to disk + self._client.enqueue(self.__make_msg( + to=msg.from_, + body=f'Du erhälst nun Nachrichten zu {landkreis} von mir')) + + # Send all known warnings for the landkreis to the user + for warning in self._warnings0: + if landkreis in warning.landkreise: + body = self._format_warning(warning) + self._client.enqueue(self.__make_msg( + to=msg.from_, + body=body)) + + elif cmd == 'unsubscribe': + # Do we have a landkreis? + if len(cmd_parts) < 2: + self._client.enqueue(self.__make_msg( + to=msg.from_, + body='Du hast keinen Landkreis angegeben')) + return + + landkreis = ' '.join(cmd_parts[1:]) + if not landreis in self._warn_clients.keys(): + self._client.enqueue(self.__make_msg( + to=msg.from_, + body=f'Du hast {landkreis} nicht abonniert')) + return + + if str(msg.from_) in self._warn_clients[landkreis]: + filter_ = lambda x: x != str(msg.from_) + self._warn_clients[landkreis] = list(filter(filter_, + self._warn_clients[landkreis])) + self._client.enqueue(self.__make_msg( + to=msg.from_, + body=f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir')) + else: + self._client.enqueue(self.__make_msg( + to=msg.from_, + body=f'Du hast {landkreis} nicht abonniert')) + elif cmd == 'help': + body = 'Verfügbare Befehle:\n\nsubscribe - Abonniere einen Landkreis\nunsubscribe - Entferne das Abonnement zu einem Landkreis\nhelp - Gebe diese Hilfe aus' + self._client.enqueue(self.__make_msg( + to=msg.from_, + body=body)) + async def _periodic_requests(self): ''' "Executes" every self._refresh_timeout seconds to fetch all @@ -151,9 +249,6 @@ class WarningBot: await self._fetch_warnings() await asyncio.sleep(self._refresh_timeout) - def _filter_func(self): - return lambda item: landkreis_filter(self._landkreis, item) - async def _fetch_warnings(self): ''' Fetches all warnings and tries to find new ones @@ -163,15 +258,23 @@ class WarningBot: self._warnings0 = [] for source in self._sources: req = requests.get(source) - self._warnings0 = parse_data(req.text, self._filter_func()) + self._warnings0 = parse_data(req.text) # Find new warnings and send the new ones ids = map(lambda x: x.id, self._warnings1) for warning in self._warnings0: if warning.id in ids: continue - - await self._send_notification(warning) + + if len(set(warning.landkreise).intersection(self._warn_clients.keys())): + body = self._format_warning(warning) + for landkreis in warning.landkreise: + for to in self._warn_clients.get(landkreis, []): + msg = aioxmpp.stanza.Message( + to=aioxmpp.JID.fromstr(to), + type_=aioxmpp.MessageType.CHAT) + msg.body[None] = body + await self._client.send(msg) def __time_format(self, time_str): ''' @@ -193,27 +296,19 @@ class WarningBot: date.hour, date.minute) - async def _send_notification(self, warning): + def _format_warning(self, warning): ''' Send a warning to all the recipients ''' # Reformat the message a bit effective_time = self.__time_format(warning.effective_from) expiry_time = self.__time_format(warning.expires) - body = '*{}*\n({} bis {})\n\n{}'.format(warning.headline, - effective_time, - expiry_time, - warning.description) + body = f'*{warning.headline}*\n({effective_time} bis {expiry_time})\n\n{warning.description}' + # Smells like script injection, but okay body = body.replace('
', '\n') body = body.replace('
', '\n') - - for recipient in self._recipients: - msg = aioxmpp.stanza.Message( - to=aioxmpp.JID.fromstr(recipient), - type_=aioxmpp.MessageType.CHAT) - msg.body[None] = body - await self._client.send(msg) + return body def _load_config(self): # Load config @@ -224,9 +319,14 @@ class WarningBot: # Configure sources self._sources = sources_from_config(config) self._data_dir = config['General'].get('DataDir', '/etc/janine') - self._landkreis = config['General']['Landkreis'] self._recipients = config['General']['Recipients'].split(',') self._refresh_timeout = int(config['General']['Timeout']) + + # Warning data + client_file = os.path.join(self._data_dir, 'clients.json') + if os.path.exists(client_file): + with open(client_file, 'r') as cf: + self._warn_clients = json.loads(cf.read()) # Bot Config self._jid = aioxmpp.JID.fromstr(config['Bot']['JID'])