407 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			407 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| '''
 | |
| This file is part of JANINE.
 | |
| 
 | |
| JANINE is free software: you can redistribute it and/or modify
 | |
| it under the terms of the GNU General Public License as published by
 | |
| the Free Software Foundation, either version 3 of the License, or
 | |
| (at your option) any later version.
 | |
| 
 | |
| JANINE is distributed in the hope that it will be useful,
 | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| GNU General Public License for more details.
 | |
| 
 | |
| You should have received a copy of the GNU General Public License
 | |
| along with JANINE.  If not, see <https://www.gnu.org/licenses/>.
 | |
| '''
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import json
 | |
| import configparser
 | |
| import logging
 | |
| import asyncio
 | |
| from collections import namedtuple
 | |
| from optparse import OptionParser
 | |
| from stat import S_IRUSR, S_IWUSR
 | |
| 
 | |
| import aioxmpp
 | |
| from aioxmpp.structs import PresenceShow
 | |
| 
 | |
| import requests
 | |
| 
 | |
| from janine.utils import find_one, make_msg, format_warning
 | |
| from janine.sources import sources_from_config, MiscDataSources
 | |
| 
 | |
# Module-wide logger for the bot. It logs at INFO level by default and
# emits its records through a default StreamHandler.
log = logging.getLogger('janine')
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
 | |
| 
 | |
# Immutable record holding the fields of a single warning. The trailing
# underscore keeps the name apart from the builtin Warning class.
Warning_ = namedtuple(
    'Warning_',
    'id sent effective_from expires urgency '
    'sender headline description instruction landkreise')
 | |
| 
 | |
| 
 | |
def stub_warning(id_):
    '''
    Creates a placeholder warning that carries nothing but its ID.

    Used when restoring warnings from disk: every text field is the
    empty string, the list of Landkreise is a fresh empty list and only
    the id is set to @id_.
    '''
    values = dict.fromkeys(Warning_._fields, '')
    values['id'] = id_
    # A new list per call so stubs never share a mutable value
    values['landkreise'] = []
    return Warning_(**values)
 | |
| 
 | |
def to_warning(data):
    '''
    Converts one raw warning item (@data) into a Warning_.

    The text fields are taken from the entry of data['info'] that
    find_one selects for having a 'headline' key. The fields
    'effective', 'expires' and 'senderName' are optional and fall back
    to 'N/A'; a missing 'instruction' becomes the empty string.
    '''
    def has_headline(entry):
        return 'headline' in entry.keys()

    details = find_one(has_headline, data['info'])

    identifier = data['identifier']
    sent = data['sent']
    # Not every item carries these, hence the defaults
    effective_from = details.get('effective', 'N/A')
    expires = details.get('expires', 'N/A')
    urgency = details['urgency']
    sender = details.get('senderName', 'N/A')
    headline = details['headline']
    description = details['description']
    instruction = details.get('instruction', '')
    landkreise = get_landkreise(data)

    return Warning_(identifier,
                    sent,
                    effective_from,
                    expires,
                    urgency,
                    sender,
                    headline,
                    description,
                    instruction,
                    landkreise)
 | |
| 
 | |
def get_landkreise(data):
    '''
    Collects the names of the Landkreise the warning in @data applies to.

    Looks up the entry of data['info'] that has an 'area' key, then the
    entry of that area list that has a 'geocode' key, and returns the
    'valueName' of every geocode in it as a list.
    '''
    def has_area(entry):
        return 'area' in entry.keys()

    def has_geocode(entry):
        return 'geocode' in entry.keys()

    area_info = find_one(has_area, data['info'])
    area = find_one(has_geocode, area_info['area'])

    # An item may list more than one Landkreis
    return [code['valueName'] for code in area['geocode']]
 | |
| 
 | |
| def parse_data(text):
 | |
|     '''
 | |
|     Reads the remote response, parses it and returns a list of warnings.
 | |
|     '''
 | |
|     data = json.loads(text)
 | |
|     return [to_warning(el) for el in data]
 | |
| 
 | |
class WarningBot:
    '''
    This class represents the actual bot. Creating an instance loads
    the configuration and the persisted state; afterwards the only
    thing left to do is to await connect().
    '''

    # Seconds to wait for an HTTP response before a request is abandoned.
    # Without a timeout a stalled server would block the bot forever.
    _HTTP_TIMEOUT = 30

    # Reply to the "help" command
    _HELP_TEXT = ('Verfügbare Befehle:\n'
                  '\n'
                  'subscribe <Landkreis> - Abonniere einen Landkreis\n'
                  'unsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis\n'
                  'help - Gebe diese Hilfe aus')

    def __init__(self, config_file):
        '''
        Initialises the default state and then loads the configuration
        and the persisted data referenced by @config_file.
        '''
        self._warnings0 = []     # warnings of the most recent refresh
        self._warnings1 = []     # warnings of the refresh before that
        self._client = None
        self._warn_clients = {}  # Landkreis -> list of bare JIDs (str)
        # Seconds between two refreshes (10.5 min). Only a default: the
        # 'Timeout' option of the configuration overrides it.
        self._refresh_timeout = 630

        # Configuration stuff
        self._data_dir = ''
        self._client_store = ''
        self._warning_store = ''

        self._load_config(config_file)

    async def connect(self):
        '''
        Starts the "event loop" of the bot: connects to the server,
        optionally publishes the avatar, sets the presence, enables
        message carbons, registers the message handler and then runs
        the periodic refresh, which never returns on its own.
        '''
        self._client = aioxmpp.PresenceManagedClient(
            self._jid,
            aioxmpp.make_security_layer(self._password))

        log.debug('Connecting to server')
        async with self._client.connected():
            log.info('Client connected to server')

            # In case you want a nice avatar
            if self._avatar:
                log.debug('Setting avatar')
                with open(self._avatar, 'rb') as avatar_file:
                    image_data = avatar_file.read()

                avatar_set = aioxmpp.avatar.AvatarSet()
                avatar_set.add_avatar_image('image/png', image_bytes=image_data)
                await (self._client.summon(aioxmpp.avatar.AvatarService)
                       .publish_avatar_set(avatar_set))
                log.info('Avatar set')

            # Set some presence information
            self._client.set_presence(
                aioxmpp.PresenceState(available=True,
                                      show=PresenceShow.CHAT),
                self._status)

            # Enable Carbons
            await self._client.summon(aioxmpp.CarbonsClient).enable()
            log.info('Message carbons enabled')

            # Register the message handler
            self._client.stream.register_message_callback(
                aioxmpp.MessageType.CHAT,
                None,
                self._handle_message)
            log.info('Message handler registered')

            # Start our fetch-send loop
            # NOTE: Originally, I wanted to use a cronjob and
            #       signal.signal(...) for this but you can't
            #       use async in event handlers
            loop = asyncio.get_event_loop()
            periodic = loop.create_task(self._periodic_requests())
            log.info('Periodic ticker started')
            await periodic

    def __is_message_valid(self, msg):
        '''
        Returns True on messages we want to handle. False otherwise.

        A message is handled if it is a chat message, has a body and,
        when the same-domain restriction is enabled, was sent from the
        domain of the bot's own JID.
        '''
        if msg.type_ != aioxmpp.MessageType.CHAT:
            return False

        if not msg.body:
            return False

        if self._same_domain and msg.from_.domain != self._jid.domain:
            return False

        return True

    @staticmethod
    def _parse_command(body):
        '''
        Splits the message text @body into a tuple of the lower-cased
        command word and the remaining argument text. Surrounding and
        repeated whitespace between the two is ignored; missing parts
        are returned as the empty string.
        '''
        parts = body.strip().split(None, 1)
        if not parts:
            return '', ''

        argument = parts[1] if len(parts) > 1 else ''
        return parts[0].lower(), argument

    def _reply(self, to, body):
        '''
        Enqueues a message with the text @body addressed to @to.
        '''
        self._client.enqueue(make_msg(to=to, body=body))

    def _save_clients(self):
        '''
        Writes the current subscriptions to the client store on disk.
        '''
        with open(self._client_store, 'w') as clients_file:
            clients_file.write(json.dumps(self._warn_clients))

    def _subscribe(self, landkreis, jid):
        '''
        Subscribes @jid to @landkreis and persists the change.
        Returns False (and changes nothing) if it is already subscribed,
        so that nobody ends up in the list twice.
        '''
        subscribers = self._warn_clients.setdefault(landkreis, [])
        if jid in subscribers:
            return False

        subscribers.append(jid)
        self._save_clients()
        return True

    def _unsubscribe(self, landkreis, jid):
        '''
        Removes the subscription of @jid to @landkreis and persists the
        change. Returns False if there was no such subscription.
        '''
        subscribers = self._warn_clients.get(landkreis, [])
        if jid not in subscribers:
            return False

        remaining = [x for x in subscribers if x != jid]
        if remaining:
            self._warn_clients[landkreis] = remaining
        else:
            # Don't keep empty lists around
            del self._warn_clients[landkreis]

        # Without this the subscription would be back after a restart
        self._save_clients()
        return True

    def _recipients_for(self, warning):
        '''
        Returns the JIDs subscribed to any Landkreis of @warning, each
        of them only once and in order of first appearance.
        '''
        recipients = []
        for landkreis in warning.landkreise:
            for jid in self._warn_clients.get(landkreis, []):
                if jid not in recipients:
                    recipients.append(jid)
        return recipients

    def _cmd_subscribe(self, msg, landkreis):
        '''
        Handles "subscribe <Landkreis>" for the sender of @msg.
        '''
        # Do we have a landkreis?
        if not landkreis:
            self._reply(msg.from_, 'Du hast keinen Landkreis angegeben')
            return

        # Check if the entered Landkreis is valid
        if landkreis not in self._channels:
            self._reply(msg.from_, 'Der angegebene Landkreis ist ungültig')
            return

        self._subscribe(landkreis, str(msg.from_.bare()))
        self._reply(msg.from_,
                    f'Du erhälst nun Nachrichten zu {landkreis} von mir')

        # Send all known warnings for the landkreis to the user
        for warning in self._warnings0:
            if landkreis in warning.landkreise:
                self._reply(msg.from_, format_warning(warning))

    def _cmd_unsubscribe(self, msg, landkreis):
        '''
        Handles "unsubscribe <Landkreis>" for the sender of @msg.
        '''
        # Do we have a landkreis?
        if not landkreis:
            self._reply(msg.from_, 'Du hast keinen Landkreis angegeben')
            return

        if self._unsubscribe(landkreis, str(msg.from_.bare())):
            self._reply(
                msg.from_,
                f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir')
        else:
            self._reply(msg.from_, f'Du hast {landkreis} nicht abonniert')

    def _handle_message(self, msg):
        '''
        Callback for incoming chat messages: acknowledges the message
        and executes the command it contains.
        '''
        # Handle cases we don't want to deal with
        if not self.__is_message_valid(msg):
            return

        # Send a deliverability receipt
        receipt = aioxmpp.mdr.compose_receipt(msg)
        self._client.enqueue(receipt)

        cmd, argument = self._parse_command(str(msg.body.any()))

        if cmd == 'subscribe':
            self._cmd_subscribe(msg, argument)
        elif cmd == 'unsubscribe':
            self._cmd_unsubscribe(msg, argument)
        elif cmd == 'help':
            self._reply(msg.from_, self._HELP_TEXT)
        else:
            self._reply(
                msg.from_,
                'Diesen Befehl kenne ich nicht... Mit "help" kannst du alle Befehle sehen, die ich kenne.')

    async def _periodic_requests(self):
        '''
        "Executes" every self._refresh_timeout seconds to fetch all
        configured warnings and send them to the users, if there are
        any new ones. The IDs of the fetched warnings are written to
        the warnings file after every run.
        '''
        while True:
            log.debug('Refreshing warning list')
            await self._fetch_warnings()

            # Flush the warnings to disk
            ids = [x.id for x in self._warnings0]
            with open(self._warnings_file, 'w') as warnings_file:
                warnings_file.write(json.dumps(ids))

            await asyncio.sleep(self._refresh_timeout)

    async def _fetch_warnings(self):
        '''
        Fetches all warnings and tries to find new ones
        to send notifications.

        A source that cannot be reached or does not return valid JSON
        is logged and skipped. Every new warning is sent at most once
        to each subscriber of one of its Landkreise.
        '''
        self._warnings1 = self._warnings0
        self._warnings0 = []
        for source in self._sources:
            try:
                req = requests.get(source, timeout=self._HTTP_TIMEOUT)
                self._warnings0 += parse_data(req.text)
            except requests.exceptions.RequestException as err:
                # requests wraps connection errors, retries and timeouts
                # in subclasses of RequestException
                log.warning('Request to %s failed: %s', source, err)
            except ValueError as err:
                # json.loads could not decode the response
                log.warning('Invalid response from %s: %s', source, err)

        # Find new warnings and send the new ones
        seen_ids = {x.id for x in self._warnings1}
        for warning in self._warnings0:
            if warning.id in seen_ids:
                continue
            # The same warning may be delivered by more than one source
            seen_ids.add(warning.id)

            # A warning can apply to more than one Landkreis, so the
            # recipients are collected (without duplicates) before
            # anything is sent. This also keeps us from iterating the
            # subscriber lists while awaiting.
            recipients = self._recipients_for(warning)
            if not recipients:
                continue

            body = format_warning(warning)
            for to in recipients:
                await self._client.send(make_msg(
                    aioxmpp.JID.fromstr(to),
                    body))

    def _load_config(self, config_file):
        '''
        Reads @config_file and loads the persisted data (subscribed
        clients, known warning IDs and the list of valid Landkreise)
        from the data directory. Missing files are created or, in case
        of the Landkreise, requested from the remote source.
        '''
        # Load config
        config = configparser.ConfigParser()
        config.read(config_file)
        general = config['General']

        # Configure sources
        self._sources = sources_from_config(config)
        self._data_dir = general.get('DataDir', '/etc/janine/data')
        self._refresh_timeout = general.getint(
            'Timeout', fallback=self._refresh_timeout)
        # Compare case-insensitively so that e.g. "true" does not
        # silently disable the restriction
        same_domain = general.get('SameDomain', 'True')
        self._same_domain = same_domain.strip().lower() in ('true', '1', 'yes', 'on')

        # Everything below lives in the data directory
        os.makedirs(self._data_dir, exist_ok=True)

        # Persistent data
        ## Subscribed clients
        self._client_store = os.path.join(self._data_dir, 'clients.json')
        if os.path.exists(self._client_store):
            with open(self._client_store, 'r') as clients_file:
                self._warn_clients = json.loads(clients_file.read())
            log.info('Clients read from disk')
        else:
            with open(self._client_store, 'w') as clients_file:
                clients_file.write('{}')
            os.chmod(self._client_store, S_IRUSR | S_IWUSR)
            log.info('Clients file created with 0600')

        ## Warnings
        self._warnings_file = os.path.join(self._data_dir, 'warnings.json')
        self._warning_store = self._warnings_file
        if os.path.exists(self._warnings_file):
            with open(self._warnings_file, 'r') as warnings_file:
                # Only the IDs are stored, so the warnings are stubs
                self._warnings0 = [stub_warning(id_)
                                   for id_ in json.loads(warnings_file.read())]
            log.info('Warnings read from disk')

        ## Landkreise
        self._channels = []
        self._channel_file = os.path.join(self._data_dir, 'channels.json')
        if not os.path.exists(self._channel_file):
            log.info('Requesting search channels')
            req = requests.get(MiscDataSources.channels(),
                               timeout=self._HTTP_TIMEOUT)
            channels = json.loads(req.text)
            self._channels = [channels[key].get('NAME', '') for key in channels]

            # Caching is best effort: the bot works without the file
            try:
                with open(self._channel_file, 'w') as channel_file:
                    channel_file.write(json.dumps(self._channels))
            except OSError as err:
                log.error('Failed to cache channel data:')
                log.error(str(err))
        else:
            with open(self._channel_file, 'r') as channel_file:
                self._channels = json.loads(channel_file.read())
            log.info('Search channels read from disk')

        # Bot Config
        self._jid = aioxmpp.JID.fromstr(config['Bot']['JID'])
        self._password = config['Bot']['Password']
        self._avatar = config['Bot'].get('Avatar', None)
        self._status = config['Bot'].get('Status', 'Warnt dich vor Katastrophen')
 | |
| 
 | |
def main():
    '''
    Main function.

    Parses the command line, creates the bot and runs it until its
    connect() coroutine finishes. The optional flag -d/--debug raises
    the log level to DEBUG; the first positional argument is the path
    of the configuration file, defaulting to /etc/janine/janine.conf.
    '''
    parser = OptionParser()
    parser.add_option('-d', '--debug', action='store_true', dest='debug', default=False)
    (options, args) = parser.parse_args()

    if options.debug:
        log.setLevel(logging.DEBUG)

    config_file = args[0] if args else '/etc/janine/janine.conf'

    bot = WarningBot(config_file)

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when no loop is running. Close it even if connect() raises.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(bot.connect())
    finally:
        loop.close()


if __name__ == '__main__':
    main()
 |