meta: Initial commit

This commit is contained in:
2020-08-22 15:09:59 +02:00
commit e7a3743a84
9 changed files with 995 additions and 0 deletions

0
janine/__init__.py Normal file
View File

184
janine/janine.py Normal file
View File

@@ -0,0 +1,184 @@
import sys
import json
import datetime
import configparser
import threading
import logging
import signal
import asyncio
from collections import namedtuple
import aioxmpp
from aioxmpp.structs import PresenceShow
import requests
from janine.utils import find_one, find_all, dict_get_fallback
from janine.sources import sources_from_config
log = logging.getLogger('janine')
Warning = namedtuple('Warning', ['id',
'sent',
'effective_from',
'expires',
'urgency',
'sender',
'headline',
'description',
'instruction'])
def to_warning(data):
'''
Returns a Warning given the raw data
'''
info = find_one(lambda x: 'headline' in x.keys(), data['info'])
return Warning(id=data['identifier'],
sent=data['sent'],
# Not all items have to have those
effective_from=dict_get_fallback(info, 'effective', 'N/A'),
# Not all items have to have those
expires=dict_get_fallback(info, 'expires', 'N/A'),
urgency=info['urgency'],
# Not all items have to have those
sender=dict_get_fallback(info, 'senderName', 'N/A'),
headline=info['headline'],
description=info['description'],
instruction=dict_get_fallback(info, 'instruction', 'N/A'))
def landkreis_filter(kreis, item):
'''
Returns True when the item is relevant to the Landkreis @kreis
'''
info = find_one(lambda e: 'area' in e.keys(), item['info'])
geocode = find_one(lambda e: 'geocode' in e.keys(), info['area'])
# Note: Some items may have multiple Landkreise
values = list(map(lambda e: e['valueName'], geocode['geocode']))
return kreis in values
def parse_data(text, filter_):
'''
Reads the remote response, parses it and returns a list of warnings.
'''
data = json.loads(text)
return [to_warning(raw) for raw in find_all(filter_, data)]
class WarningBot:
def __init__(self):
self._warnings0 = []
self._warnings1 = []
self._client = None
self._refresh_timeout = 630 # 15min
self._load_config()
async def connect(self):
'''
Starts the "event loop" of the bot
'''
self._client = aioxmpp.PresenceManagedClient(
self._jid,
aioxmpp.make_security_layer(self._password))
async with self._client.connected() as stream:
logging.info('Client connected to server')
# In case you want a nice avatar
if self._avatar:
log.info('Setting avatar')
with open(self._avatar, 'rb') as f:
image_data = f.read()
avatar_set = aioxmpp.avatar.AvatarSet()
avatar_set.add_avatar_image('image/png', image_bytes=image_data)
await selg.avatar.publish_avatar_set(avatar_set)
# Set some presence information
self._client.set_presence(
aioxmpp.PresenceState(available=True,
show=PresenceShow.DND),
'Gibt dir im Notfall Bescheid')
# Start our fetch-send loop
# NOTE: Originally, I wanted to use a cronjob and
# signal.signal(...) for this but you can't
# use async in event handlers
loop = asyncio.get_event_loop()
periodic = loop.create_task(self._periodic_requests())
logging.info('Periodic ticker started')
await periodic
async def _periodic_requests(self):
'''
"Executes" every self._refresh_timeout seconds to fetch all
configured warnings and send them to the users, if there are
any new ones.
'''
while True:
logging.debug('Refreshing warning list')
await self._fetch_warnings()
await asyncio.sleep(self._refresh_timeout)
def _filter_func(self):
return lambda item: landkreis_filter(self._landkreis, item)
async def _fetch_warnings(self):
'''
Fetches all warnings and tries to find new ones
to send notifications.
'''
self._warnings1 = self._warnings0
self._warnings0 = []
for source in self._sources:
req = requests.get(source)
self._warnings0 = parse_data(req.text, self._filter_func())
# Find new warnings and send the new ones
ids = map(lambda x: x.id, self._warnings1)
for warning in self._warnings0:
if warning.id in ids:
continue
await self._send_notification(warning)
async def _send_notification(self, warning):
'''
Send a warning to all the recipients
'''
body = '*{}*\n\n{}'.format(warning.headline, warning.description)
# Smells like script injection, but okay
body = body.replace('<br>', '\n')
for recipient in self._recipients:
msg = aioxmpp.stanza.Message(
to=aioxmpp.JID.fromstr(recipient),
type_=aioxmpp.MessageType.CHAT)
msg.body[None] = body
await self._client.send(msg)
def _load_config(self):
# Load config
config_path = sys.argv[1] if len(sys.argv) == 2 else '/etc/janine/janine.conf'
config = configparser.ConfigParser()
config.read(config_path)
# Configure sources
self._sources = sources_from_config(config)
self._landkreis = config['General']['Landkreis']
self._recipients = config['General']['Recipients'].split(',')
self._refresh_timeout = int(config['General']['Timeout'])
# Bot Config
self._jid = aioxmpp.JID.fromstr(config['Bot']['JID'])
self._password = config['Bot']['Password']
self._avatar = dict_get_fallback(config['Bot'], 'Avatar', None)
def main():
bot = WarningBot()
loop = asyncio.get_event_loop()
loop.run_until_complete(bot.connect())
loop.close()
if __name__ == '__main__':
main()

41
janine/sources.py Normal file
View File

@@ -0,0 +1,41 @@
from janine.utils import dict_get_fallback
class WarningSources:
'''
A collection of sources of the BBK
'''
@staticmethod
def bbk_dwd():
return 'https://warnung.bund.de/bbk.dwd/unwetter.json'
@staticmethod
def bbk_mowas():
return 'https://warnung.bund.de/bbk.mowas/gefahrendurchsagen.json'
@staticmethod
def bbk_biwapp():
return 'https://warnung.bund.de/bbk.biwapp/warnmeldungen.json'
@staticmethod
def bbk_ihp():
return 'https://warnung.bund.de/bbk.lhp/hochwassermeldungen.json'
@staticmethod
def source_by_name(name):
return {
'IHP': WarningSources.bbk_ihp(),
'DWD': WarningSources.bbk_dwd(),
'MOWAS': WarningSources.bbk_mowas(),
'BIWAPP': WarningSources.bbk_biwapp()
}[name]
def sources_from_config(config):
sources = []
for module in ('IHP', 'DWD', 'BIWAPP', 'MOWAS'):
option = dict_get_fallback(config['General'],
module,
'n')
if option == 'y':
sources.append(WarningSources.source_by_name(module))
return sources

27
janine/utils.py Normal file
View File

@@ -0,0 +1,27 @@
def find_one(func, array):
'''
Utility function
Return the first element in array for which func returns True.
'''
for e in array:
if func(e):
return e
return None
def find_all(func, array):
'''
Utility function
Return all elements in array for which func returns True.
'''
return [e for e in array if func(e)]
def dict_get_fallback(d, key, fallback):
'''
Utility function
Returns d[key] if key exists. Else, return fallback
'''
return d[key] if key in d.keys() else fallback