import os import sys import json import datetime import configparser import threading import logging import signal import asyncio from collections import namedtuple import aioxmpp from aioxmpp.structs import PresenceShow import requests from janine.utils import find_one, find_all from janine.sources import sources_from_config, MiscDataSources log = logging.getLogger('janine') #log.basicConfig(level=logging.INFO) Warning = namedtuple('Warning', ['id', 'sent', 'effective_from', 'expires', 'urgency', 'sender', 'headline', 'description', 'instruction']) def to_warning(data): ''' Returns a Warning given the raw data ''' info = find_one(lambda x: 'headline' in x.keys(), data['info']) return Warning(id=data['identifier'], sent=data['sent'], # Not all items have to have those effective_from=info.get('effective', 'N/A'), # Not all items have to have those expires=info.get('expires', 'N/A'), urgency=info['urgency'], # Not all items have to have those sender=info.get('senderName', 'N/A'), headline=info['headline'], description=info['description'], instruction=info.get('instruction', 'N/A')) def landkreis_filter(kreis, item): ''' Returns True when the item is relevant to the Landkreis @kreis ''' info = find_one(lambda e: 'area' in e.keys(), item['info']) geocode = find_one(lambda e: 'geocode' in e.keys(), info['area']) # Note: Some items may have multiple Landkreise values = list(map(lambda e: e['valueName'], geocode['geocode'])) return kreis in values def parse_data(text, filter_): ''' Reads the remote response, parses it and returns a list of warnings. ''' data = json.loads(text) return [to_warning(raw) for raw in find_all(filter_, data)] class WarningBot: def __init__(self): self._warnings0 = [] self._warnings1 = [] self._client = None self._refresh_timeout = 630 # 15min self._load_config() async def connect(self): ''' Starts the "event loop" of the bot ''' self._client = aioxmpp.PresenceManagedClient( self._jid, aioxmpp.make_security_layer(self._password)) # First we check if the entered Landkreis is valid channels = {} if not os.path.exists('/etc/janine/channels.json'): log.debug('Requesting search channels') req = requests.get(MiscDataSources.channels()) channels = json.loads(req.text) try: with open('/etc/janine/channels.json', 'w') as f: f.write(json.dumps(channels)) except Exception as err: log.error('Failed to cache channel data:') log.error(str(err)) else: with open('/etc/janine/channels.json', 'r') as f: channels = json.loads(f.read()) for key in channels.keys(): if channels[key].get('NAME', '') == self._landkreis: log.info('Landkreis valid') break else: log.error('Invalid Landkreis') return async with self._client.connected() as stream: logging.info('Client connected to server') # In case you want a nice avatar if self._avatar: log.info('Setting avatar') with open(self._avatar, 'rb') as f: image_data = f.read() avatar_set = aioxmpp.avatar.AvatarSet() avatar_set.add_avatar_image('image/png', image_bytes=image_data) await selg.avatar.publish_avatar_set(avatar_set) # Set some presence information self._client.set_presence( aioxmpp.PresenceState(available=True, show=PresenceShow.DND), self._status) # Start our fetch-send loop # NOTE: Originally, I wanted to use a cronjob and # signal.signal(...) for this but you can't # use async in event handlers loop = asyncio.get_event_loop() periodic = loop.create_task(self._periodic_requests()) logging.info('Periodic ticker started') await periodic async def _periodic_requests(self): ''' "Executes" every self._refresh_timeout seconds to fetch all configured warnings and send them to the users, if there are any new ones. ''' while True: logging.debug('Refreshing warning list') await self._fetch_warnings() await asyncio.sleep(self._refresh_timeout) def _filter_func(self): return lambda item: landkreis_filter(self._landkreis, item) async def _fetch_warnings(self): ''' Fetches all warnings and tries to find new ones to send notifications. ''' self._warnings1 = self._warnings0 self._warnings0 = [] for source in self._sources: req = requests.get(source) self._warnings0 = parse_data(req.text, self._filter_func()) # Find new warnings and send the new ones ids = map(lambda x: x.id, self._warnings1) for warning in self._warnings0: if warning.id in ids: continue await self._send_notification(warning) def __time_format(self, time_str): ''' Reformat ISO style time data to a more readable format. ''' date = None try: date = datetime.datetime.fromisoformat(time_str) except: pass if not date: return time_str return '{}.{}.{} {}:{}'.format(date.day, date.month, date.year, date.hour, date.minute) async def _send_notification(self, warning): ''' Send a warning to all the recipients ''' # Reformat the message a bit effective_time = self.__time_format(warning.effective_from) expiry_time = self.__time_format(warning.expires) body = '*{}*\n({} bis {})\n\n{}'.format(warning.headline, effective_time, expiry_time, warning.description) # Smells like script injection, but okay body = body.replace('
', '\n') body = body.replace('
', '\n') for recipient in self._recipients: msg = aioxmpp.stanza.Message( to=aioxmpp.JID.fromstr(recipient), type_=aioxmpp.MessageType.CHAT) msg.body[None] = body await self._client.send(msg) def _load_config(self): # Load config config_path = sys.argv[1] if len(sys.argv) == 2 else '/etc/janine/janine.conf' config = configparser.ConfigParser() config.read(config_path) # Configure sources self._sources = sources_from_config(config) self._landkreis = config['General']['Landkreis'] self._recipients = config['General']['Recipients'].split(',') self._refresh_timeout = int(config['General']['Timeout']) # Bot Config self._jid = aioxmpp.JID.fromstr(config['Bot']['JID']) self._password = config['Bot']['Password'] self._avatar = config['Bot'].get('Avatar', None) self._status = config['Bot'].get('Status', 'Warnt dich vor Katastrophen') def main(): bot = WarningBot() loop = asyncio.get_event_loop() loop.run_until_complete(bot.connect()) loop.close() if __name__ == '__main__': main()