185 lines
6.2 KiB
Python
185 lines
6.2 KiB
Python
import sys
|
|
import json
|
|
import datetime
|
|
import configparser
|
|
import threading
|
|
import logging
|
|
import signal
|
|
import asyncio
|
|
from collections import namedtuple
|
|
|
|
import aioxmpp
|
|
from aioxmpp.structs import PresenceShow
|
|
|
|
import requests
|
|
|
|
from janine.utils import find_one, find_all
|
|
from janine.sources import sources_from_config
|
|
|
|
# Logger shared by the whole module.
log = logging.getLogger('janine')

# Immutable record describing one warning, as produced by to_warning().
# NOTE: the name shadows the builtin ``Warning`` inside this module; it is
# kept because other code refers to it under this name.
Warning = namedtuple(
    'Warning',
    (
        'id',              # taken from the item's 'identifier'
        'sent',            # taken from the item's 'sent'
        'effective_from',  # info 'effective', or 'N/A'
        'expires',         # info 'expires', or 'N/A'
        'urgency',         # info 'urgency'
        'sender',          # info 'senderName', or 'N/A'
        'headline',        # info 'headline'
        'description',     # info 'description'
        'instruction',     # info 'instruction', or 'N/A'
    ),
)
|
|
|
|
def to_warning(data):
    '''
    Builds a Warning from one raw warning item.

    The descriptive fields come from the entry of data['info'] that
    find_one() selects with the "has a 'headline' key" predicate.
    'identifier', 'sent', 'urgency', 'headline' and 'description' are
    mandatory (a missing key raises KeyError); every other field falls
    back to the string 'N/A'.
    '''
    details = find_one(lambda entry: 'headline' in entry.keys(),
                       data['info'])

    def optional(key):
        # Not every item carries these fields
        return details.get(key, 'N/A')

    return Warning(
        id=data['identifier'],
        sent=data['sent'],
        effective_from=optional('effective'),
        expires=optional('expires'),
        urgency=details['urgency'],
        sender=optional('senderName'),
        headline=details['headline'],
        description=details['description'],
        instruction=optional('instruction'),
    )
|
|
|
|
def landkreis_filter(kreis, item):
    '''
    Tells whether @item concerns the Landkreis @kreis.

    Looks up the info entry that has an 'area', then the area entry that
    has a 'geocode', and returns True when @kreis is among the
    'valueName' values of that geocode list.
    '''
    area_info = find_one(lambda entry: 'area' in entry.keys(), item['info'])
    area = find_one(lambda entry: 'geocode' in entry.keys(),
                    area_info['area'])

    # A single item may cover several Landkreise, so collect all names
    names = [code['valueName'] for code in area['geocode']]
    return kreis in names
|
|
|
|
def parse_data(text, filter_):
    '''
    Decodes the JSON document @text and converts every item that
    find_all() selects with @filter_ into a Warning.

    Returns the warnings as a list, in the order find_all() yields them.
    '''
    warnings = []
    for raw in find_all(filter_, json.loads(text)):
        warnings.append(to_warning(raw))
    return warnings
|
|
|
|
class WarningBot:
    '''
    XMPP bot that periodically polls the configured warning sources and
    sends every warning that was not part of the previous poll to the
    configured recipients.
    '''

    # Upper bound in seconds for a single HTTP request to a source, so a
    # hanging server cannot stall the refresh cycle forever.
    _REQUEST_TIMEOUT = 30

    def __init__(self):
        # Warnings of the most recent poll and of the poll before it
        self._warnings0 = []
        self._warnings1 = []
        self._client = None
        # Seconds between two polls (630 s = 10.5 min). This is only a
        # placeholder: _load_config() always replaces it with the
        # General/Timeout setting.
        self._refresh_timeout = 630

        self._load_config()

    async def connect(self):
        '''
        Starts the "event loop" of the bot: connects to the XMPP server,
        optionally publishes the avatar, sets the presence and then polls
        the sources until the coroutine is cancelled or fails.
        '''
        self._client = aioxmpp.PresenceManagedClient(
            self._jid,
            aioxmpp.make_security_layer(self._password))

        # The avatar service has to be summoned on the client; do it
        # before the stream is established.
        avatar_service = None
        if self._avatar:
            avatar_service = self._client.summon(
                aioxmpp.avatar.AvatarService)

        async with self._client.connected():
            log.info('Client connected to server')

            # In case you want a nice avatar
            if avatar_service is not None:
                log.info('Setting avatar')
                with open(self._avatar, 'rb') as f:
                    image_data = f.read()

                avatar_set = aioxmpp.avatar.AvatarSet()
                avatar_set.add_avatar_image('image/png',
                                            image_bytes=image_data)
                await avatar_service.publish_avatar_set(avatar_set)

            # Set some presence information
            self._client.set_presence(
                aioxmpp.PresenceState(available=True,
                                      show=PresenceShow.DND),
                'Gibt dir im Notfall Bescheid')

            # Start our fetch-send loop
            # NOTE: Originally, a cronjob and signal.signal(...) were
            #       planned for this, but async cannot be used in
            #       signal handlers.
            log.info('Periodic ticker started')
            await self._periodic_requests()

    async def _periodic_requests(self):
        '''
        "Executes" every self._refresh_timeout seconds to fetch all
        configured warnings and send them to the users, if there are
        any new ones. Never returns on its own.
        '''
        while True:
            log.debug('Refreshing warning list')
            await self._fetch_warnings()
            await asyncio.sleep(self._refresh_timeout)

    def _filter_func(self):
        '''
        Returns a one-argument predicate that keeps only the items
        relevant to the configured Landkreis.
        '''
        return lambda item: landkreis_filter(self._landkreis, item)

    def _download(self, source):
        '''
        Blocking helper: fetches @source and returns the response text.

        Raises requests.RequestException on connection problems, on a
        timeout and on HTTP error status codes.
        '''
        response = requests.get(source, timeout=self._REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.text

    @staticmethod
    def _new_warnings(warnings, known_ids):
        '''
        Returns the warnings whose id is not in @known_ids, in their
        original order. A warning id that occurs several times (e.g.
        reported by two sources) is returned only once.
        '''
        # A real set, so that the membership test can be repeated; a
        # one-shot iterator would be exhausted by the first lookups.
        seen = set(known_ids)
        fresh = []
        for warning in warnings:
            if warning.id in seen:
                continue
            seen.add(warning.id)
            fresh.append(warning)
        return fresh

    async def _fetch_warnings(self):
        '''
        Fetches the warnings of all sources and sends a notification
        for every warning that was not part of the previous poll.

        A source that cannot be fetched or decoded is logged and
        skipped; the other sources are still processed.
        '''
        loop = asyncio.get_event_loop()
        previous = self._warnings0
        fetched = []
        any_failed = False

        for source in self._sources:
            try:
                # requests is blocking, so run it in the default
                # executor to keep the XMPP connection responsive.
                text = await loop.run_in_executor(
                    None, self._download, source)
                # Accumulate: every source contributes warnings
                fetched.extend(parse_data(text, self._filter_func()))
            except (requests.RequestException, ValueError) as err:
                # ValueError covers invalid JSON in the response
                log.error('Could not refresh warnings from %s: %s',
                          source, err)
                any_failed = True

        fresh = self._new_warnings(fetched, (w.id for w in previous))

        if any_failed:
            # Keep remembering the warnings we already knew about, so a
            # temporary outage does not cause them to be sent again on
            # the next successful poll.
            fetched_ids = {w.id for w in fetched}
            fetched.extend(w for w in previous
                           if w.id not in fetched_ids)

        self._warnings1 = previous
        self._warnings0 = fetched

        for warning in fresh:
            await self._send_notification(warning)

    async def _send_notification(self, warning):
        '''
        Sends @warning (headline and description) as a chat message to
        all the recipients.
        '''
        body = '*{}*\n\n{}'.format(warning.headline, warning.description)
        # The text may contain HTML line breaks; turn them into newlines
        body = body.replace('<br>', '\n')

        for recipient in self._recipients:
            msg = aioxmpp.stanza.Message(
                to=aioxmpp.JID.fromstr(recipient),
                type_=aioxmpp.MessageType.CHAT)
            msg.body[None] = body
            await self._client.send(msg)

    @staticmethod
    def _parse_recipients(raw):
        '''
        Splits the comma separated recipient list @raw, strips the
        whitespace around every entry and drops empty entries.
        '''
        stripped = (entry.strip() for entry in raw.split(','))
        return [entry for entry in stripped if entry]

    def _load_config(self):
        '''
        Reads the configuration file and stores its settings on the bot.

        The path is taken from the single command line argument, or is
        /etc/janine/janine.conf when not exactly one argument is given.
        Raises FileNotFoundError when the file cannot be read and
        KeyError when a mandatory setting is missing.
        '''
        # Load config
        if len(sys.argv) == 2:
            config_path = sys.argv[1]
        else:
            config_path = '/etc/janine/janine.conf'
        config = configparser.ConfigParser()
        # ConfigParser.read() silently skips unreadable files; fail
        # with a clear message instead of a KeyError further down.
        if not config.read(config_path):
            raise FileNotFoundError(
                'Could not read config file: {}'.format(config_path))

        # Configure sources
        self._sources = sources_from_config(config)
        general = config['General']
        self._landkreis = general['Landkreis']
        self._recipients = self._parse_recipients(general['Recipients'])
        self._refresh_timeout = int(general['Timeout'])

        # Bot Config
        bot = config['Bot']
        self._jid = aioxmpp.JID.fromstr(bot['JID'])
        self._password = bot['Password']
        self._avatar = bot.get('Avatar', None)
|
|
|
|
def main():
    '''
    Creates the bot (which loads its configuration) and runs it on the
    asyncio event loop until connect() finishes or raises.
    '''
    bot = WarningBot()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(bot.connect())
    finally:
        # Close the loop even when the bot fails or is interrupted
        loop.close()


if __name__ == '__main__':
    main()
|