feat: Allow subscribtion to Landkreise

This commit is contained in:
PapaTutuWawa 2020-09-17 15:32:12 +02:00
parent e130de8f92
commit 1b36a13993
2 changed files with 139 additions and 40 deletions

View File

@ -3,7 +3,6 @@ IHP = y # Hochwasserwarnungen
DWD = y # Unwetterwarnungen
MOWAS = y # Gefahrendurchsagen
BIWAPP = y # Warnmeldungen
Landkreis = Stuttgart # Warnungen für diesen Landkreis weiterleiten
Recipients = some.user@xmpp.server,other@xmpp.server # Empfänger
Timeout=630 # Zeit in Sekunden nach welcher nach neuen Warnungen geschaut wird
DataDir = /etc/janine/data # Verzeichnis für persistente Daten

View File

@ -28,7 +28,8 @@ Warning = namedtuple('Warning', ['id',
'sender',
'headline',
'description',
'instruction'])
'instruction',
'landkreise'])
def to_warning(data):
'''
@ -46,31 +47,32 @@ def to_warning(data):
sender=info.get('senderName', 'N/A'),
headline=info['headline'],
description=info['description'],
instruction=info.get('instruction', 'N/A'))
instruction=info.get('instruction', 'N/A'),
landkreise=get_landkreise(data))
def landkreis_filter(kreis, item):
def get_landkreise(data):
'''
Returns True when the item is relevant to the Landkreis @kreis
Returns the list of Landkreise relevant to the warning in @data
'''
info = find_one(lambda e: 'area' in e.keys(), item['info'])
info = find_one(lambda e: 'area' in e.keys(), data['info'])
geocode = find_one(lambda e: 'geocode' in e.keys(), info['area'])
# Note: Some items may have multiple Landkreise
values = list(map(lambda e: e['valueName'], geocode['geocode']))
return kreis in values
return list(map(lambda e: e['valueName'], geocode['geocode']))
def parse_data(text, filter_):
def parse_data(text):
'''
Reads the remote response, parses it and returns a list of warnings.
'''
data = json.loads(text)
return [to_warning(raw) for raw in find_all(filter_, data)]
return [to_warning(el) for el in data]
class WarningBot:
def __init__(self):
self._warnings0 = []
self._warnings1 = []
self._client = None
self._warn_clients = {}
self._refresh_timeout = 630 # 15min
# Configuration stuff
@ -87,30 +89,25 @@ class WarningBot:
aioxmpp.make_security_layer(self._password))
# First we check if the entered Landkreis is valid
self._channels = []
channels = {}
channel_file = os.path.join(self._data_dir, 'channels.json')
if not os.path.exists(channel_file):
log.debug('Requesting search channels')
req = requests.get(MiscDataSources.channels())
channels = json.loads(req.text)
self._channels = list(map(lambda key: channels[key].get('NAME', ''),
channels.keys()))
try:
with open(channel_file, 'w') as f:
f.write(json.dumps(channels))
f.write(json.dumps(self._channels))
except Exception as err:
log.error('Failed to cache channel data:')
log.error(str(err))
else:
with open(channel_file, 'r') as f:
channels = json.loads(f.read())
for key in channels.keys():
if channels[key].get('NAME', '') == self._landkreis:
log.info('Landkreis valid')
break
else:
log.error('Invalid Landkreis')
return
self._channels = json.loads(f.read())
async with self._client.connected() as stream:
logging.info('Client connected to server')
@ -130,7 +127,14 @@ class WarningBot:
aioxmpp.PresenceState(available=True,
show=PresenceShow.DND),
self._status)
# Register the message handler
self._client.stream.register_message_callback(
aioxmpp.MessageType.CHAT,
None,
self._handle_message)
logging.info('Message handler registered')
# Start our fetch-send loop
# NOTE: Originally, I wanted to use a cronjob and
# signal.signal(...) for this but you can't
@ -140,6 +144,100 @@ class WarningBot:
logging.info('Periodic ticker started')
await periodic
def __is_message_valid(self, msg):
if msg.type_ != aioxmpp.MessageType.CHAT:
return False
if not msg.body:
return False
# TODO: Make this configurable
if msg.from_.domain != self._jid.domain:
return False
return True
def __make_msg(self, to, body):
msg = aioxmpp.Message(
type_=aioxmpp.MessageType.CHAT,
to=to)
msg.body[None] = body
return msg
def _handle_message(self, msg):
# Handle cases we don't want to deal with
if not self.__is_message_valid(msg):
return
cmd_parts = str(msg.body.any()).split(' ')
cmd = cmd_parts[0].lower()
if cmd == 'subscribe':
# Do we have a landkreis?
if len(cmd_parts) < 2:
self._client.enqueue(self.__make_msg(
to=msg.from_,
body='Du hast keinen Landkreis angegeben'))
return
# Check if the entered Landkreis is valid
landkreis = ' '.join(cmd_parts[1:])
if not landkreis in self._channels:
self._client.enqueue(self.__make_msg(
to=msg.from_,
body='Der angegebene Landkreis ist ungültig'))
return
if not landkreis in self._warn_clients.keys():
self._warn_clients[landkreis] = []
self._warn_clients[landkreis].append(str(msg.from_))
# TODO: Flush self._warn_clients to disk
self._client.enqueue(self.__make_msg(
to=msg.from_,
body=f'Du erhälst nun Nachrichten zu {landkreis} von mir'))
# Send all known warnings for the landkreis to the user
for warning in self._warnings0:
if landkreis in warning.landkreise:
body = self._format_warning(warning)
self._client.enqueue(self.__make_msg(
to=msg.from_,
body=body))
elif cmd == 'unsubscribe':
# Do we have a landkreis?
if len(cmd_parts) < 2:
self._client.enqueue(self.__make_msg(
to=msg.from_,
body='Du hast keinen Landkreis angegeben'))
return
landkreis = ' '.join(cmd_parts[1:])
if not landreis in self._warn_clients.keys():
self._client.enqueue(self.__make_msg(
to=msg.from_,
body=f'Du hast {landkreis} nicht abonniert'))
return
if str(msg.from_) in self._warn_clients[landkreis]:
filter_ = lambda x: x != str(msg.from_)
self._warn_clients[landkreis] = list(filter(filter_,
self._warn_clients[landkreis]))
self._client.enqueue(self.__make_msg(
to=msg.from_,
body=f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir'))
else:
self._client.enqueue(self.__make_msg(
to=msg.from_,
body=f'Du hast {landkreis} nicht abonniert'))
elif cmd == 'help':
body = 'Verfügbare Befehle:\n\nsubscribe <Landkreis> - Abonniere einen Landkreis\nunsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis\nhelp - Gebe diese Hilfe aus'
self._client.enqueue(self.__make_msg(
to=msg.from_,
body=body))
async def _periodic_requests(self):
'''
"Executes" every self._refresh_timeout seconds to fetch all
@ -151,9 +249,6 @@ class WarningBot:
await self._fetch_warnings()
await asyncio.sleep(self._refresh_timeout)
def _filter_func(self):
return lambda item: landkreis_filter(self._landkreis, item)
async def _fetch_warnings(self):
'''
Fetches all warnings and tries to find new ones
@ -163,15 +258,23 @@ class WarningBot:
self._warnings0 = []
for source in self._sources:
req = requests.get(source)
self._warnings0 = parse_data(req.text, self._filter_func())
self._warnings0 = parse_data(req.text)
# Find new warnings and send the new ones
ids = map(lambda x: x.id, self._warnings1)
for warning in self._warnings0:
if warning.id in ids:
continue
await self._send_notification(warning)
if len(set(warning.landkreise).intersection(self._warn_clients.keys())):
body = self._format_warning(warning)
for landkreis in warning.landkreise:
for to in self._warn_clients.get(landkreis, []):
msg = aioxmpp.stanza.Message(
to=aioxmpp.JID.fromstr(to),
type_=aioxmpp.MessageType.CHAT)
msg.body[None] = body
await self._client.send(msg)
def __time_format(self, time_str):
'''
@ -193,27 +296,19 @@ class WarningBot:
date.hour,
date.minute)
async def _send_notification(self, warning):
def _format_warning(self, warning):
'''
Send a warning to all the recipients
'''
# Reformat the message a bit
effective_time = self.__time_format(warning.effective_from)
expiry_time = self.__time_format(warning.expires)
body = '*{}*\n({} bis {})\n\n{}'.format(warning.headline,
effective_time,
expiry_time,
warning.description)
body = f'*{warning.headline}*\n({effective_time} bis {expiry_time})\n\n{warning.description}'
# Smells like script injection, but okay
body = body.replace('<br>', '\n')
body = body.replace('<br/>', '\n')
for recipient in self._recipients:
msg = aioxmpp.stanza.Message(
to=aioxmpp.JID.fromstr(recipient),
type_=aioxmpp.MessageType.CHAT)
msg.body[None] = body
await self._client.send(msg)
return body
def _load_config(self):
# Load config
@ -224,9 +319,14 @@ class WarningBot:
# Configure sources
self._sources = sources_from_config(config)
self._data_dir = config['General'].get('DataDir', '/etc/janine')
self._landkreis = config['General']['Landkreis']
self._recipients = config['General']['Recipients'].split(',')
self._refresh_timeout = int(config['General']['Timeout'])
# Warning data
client_file = os.path.join(self._data_dir, 'clients.json')
if os.path.exists(client_file):
with open(client_file, 'r') as cf:
self._warn_clients = json.loads(cf.read())
# Bot Config
self._jid = aioxmpp.JID.fromstr(config['Bot']['JID'])