JANINE/janine/janine.py

363 lines
13 KiB
Python

import os
import sys
import json
import datetime
import configparser
import threading
import logging
import signal
import asyncio
from collections import namedtuple
import aioxmpp
from aioxmpp.structs import PresenceShow
import requests
from janine.utils import find_one, find_all
from janine.sources import sources_from_config, MiscDataSources
# Logger used by the code in this module.
log = logging.getLogger('janine')
# NOTE(review): Logger objects have no basicConfig(); enabling the line below
# as written would raise AttributeError -- logging.basicConfig(...) is the
# module-level function that configures the root logger.
#log.basicConfig(level=logging.INFO)
# Immutable record describing a single warning; to_warning() fills it in.
# NOTE: inside this module the name shadows the builtin Warning class.
Warning = namedtuple(
    'Warning',
    (
        'id',              # identifier of the raw entry
        'sent',            # time the entry was sent
        'effective_from',  # start of validity, 'N/A' if missing
        'expires',         # end of validity, 'N/A' if missing
        'urgency',
        'sender',          # sender name, 'N/A' if missing
        'headline',
        'description',
        'instruction',     # 'N/A' if missing
        'landkreise',      # list of Landkreis names, see get_landkreise()
    ),
)
def to_warning(data):
    '''
    Builds a Warning from one raw entry of the remote data.

    The details are taken from the element of data['info'] that find_one()
    selects for containing a 'headline' key. Fields that not every entry
    carries ('effective', 'expires', 'senderName', 'instruction') fall
    back to 'N/A'; 'urgency', 'headline' and 'description' are required.
    '''
    details = find_one(lambda entry: 'headline' in entry.keys(), data['info'])

    identifier = data['identifier']
    sent = data['sent']
    # Optional fields are looked up with a placeholder default
    effective_from = details.get('effective', 'N/A')
    expires = details.get('expires', 'N/A')
    urgency = details['urgency']
    sender = details.get('senderName', 'N/A')
    headline = details['headline']
    description = details['description']
    instruction = details.get('instruction', 'N/A')
    landkreise = get_landkreise(data)

    return Warning(identifier,
                   sent,
                   effective_from,
                   expires,
                   urgency,
                   sender,
                   headline,
                   description,
                   instruction,
                   landkreise)
def get_landkreise(data):
    '''
    Collects the names of all Landkreise the warning in @data applies to.

    Returns a list with the 'valueName' of every geocode entry; a single
    warning may cover more than one Landkreis.
    '''
    def has_area(entry):
        return 'area' in entry.keys()

    def has_geocode(entry):
        return 'geocode' in entry.keys()

    info_with_area = find_one(has_area, data['info'])
    area_with_geocode = find_one(has_geocode, info_with_area['area'])
    return [code['valueName'] for code in area_with_geocode['geocode']]
def parse_data(text):
    '''
    Decodes the JSON document in @text and converts each of its elements
    into a Warning. Returns the warnings as a list.
    '''
    raw_entries = json.loads(text)
    warnings = []
    for raw_entry in raw_entries:
        warnings.append(to_warning(raw_entry))
    return warnings
class WarningBot:
    '''
    XMPP bot that relays warnings to subscribed users.

    Users subscribe to a Landkreis via chat message. The bot periodically
    downloads all configured sources and sends every warning that was not
    part of the previous refresh to the subscribers of the affected
    Landkreise.
    '''

    # Seconds to wait for an HTTP response before a request is given up
    _REQUEST_TIMEOUT = 30

    def __init__(self):
        # Warnings of the current (0) and of the previous (1) refresh
        self._warnings0 = []
        self._warnings1 = []
        self._client = None
        # Maps a Landkreis name to the list of bare JIDs subscribed to it
        self._warn_clients = {}
        # Seconds between two refreshes; replaced by _load_config()
        self._refresh_timeout = 630

        # Configuration stuff
        self._data_dir = ''
        self._client_store = ''
        self._warning_store = ''

        self._load_config()

    async def connect(self):
        '''
        Starts the "event loop" of the bot: connects to the server, sets
        avatar and presence, registers the message handler and then runs
        the periodic refresh until it ends.
        '''
        self._client = aioxmpp.PresenceManagedClient(
            self._jid,
            aioxmpp.make_security_layer(self._password))

        # The avatar can only be published through the avatar service of
        # the client, so request it before the connection is established
        avatar_service = None
        if self._avatar:
            avatar_service = self._client.summon(aioxmpp.avatar.AvatarService)

        async with self._client.connected():
            log.info('Client connected to server')

            # In case you want a nice avatar
            if avatar_service is not None:
                log.info('Setting avatar')
                with open(self._avatar, 'rb') as f:
                    image_data = f.read()

                avatar_set = aioxmpp.avatar.AvatarSet()
                avatar_set.add_avatar_image('image/png', image_bytes=image_data)
                await avatar_service.publish_avatar_set(avatar_set)

            # Set some presence information
            self._client.set_presence(
                aioxmpp.PresenceState(available=True,
                                      show=PresenceShow.DND),
                self._status)

            # Register the message handler
            self._client.stream.register_message_callback(
                aioxmpp.MessageType.CHAT,
                None,
                self._handle_message)
            log.info('Message handler registered')

            # Start our fetch-send loop
            # NOTE: Originally, I wanted to use a cronjob and
            #       signal.signal(...) for this but you can't
            #       use async in event handlers
            log.info('Periodic ticker started')
            await self._periodic_requests()

    def __is_message_valid(self, msg):
        '''
        Returns True on messages we want to handle. False otherwise.

        A message is handled if it is a chat message with a body and, when
        SameDomain is enabled, was sent from the domain of the bot's JID.
        '''
        if msg.type_ != aioxmpp.MessageType.CHAT:
            return False

        if not msg.body:
            return False

        if msg.from_.domain != self._jid.domain and self._same_domain:
            return False

        return True

    def __make_msg(self, to, body):
        '''
        Wrapper for creating a message object to enqueue or send.
        '''
        msg = aioxmpp.Message(
            type_=aioxmpp.MessageType.CHAT,
            to=to)
        msg.body[None] = body
        return msg

    def _reply(self, to, body):
        '''
        Enqueues a chat message with the text @body for @to.
        '''
        self._client.enqueue(self.__make_msg(to=to, body=body))

    def _save_clients(self):
        '''
        Writes the current subscriptions to the client store. A failure is
        logged; the subscriptions then only live in memory.
        '''
        try:
            with open(self._client_store, 'w') as cf:
                cf.write(json.dumps(self._warn_clients))
        except OSError as err:
            log.error('Failed to store subscriptions:')
            log.error(str(err))

    def _handle_message(self, msg):
        '''
        Message callback: interprets the commands "subscribe",
        "unsubscribe" and "help" and answers the sender.
        '''
        # Handle cases we don't want to deal with
        if not self.__is_message_valid(msg):
            return

        # split() without arguments tolerates repeated or surrounding
        # whitespace, which would otherwise become part of the Landkreis
        cmd_parts = str(msg.body.any()).split()
        cmd = cmd_parts[0].lower() if cmd_parts else ''
        landkreis = ' '.join(cmd_parts[1:])

        if cmd == 'subscribe':
            self._subscribe(msg.from_, landkreis)
        elif cmd == 'unsubscribe':
            self._unsubscribe(msg.from_, landkreis)
        elif cmd == 'help':
            self._reply(
                msg.from_,
                'Verfügbare Befehle:\n\n'
                'subscribe <Landkreis> - Abonniere einen Landkreis\n'
                'unsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis\n'
                'help - Gebe diese Hilfe aus')
        else:
            self._reply(
                msg.from_,
                'Diesen Befehl kenne ich nicht... Mit "help" kannst du alle Befehle sehen, die ich kenne.')

    def _subscribe(self, sender, landkreis):
        '''
        Subscribes the bare JID of @sender to @landkreis and sends it all
        currently known warnings for that Landkreis.
        '''
        # Do we have a landkreis?
        if not landkreis:
            self._reply(sender, 'Du hast keinen Landkreis angegeben')
            return

        # Check if the entered Landkreis is valid
        if landkreis not in self._channels:
            self._reply(sender, 'Der angegebene Landkreis ist ungültig')
            return

        jid = str(sender.bare())
        subscribers = self._warn_clients.setdefault(landkreis, [])
        # Subscribing twice must not lead to every warning arriving twice
        if jid not in subscribers:
            subscribers.append(jid)
            self._save_clients()

        self._reply(sender, f'Du erhälst nun Nachrichten zu {landkreis} von mir')

        # Send all known warnings for the landkreis to the user
        for warning in self._warnings0:
            if landkreis in warning.landkreise:
                self._reply(sender, self._format_warning(warning))

    def _unsubscribe(self, sender, landkreis):
        '''
        Removes the subscription of the bare JID of @sender to @landkreis.
        '''
        # Do we have a landkreis?
        if not landkreis:
            self._reply(sender, 'Du hast keinen Landkreis angegeben')
            return

        jid = str(sender.bare())
        subscribers = self._warn_clients.get(landkreis, [])
        if jid not in subscribers:
            self._reply(sender, f'Du hast {landkreis} nicht abonniert')
            return

        remaining = [other for other in subscribers if other != jid]
        if remaining:
            self._warn_clients[landkreis] = remaining
        else:
            del self._warn_clients[landkreis]

        # Persist the removal, otherwise the subscription would come back
        # when the client store is read again on the next start
        self._save_clients()
        self._reply(sender,
                    f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir')

    async def _periodic_requests(self):
        '''
        "Executes" every self._refresh_timeout seconds to fetch all
        configured warnings and send them to the users, if there are
        any new ones.
        '''
        while True:
            log.debug('Refreshing warning list')
            await self._fetch_warnings()
            await asyncio.sleep(self._refresh_timeout)

    def _download(self, source):
        '''
        Blocking download of @source. Returns the response text and raises
        requests.RequestException on connection problems, timeouts and
        HTTP error codes.
        '''
        response = requests.get(source, timeout=self._REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.text

    def _recipients_for(self, warning):
        '''
        Returns the JIDs subscribed to at least one Landkreis of @warning.
        Every JID is listed once, even if it is subscribed to several of
        the affected Landkreise.
        '''
        recipients = []
        seen = set()
        for landkreis in warning.landkreise:
            for jid in self._warn_clients.get(landkreis, []):
                if jid not in seen:
                    seen.add(jid)
                    recipients.append(jid)
        return recipients

    async def _fetch_warnings(self):
        '''
        Fetches all warnings and tries to find new ones
        to send notifications.

        If any source cannot be downloaded or parsed, the refresh is
        skipped and the known warnings stay untouched. Continuing with an
        incomplete list would make the missing warnings look new on the
        next successful refresh and send them a second time.
        '''
        loop = asyncio.get_event_loop()
        fetched = []
        for source in self._sources:
            try:
                # requests is blocking; run it in the default executor so
                # that the XMPP connection stays responsive meanwhile
                text = await loop.run_in_executor(None, self._download, source)
                # Collect the warnings of all sources, not just the last one
                fetched.extend(parse_data(text))
            except (requests.RequestException, ValueError, KeyError,
                    TypeError, AttributeError) as err:
                log.error('Failed to refresh warnings from %s: %s', source, err)
                return

        self._warnings1 = self._warnings0
        self._warnings0 = fetched

        # Find new warnings and send the new ones. A set can be queried
        # repeatedly; a map() iterator is used up by the first lookup.
        known_ids = {warning.id for warning in self._warnings1}
        for warning in fetched:
            if warning.id in known_ids:
                continue
            # Also guards against one warning being listed by two sources
            known_ids.add(warning.id)

            recipients = self._recipients_for(warning)
            if not recipients:
                continue

            body = self._format_warning(warning)
            for jid in recipients:
                await self._client.send(
                    self.__make_msg(to=aioxmpp.JID.fromstr(jid), body=body))

    def __time_format(self, time_str):
        '''
        Reformat ISO style time data to a more
        readable format (DD.MM.YYYY HH:MM).

        Values that are not ISO formatted strings, e.g. the 'N/A'
        placeholder, are returned unchanged.
        '''
        try:
            date = datetime.datetime.fromisoformat(time_str)
        except (TypeError, ValueError):
            return time_str

        # Zero padded, so that five past two reads 14:05 and not 14:5
        return date.strftime('%d.%m.%Y %H:%M')

    def _format_warning(self, warning):
        '''
        Returns the message text for @warning: headline, validity period
        and description, with HTML line breaks turned into newlines.
        '''
        # Reformat the message a bit
        effective_time = self.__time_format(warning.effective_from)
        expiry_time = self.__time_format(warning.expires)
        body = f'*{warning.headline}*\n({effective_time} bis {expiry_time})\n\n{warning.description}'

        # Smells like script injection, but okay
        for line_break in ('<br>', '<br/>', '<br />'):
            body = body.replace(line_break, '\n')

        return body

    def _load_config(self):
        '''
        Reads the configuration file, the stored subscriptions and the
        list of valid Landkreise.

        The configuration path is the first command line argument and
        defaults to /etc/janine/janine.conf. The Landkreise are requested
        once and then cached in channels.json inside the data directory.
        '''
        # Load config
        config_path = sys.argv[1] if len(sys.argv) > 1 else '/etc/janine/janine.conf'
        config = configparser.ConfigParser()
        config.read(config_path)

        # Configure sources
        self._sources = sources_from_config(config)

        general = config['General']
        self._data_dir = general.get('DataDir', '/etc/janine')
        self._recipients = general['Recipients'].split(',')
        self._refresh_timeout = int(general['Timeout'])
        # Compare case-insensitively so that "true" is not read as False
        self._same_domain = general.get('SameDomain', 'True').strip().lower() == 'true'

        # Persistent data
        # Subscribed clients
        self._client_store = os.path.join(self._data_dir, 'clients.json')
        if os.path.exists(self._client_store):
            with open(self._client_store, 'r') as cf:
                self._warn_clients = json.loads(cf.read())

        # Landkreise
        self._channels = []
        channel_file = os.path.join(self._data_dir, 'channels.json')
        if not os.path.exists(channel_file):
            log.debug('Requesting search channels')
            req = requests.get(MiscDataSources.channels(),
                               timeout=self._REQUEST_TIMEOUT)
            channels = json.loads(req.text)
            self._channels = [channel.get('NAME', '')
                              for channel in channels.values()]

            # The cache is optional: without it the list is requested
            # again on the next start
            try:
                with open(channel_file, 'w') as f:
                    f.write(json.dumps(self._channels))
            except Exception as err:
                log.error('Failed to cache channel data:')
                log.error(str(err))
        else:
            with open(channel_file, 'r') as f:
                self._channels = json.loads(f.read())

        # Bot Config
        bot = config['Bot']
        self._jid = aioxmpp.JID.fromstr(bot['JID'])
        self._password = bot['Password']
        self._avatar = bot.get('Avatar', None)
        self._status = bot.get('Status', 'Warnt dich vor Katastrophen')
def main():
    '''
    Creates the bot and runs it until its connect() coroutine returns.
    '''
    bot = WarningBot()
    # asyncio.run() owns the event loop: it creates a fresh one and closes
    # it again even if connect() raises or the process is interrupted.
    asyncio.run(bot.connect())


if __name__ == '__main__':
    main()