400 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			400 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import sys
 | |
| import json
 | |
| import datetime
 | |
| import configparser
 | |
| import threading
 | |
| import logging
 | |
| import signal
 | |
| import asyncio
 | |
| from collections import namedtuple
 | |
| 
 | |
| import aioxmpp
 | |
| from aioxmpp.structs import PresenceShow
 | |
| 
 | |
| import requests
 | |
| 
 | |
| from janine.utils import find_one, find_all
 | |
| from janine.sources import sources_from_config, MiscDataSources
 | |
| 
 | |
| log = logging.getLogger('janine')
 | |
| log.setLevel(logging.INFO)
 | |
| log.addHandler(logging.StreamHandler())
 | |
| 
 | |
| Warning = namedtuple('Warning', ['id',
 | |
|                                  'sent',
 | |
|                                  'effective_from',
 | |
|                                  'expires',
 | |
|                                  'urgency',
 | |
|                                  'sender',
 | |
|                                  'headline',
 | |
|                                  'description',
 | |
|                                  'instruction',
 | |
|                                  'landkreise'])
 | |
| 
 | |
| 
 | |
| def stub_warning(id_):
 | |
|     '''
 | |
|     Returns a stubbed warning for loading warnings from disk.
 | |
|     The only real attribute is the @id_ .
 | |
|     '''
 | |
|     return Warning(id=id_,
 | |
|                    sent='',
 | |
|                    effective_from='',
 | |
|                    expires='',
 | |
|                    urgency='',
 | |
|                    sender='',
 | |
|                    headline='',
 | |
|                    description='',
 | |
|                    instruction='',
 | |
|                    landkreise=[])
 | |
| 
 | |
| def to_warning(data):
 | |
|     '''
 | |
|     Returns a Warning given the raw data
 | |
|     '''
 | |
|     info = find_one(lambda x: 'headline' in x.keys(), data['info'])
 | |
|     return Warning(id=data['identifier'],
 | |
|                    sent=data['sent'],
 | |
|                    # Not all items have to have those
 | |
|                    effective_from=info.get('effective', 'N/A'),
 | |
|                    # Not all items have to have those
 | |
|                    expires=info.get('expires', 'N/A'),
 | |
|                    urgency=info['urgency'],
 | |
|                    # Not all items have to have those
 | |
|                    sender=info.get('senderName', 'N/A'),
 | |
|                    headline=info['headline'],
 | |
|                    description=info['description'],
 | |
|                    instruction=info.get('instruction', 'N/A'),
 | |
|                    landkreise=get_landkreise(data))
 | |
| 
 | |
| def get_landkreise(data):
 | |
|     '''
 | |
|     Returns the list of Landkreise relevant to the warning in @data
 | |
|     '''
 | |
|     info = find_one(lambda e: 'area' in e.keys(), data['info'])
 | |
|     geocode = find_one(lambda e: 'geocode' in e.keys(), info['area'])
 | |
| 
 | |
|     # Note: Some items may have multiple Landkreise
 | |
|     return list(map(lambda e: e['valueName'], geocode['geocode']))    
 | |
| 
 | |
| def parse_data(text):
 | |
|     '''
 | |
|     Reads the remote response, parses it and returns a list of warnings.
 | |
|     '''
 | |
|     data = json.loads(text)
 | |
|     return [to_warning(el) for el in data]
 | |
| 
 | |
| class WarningBot:
 | |
|     def __init__(self):
 | |
|         self._warnings0 = []
 | |
|         self._warnings1 = []
 | |
|         self._client = None
 | |
|         self._warn_clients = {}
 | |
|         self._refresh_timeout = 630 # 15min
 | |
| 
 | |
|         # Configuration stuff
 | |
|         self._data_dir = ''
 | |
|         self._client_store = ''
 | |
|         self._warning_store = ''
 | |
| 
 | |
|         self._load_config()
 | |
| 
 | |
|     async def connect(self):
 | |
|         '''
 | |
|         Starts the "event loop" of the bot
 | |
|         '''
 | |
|         self._client = aioxmpp.PresenceManagedClient(
 | |
|                 self._jid,
 | |
|                 aioxmpp.make_security_layer(self._password))
 | |
| 
 | |
|         async with self._client.connected() as stream:
 | |
|             log.info('Client connected to server')
 | |
| 
 | |
|             # In case you want a nice avatar
 | |
|             if self._avatar:
 | |
|                 log.info('Setting avatar')
 | |
|                 with open(self._avatar, 'rb') as f:
 | |
|                     image_data = f.read()
 | |
| 
 | |
|                 avatar_set = aioxmpp.avatar.AvatarSet()
 | |
|                 avatar_set.add_avatar_image('image/png', image_bytes=image_data)
 | |
|                 await (self._client.summon(aioxmpp.avatar.AvatarService)
 | |
|                         .publish_avatar_set(avatar_set))
 | |
| 
 | |
|             # Set some presence information
 | |
|             self._client.set_presence(
 | |
|                     aioxmpp.PresenceState(available=True,
 | |
|                                           show=PresenceShow.CHAT),
 | |
|                     self._status)
 | |
| 
 | |
|             # Enable Carbons
 | |
|             await self._client.summon(aioxmpp.CarbonsClient).enable()
 | |
|             log.info('Message carbons enabled')
 | |
| 
 | |
|             # Register the message handler
 | |
|             self._client.stream.register_message_callback(
 | |
|                     aioxmpp.MessageType.CHAT,
 | |
|                     None,
 | |
|                     self._handle_message)
 | |
|             log.info('Message handler registered')
 | |
| 
 | |
|             # Start our fetch-send loop
 | |
|             # NOTE: Originally, I wanted to use a cronjob and
 | |
|             #       signal.signal(...) for this but you can't
 | |
|             #       use async in event handlers
 | |
|             loop = asyncio.get_event_loop()
 | |
|             periodic = loop.create_task(self._periodic_requests())
 | |
|             log.info('Periodic ticker started')            
 | |
|             await periodic
 | |
| 
 | |
|     def __is_message_valid(self, msg):
 | |
|         '''
 | |
|         Returns True on messages we want to handle. False otherwise.
 | |
|         '''
 | |
|         if msg.type_ != aioxmpp.MessageType.CHAT:
 | |
|             return False
 | |
|         
 | |
|         if not msg.body:
 | |
|             return False
 | |
|         
 | |
|         if msg.from_.domain != self._jid.domain and self._same_domain:
 | |
|             return False
 | |
| 
 | |
|         return True
 | |
|     
 | |
|     def __make_msg(self, to, body):
 | |
|         '''
 | |
|         Wrapper for creating a message object to enqueue or send.
 | |
|         '''
 | |
|         msg = aioxmpp.Message(
 | |
|                 type_=aioxmpp.MessageType.CHAT,
 | |
|                 to=to)
 | |
|         msg.body[None] = body
 | |
| 
 | |
|         return msg
 | |
| 
 | |
|     def _handle_message(self, msg):
 | |
|         # Handle cases we don't want to deal with
 | |
|         if not self.__is_message_valid(msg):
 | |
|             return
 | |
| 
 | |
|         cmd_parts = str(msg.body.any()).split(' ')
 | |
|         cmd = cmd_parts[0].lower()
 | |
| 
 | |
|         if cmd == 'subscribe':
 | |
|             # Do we have a landkreis?
 | |
|             if len(cmd_parts) < 2:
 | |
|                 self._client.enqueue(self.__make_msg(
 | |
|                     to=msg.from_,
 | |
|                     body='Du hast keinen Landkreis angegeben')) 
 | |
|                 return
 | |
| 
 | |
|             # Check if the entered Landkreis is valid
 | |
|             landkreis = ' '.join(cmd_parts[1:])
 | |
|             if not landkreis in self._channels:
 | |
|                 self._client.enqueue(self.__make_msg(
 | |
|                     to=msg.from_,
 | |
|                     body='Der angegebene Landkreis ist ungültig')) 
 | |
|                 return
 | |
|             
 | |
|             if not landkreis in self._warn_clients.keys():
 | |
|                 self._warn_clients[landkreis] = []
 | |
|             self._warn_clients[landkreis].append(str(msg.from_.bare()))
 | |
| 
 | |
|             self._client.enqueue(self.__make_msg(
 | |
|                 to=msg.from_,
 | |
|                 body=f'Du erhälst nun Nachrichten zu {landkreis} von mir'))
 | |
| 
 | |
|             with open(self._client_store, 'w') as cf:
 | |
|                 cf.write(json.dumps(self._warn_clients))
 | |
| 
 | |
|             # Send all known warnings for the landkreis to the user
 | |
|             for warning in self._warnings0:
 | |
|                 if landkreis in warning.landkreise:
 | |
|                     body = self._format_warning(warning)
 | |
|                     self._client.enqueue(self.__make_msg(
 | |
|                         to=msg.from_,
 | |
|                         body=body))
 | |
| 
 | |
|         elif cmd == 'unsubscribe':
 | |
|             # Do we have a landkreis?
 | |
|             if len(cmd_parts) < 2:
 | |
|                 self._client.enqueue(self.__make_msg(
 | |
|                     to=msg.from_,
 | |
|                     body='Du hast keinen Landkreis angegeben')) 
 | |
|                 return
 | |
|             
 | |
|             landkreis = ' '.join(cmd_parts[1:])
 | |
|             if not landkreis in self._warn_clients:
 | |
|                 self._client.enqueue(self.__make_msg(
 | |
|                     to=msg.from_,
 | |
|                     body=f'Du hast {landkreis} nicht abonniert')) 
 | |
|                 return
 | |
| 
 | |
|             if str(msg.from_.bare()) in self._warn_clients[landkreis]:
 | |
|                 filter_ = lambda x: x != str(msg.from_.bare())
 | |
|                 self._warn_clients[landkreis] = list(filter(filter_,
 | |
|                                                             self._warn_clients[landkreis]))
 | |
|                 self._client.enqueue(self.__make_msg(
 | |
|                     to=msg.from_,
 | |
|                     body=f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir')) 
 | |
|         
 | |
|                 if len(self._warn_clients[landkreis]) == 0:
 | |
|                     del self._warn_clients[landkreis]
 | |
|             else:
 | |
|                 self._client.enqueue(self.__make_msg(
 | |
|                     to=msg.from_,
 | |
|                     body=f'Du hast {landkreis} nicht abonniert'))
 | |
|         elif cmd == 'help':
 | |
|             body = 'Verfügbare Befehle:\n\nsubscribe <Landkreis> - Abonniere einen Landkreis\nunsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis\nhelp - Gebe diese Hilfe aus'
 | |
|             self._client.enqueue(self.__make_msg(
 | |
|                 to=msg.from_,
 | |
|                 body=body))
 | |
|         else:
 | |
|             self._client.enqueue(self.__make_msg(
 | |
|                 to=msg.from_,
 | |
|                 body='Diesen Befehl kenne ich nicht... Mit "help" kannst du alle Befehle sehen, die ich kenne.'))
 | |
| 
 | |
|     async def _periodic_requests(self):
 | |
|         '''
 | |
|         "Executes" every self._refresh_timeout seconds to fetch all
 | |
|         configured warnings and send them to the users, if there are
 | |
|         any new ones.
 | |
|         '''
 | |
|         while True:
 | |
|             log.debug('Refreshing warning list')
 | |
|             await self._fetch_warnings()
 | |
|             
 | |
|             # Flush the warnings to disk
 | |
|             ids = list(map(lambda x: x.id, self._warnings0))
 | |
|             with open(self._warnings_file, 'w') as wf:
 | |
|                 wf.write(json.dumps(ids))
 | |
| 
 | |
|             await asyncio.sleep(self._refresh_timeout)
 | |
| 
 | |
|     async def _fetch_warnings(self):
 | |
|         '''
 | |
|         Fetches all warnings and tries to find new ones
 | |
|         to send notifications.
 | |
|         '''
 | |
|         self._warnings1 = self._warnings0
 | |
|         self._warnings0 = []
 | |
|         for source in self._sources:
 | |
|             req = requests.get(source)
 | |
|             self._warnings0 = parse_data(req.text)
 | |
|         
 | |
|         # Find new warnings and send the new ones
 | |
|         ids = map(lambda x: x.id, self._warnings1)
 | |
|         for warning in self._warnings0:
 | |
|             if warning.id in ids:
 | |
|                 continue
 | |
|             
 | |
|             if len(set(warning.landkreise).intersection(self._warn_clients.keys())):
 | |
|                 body = self._format_warning(warning)
 | |
|                 for landkreis in warning.landkreise:
 | |
|                     for to in self._warn_clients.get(landkreis, []):
 | |
|                         msg = aioxmpp.stanza.Message(
 | |
|                             to=aioxmpp.JID.fromstr(to),
 | |
|                             type_=aioxmpp.MessageType.CHAT)
 | |
|                         msg.body[None] = body
 | |
|                         await self._client.send(msg)
 | |
| 
 | |
|     def __time_format(self, time_str):
 | |
|         '''
 | |
|         Reformat ISO style time data to a more
 | |
|         readable format.
 | |
|         '''
 | |
|         date = None
 | |
|         try:
 | |
|             date = datetime.datetime.fromisoformat(time_str)
 | |
|         except:
 | |
|             pass
 | |
| 
 | |
|         if not date:
 | |
|             return time_str
 | |
| 
 | |
|         return '{}.{}.{} {}:{}'.format(date.day,
 | |
|                                        date.month,
 | |
|                                        date.year,
 | |
|                                        date.hour,
 | |
|                                        date.minute)
 | |
| 
 | |
|     def _format_warning(self, warning):
 | |
|         '''
 | |
|         Send a warning to all the recipients
 | |
|         '''
 | |
|         # Reformat the message a bit
 | |
|         effective_time = self.__time_format(warning.effective_from)
 | |
|         expiry_time = self.__time_format(warning.expires)
 | |
|         body = f'*{warning.headline}*\n({effective_time} bis {expiry_time})\n\n{warning.description}'
 | |
| 
 | |
|         # Smells like script injection, but okay
 | |
|         body = body.replace('<br>', '\n')
 | |
|         body = body.replace('<br/>', '\n')
 | |
|         return body
 | |
| 
 | |
|     def _load_config(self):
 | |
|         # Load config
 | |
|         config_path = sys.argv[1] if len(sys.argv) == 2 else '/etc/janine/janine.conf'
 | |
|         config = configparser.ConfigParser()
 | |
|         config.read(config_path)
 | |
|     
 | |
|         # Configure sources
 | |
|         self._sources = sources_from_config(config)
 | |
|         self._data_dir = config['General'].get('DataDir', '/etc/janine')
 | |
|         self._recipients = config['General']['Recipients'].split(',')
 | |
|         self._refresh_timeout = int(config['General']['Timeout'])
 | |
|         self._same_domain = config['General'].get('SameDomain', 'True') == 'True'
 | |
|         
 | |
|         # Persistent data
 | |
|         # Subscribed clients
 | |
|         self._client_store = os.path.join(self._data_dir, 'clients.json')
 | |
|         if os.path.exists(self._client_store):
 | |
|             with open(self._client_store, 'r') as cf:
 | |
|                 self._warn_clients = json.loads(cf.read())
 | |
|             log.info('Clients read from disk')
 | |
|   
 | |
|         self._warnings_file = os.path.join(self._data_dir, 'warnings.json')
 | |
|         if os.path.exists(self._warnings_file):
 | |
|             with open(self._warnings_file, 'r') as wf:
 | |
|                 self._warnings0 = list(map(stub_warning, json.loads(wf.read())))
 | |
|             log.info('Warnings read from disk')
 | |
| 
 | |
|         # Landkreise
 | |
|         self._channels = []
 | |
|         channels = {}
 | |
|         channel_file = os.path.join(self._data_dir, 'channels.json')
 | |
|         if not os.path.exists(channel_file):
 | |
|             log.info('Requesting search channels')
 | |
|             req = requests.get(MiscDataSources.channels())
 | |
|             channels = json.loads(req.text)
 | |
|             self._channels = list(map(lambda key: channels[key].get('NAME', ''),
 | |
|                                       channels.keys())) 
 | |
| 
 | |
|             try:
 | |
|                 with open(channel_file, 'w') as f:
 | |
|                     f.write(json.dumps(self._channels))
 | |
|             except Exception as err:
 | |
|                 log.error('Failed to cache channel data:')
 | |
|                 log.error(str(err))
 | |
|         else:
 | |
|             with open(channel_file, 'r') as f:
 | |
|                 self._channels = json.loads(f.read())
 | |
|             log.info('Search channels read from disk')
 | |
| 
 | |
|         # Bot Config
 | |
|         self._jid = aioxmpp.JID.fromstr(config['Bot']['JID'])
 | |
|         self._password = config['Bot']['Password']
 | |
|         self._avatar = config['Bot'].get('Avatar', None)
 | |
|         self._status = config['Bot'].get('Status', 'Warnt dich vor Katastrophen')
 | |
| 
 | |
| def main():
 | |
|     bot = WarningBot()
 | |
|     loop = asyncio.get_event_loop()
 | |
|     loop.run_until_complete(bot.connect())
 | |
|     loop.close()
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     main()
 |