''' This file is part of JANINE. JANINE is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. JANINE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with JANINE. If not, see . ''' import os import sys import json import configparser import logging import asyncio from collections import namedtuple import aioxmpp from aioxmpp.structs import PresenceShow import requests from janine.utils import find_one, make_msg, format_warning from janine.sources import sources_from_config, MiscDataSources log = logging.getLogger('janine') log.setLevel(logging.INFO) log.addHandler(logging.StreamHandler()) Warning_ = namedtuple('Warning_', ['id', 'sent', 'effective_from', 'expires', 'urgency', 'sender', 'headline', 'description', 'instruction', 'landkreise']) def stub_warning(id_): ''' Returns a stubbed warning for loading warnings from disk. The only real attribute is the @id_ . ''' return Warning_(id=id_, sent='', effective_from='', expires='', urgency='', sender='', headline='', description='', instruction='', landkreise=[]) def to_warning(data): ''' Returns a Warning given the raw data ''' info = find_one(lambda x: 'headline' in x.keys(), data['info']) return Warning_(id=data['identifier'], sent=data['sent'], # Not all items have to have those effective_from=info.get('effective', 'N/A'), # Not all items have to have those expires=info.get('expires', 'N/A'), urgency=info['urgency'], # Not all items have to have those sender=info.get('senderName', 'N/A'), headline=info['headline'], description=info['description'], instruction=info.get('instruction', 'N/A'), landkreise=get_landkreise(data)) def get_landkreise(data): ''' Returns the list of Landkreise relevant to the warning in @data ''' info = find_one(lambda e: 'area' in e.keys(), data['info']) geocode = find_one(lambda e: 'geocode' in e.keys(), info['area']) # Note: Some items may have multiple Landkreise return list(map(lambda e: e['valueName'], geocode['geocode'])) def parse_data(text): ''' Reads the remote response, parses it and returns a list of warnings. ''' data = json.loads(text) return [to_warning(el) for el in data] class WarningBot: ''' This class represents the actual bot. The only things to be done is call connect() after creating an instance. ''' def __init__(self): self._warnings0 = [] self._warnings1 = [] self._client = None self._warn_clients = {} self._refresh_timeout = 630 # 15min # Configuration stuff self._data_dir = '' self._client_store = '' self._warning_store = '' self._load_config() async def connect(self): ''' Starts the "event loop" of the bot ''' self._client = aioxmpp.PresenceManagedClient( self._jid, aioxmpp.make_security_layer(self._password)) async with self._client.connected(): log.info('Client connected to server') # In case you want a nice avatar if self._avatar: log.info('Setting avatar') with open(self._avatar, 'rb') as avatar_file: image_data = avatar_file.read() avatar_set = aioxmpp.avatar.AvatarSet() avatar_set.add_avatar_image('image/png', image_bytes=image_data) await (self._client.summon(aioxmpp.avatar.AvatarService) .publish_avatar_set(avatar_set)) # Set some presence information self._client.set_presence( aioxmpp.PresenceState(available=True, show=PresenceShow.CHAT), self._status) # Enable Carbons await self._client.summon(aioxmpp.CarbonsClient).enable() log.info('Message carbons enabled') # Register the message handler self._client.stream.register_message_callback( aioxmpp.MessageType.CHAT, None, self._handle_message) log.info('Message handler registered') # Start our fetch-send loop # NOTE: Originally, I wanted to use a cronjob and # signal.signal(...) for this but you can't # use async in event handlers loop = asyncio.get_event_loop() periodic = loop.create_task(self._periodic_requests()) log.info('Periodic ticker started') await periodic def __is_message_valid(self, msg): ''' Returns True on messages we want to handle. False otherwise. ''' if msg.type_ != aioxmpp.MessageType.CHAT: return False if not msg.body: return False if msg.from_.domain != self._jid.domain and self._same_domain: return False return True def _handle_message(self, msg): # Handle cases we don't want to deal with if not self.__is_message_valid(msg): return cmd_parts = str(msg.body.any()).split(' ') cmd = cmd_parts[0].lower() if cmd == 'subscribe': # Do we have a landkreis? if len(cmd_parts) < 2: self._client.enqueue(make_msg( to=msg.from_, body='Du hast keinen Landkreis angegeben')) return # Check if the entered Landkreis is valid landkreis = ' '.join(cmd_parts[1:]) if not landkreis in self._channels: self._client.enqueue(make_msg( to=msg.from_, body='Der angegebene Landkreis ist ungültig')) return if landkreis not in self._warn_clients.keys(): self._warn_clients[landkreis] = [] self._warn_clients[landkreis].append(str(msg.from_.bare())) self._client.enqueue(make_msg( to=msg.from_, body=f'Du erhälst nun Nachrichten zu {landkreis} von mir')) with open(self._client_store, 'w') as clients_file: clients_file.write(json.dumps(self._warn_clients)) # Send all known warnings for the landkreis to the user for warning in self._warnings0: if landkreis in warning.landkreise: body = format_warning(warning) self._client.enqueue(make_msg( to=msg.from_, body=body)) elif cmd == 'unsubscribe': # Do we have a landkreis? if len(cmd_parts) < 2: self._client.enqueue(make_msg( to=msg.from_, body='Du hast keinen Landkreis angegeben')) return landkreis = ' '.join(cmd_parts[1:]) if landkreis not in self._warn_clients: self._client.enqueue(make_msg( to=msg.from_, body=f'Du hast {landkreis} nicht abonniert')) return if str(msg.from_.bare()) in self._warn_clients[landkreis]: filter_ = lambda x: x != str(msg.from_.bare()) self._warn_clients[landkreis] = list(filter(filter_, self._warn_clients[landkreis])) self._client.enqueue(make_msg( to=msg.from_, body=f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir')) if len(self._warn_clients[landkreis]) == 0: del self._warn_clients[landkreis] else: self._client.enqueue(make_msg( to=msg.from_, body=f'Du hast {landkreis} nicht abonniert')) elif cmd == 'help': body = '''Verfügbare Befehle: subscribe - Abonniere einen Landkreis unsubscribe - Entferne das Abonnement zu einem Landkreis help - Gebe diese Hilfe aus''' self._client.enqueue(make_msg( to=msg.from_, body=body)) else: self._client.enqueue(make_msg( to=msg.from_, body='Diesen Befehl kenne ich nicht... Mit "help" kannst du alle Befehle sehen, die ich kenne.')) async def _periodic_requests(self): ''' "Executes" every self._refresh_timeout seconds to fetch all configured warnings and send them to the users, if there are any new ones. ''' while True: log.debug('Refreshing warning list') await self._fetch_warnings() # Flush the warnings to disk ids = list(map(lambda x: x.id, self._warnings0)) with open(self._warnings_file, 'w') as warnings_file: warnings_file.write(json.dumps(ids)) await asyncio.sleep(self._refresh_timeout) async def _fetch_warnings(self): ''' Fetches all warnings and tries to find new ones to send notifications. ''' self._warnings1 = self._warnings0 self._warnings0 = [] for source in self._sources: req = requests.get(source) self._warnings0 = parse_data(req.text) # Find new warnings and send the new ones ids = map(lambda x: x.id, self._warnings1) for warning in self._warnings0: if warning.id in ids: continue if len(set(warning.landkreise).intersection(self._warn_clients.keys())): body = format_warning(warning) for landkreis in warning.landkreise: for to in self._warn_clients.get(landkreis, []): msg = aioxmpp.stanza.Message( to=aioxmpp.JID.fromstr(to), type_=aioxmpp.MessageType.CHAT) msg.body[None] = body await self._client.send(msg) def _load_config(self): # Load config config_path = sys.argv[1] if len(sys.argv) == 2 else '/etc/janine/janine.conf' config = configparser.ConfigParser() config.read(config_path) # Configure sources self._sources = sources_from_config(config) self._data_dir = config['General'].get('DataDir', '/etc/janine') self._recipients = config['General']['Recipients'].split(',') self._refresh_timeout = int(config['General']['Timeout']) self._same_domain = config['General'].get('SameDomain', 'True') == 'True' # Persistent data # Subscribed clients self._client_store = os.path.join(self._data_dir, 'clients.json') if os.path.exists(self._client_store): with open(self._client_store, 'r') as clients_file: self._warn_clients = json.loads(clients_file.read()) log.info('Clients read from disk') self._warnings_file = os.path.join(self._data_dir, 'warnings.json') if os.path.exists(self._warnings_file): with open(self._warnings_file, 'r') as warnings_file: self._warnings0 = list(map(stub_warning, json.loads(warnings_file.read()))) log.info('Warnings read from disk') # Landkreise self._channels = [] channels = {} self._channel_file = os.path.join(self._data_dir, 'channels.json') if not os.path.exists(self._channel_file): log.info('Requesting search channels') req = requests.get(MiscDataSources.channels()) channels = json.loads(req.text) self._channels = list(map(lambda key: channels[key].get('NAME', ''), channels.keys())) try: with open(self._channel_file, 'w') as channel_file: channel_file.write(json.dumps(self._channels)) except Exception as err: log.error('Failed to cache channel data:') log.error(str(err)) else: with open(self._channel_file, 'r') as channel_file: self._channels = json.loads(channel_file.read()) log.info('Search channels read from disk') # Bot Config self._jid = aioxmpp.JID.fromstr(config['Bot']['JID']) self._password = config['Bot']['Password'] self._avatar = config['Bot'].get('Avatar', None) self._status = config['Bot'].get('Status', 'Warnt dich vor Katastrophen') def main(): ''' Main function. ''' bot = WarningBot() loop = asyncio.get_event_loop() loop.run_until_complete(bot.connect()) loop.close() if __name__ == '__main__': main()