refactor: Use the mira framework

This commit is contained in:
PapaTutuWawa 2021-06-13 00:17:06 +02:00
parent cea5b5d61f
commit f3b9bcf776
10 changed files with 266 additions and 482 deletions

View File

@ -6,23 +6,28 @@ Ein Jabber-Bot, der Meldungen vom BBK an konfigurierte Accounts
weiterleitet. Die Daten stammen von [hier](https://warnung.bund.de/meldungen) weiterleitet. Die Daten stammen von [hier](https://warnung.bund.de/meldungen)
und umfassen das IHP, den DWD, das MoWaS und die BIWAPP. und umfassen das IHP, den DWD, das MoWaS und die BIWAPP.
JANINE ist ein Modul, welches im Kontext des Bot-Frameworks [mira](https://git.polynom.me/PapaTutuWawa/mira)
verwendet wird.
## Dependencies ## Dependencies
JANINE benötigt JANINE benötigt
- `aioxmpp>=0.11.0` - `mira>=0.1.0`
- `requests>=2.24.0` - `requests>=2.24.0`
## Verwendung ## Verwendung
Instaliert wird JANINE per `python setup.py install`. ```toml
# [...]
Die Datei `janine.example.conf` erklärt alle notwendigen [[modules]]
Einstellungen. name = "mira.modules.janine.janine"
# Diese Vier sind die gültigen Quellen
sources = ["IHP", "DWD", "MOWAS", "BIWAPP"]
Um den Bot zu starten, verwendet man einfach `janine`. Als optionaler # [...]
Parameter kann noch der Pfad zur Konfiguration übergeben werden. ```
Standardmäßig wird versucht `/etc/janine/janine.conf` zu laden.
## Bugs ## Bugs

View File

@ -1,14 +0,0 @@
[General]
IHP = y # Hochwasserwarnungen
DWD = y # Unwetterwarnungen
MOWAS = y # Gefahrendurchsagen
BIWAPP = y # Warnmeldungen
Timeout=630 # Zeit in Sekunden nach welcher nach neuen Warnungen geschaut wird
DataDir = /etc/janine/data # Verzeichnis für persistente Daten
SameDomain = True # Soll der Bot nur auf Nachrichten von der selben Domain antworten
[Bot]
Avatar = /etc/janine/avatar.png # Bot Avatar (Optional)
JID = janine@some.server.xmpp # Bot Account
Password = super_secret_password # Bot Passwort
Status = Gibt dir im Notfall Bescheid # Statusnachricht (Optional)

View File

@ -1,16 +0,0 @@
'''
This file is part of JANINE.
JANINE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
JANINE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>.
'''

View File

@ -1,406 +0,0 @@
'''
This file is part of JANINE.
JANINE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
JANINE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>.
'''
import os
import sys
import json
import configparser
import logging
import asyncio
from collections import namedtuple
from optparse import OptionParser
from stat import S_IRUSR, S_IWUSR
import aioxmpp
from aioxmpp.structs import PresenceShow
import requests
from janine.utils import find_one, make_msg, format_warning
from janine.sources import sources_from_config, MiscDataSources
log = logging.getLogger('janine')
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler())
Warning_ = namedtuple('Warning_', ['id',
'sent',
'effective_from',
'expires',
'urgency',
'sender',
'headline',
'description',
'instruction',
'landkreise'])
def stub_warning(id_):
'''
Returns a stubbed warning for loading warnings from disk.
The only real attribute is the @id_ .
'''
return Warning_(id=id_,
sent='',
effective_from='',
expires='',
urgency='',
sender='',
headline='',
description='',
instruction='',
landkreise=[])
def to_warning(data):
'''
Returns a Warning given the raw data
'''
info = find_one(lambda x: 'headline' in x.keys(), data['info'])
return Warning_(id=data['identifier'],
sent=data['sent'],
# Not all items have to have those
effective_from=info.get('effective', 'N/A'),
# Not all items have to have those
expires=info.get('expires', 'N/A'),
urgency=info['urgency'],
# Not all items have to have those
sender=info.get('senderName', 'N/A'),
headline=info['headline'],
description=info['description'],
instruction=info.get('instruction', ''),
landkreise=get_landkreise(data))
def get_landkreise(data):
'''
Returns the list of Landkreise relevant to the warning in @data
'''
info = find_one(lambda e: 'area' in e.keys(), data['info'])
geocode = find_one(lambda e: 'geocode' in e.keys(), info['area'])
# Note: Some items may have multiple Landkreise
return list(map(lambda e: e['valueName'], geocode['geocode']))
def parse_data(text):
'''
Reads the remote response, parses it and returns a list of warnings.
'''
data = json.loads(text)
return [to_warning(el) for el in data]
class WarningBot:
'''
This class represents the actual bot. The only things
to be done is call connect() after creating an instance.
'''
def __init__(self, config_file):
self._warnings0 = []
self._warnings1 = []
self._client = None
self._warn_clients = {}
self._refresh_timeout = 630 # 15min
# Configuration stuff
self._data_dir = ''
self._client_store = ''
self._warning_store = ''
self._load_config(config_file)
async def connect(self):
'''
Starts the "event loop" of the bot
'''
self._client = aioxmpp.PresenceManagedClient(
self._jid,
aioxmpp.make_security_layer(self._password))
log.debug('Connecting to server')
async with self._client.connected():
log.info('Client connected to server')
# In case you want a nice avatar
if self._avatar:
log.debug('Setting avatar')
with open(self._avatar, 'rb') as avatar_file:
image_data = avatar_file.read()
avatar_set = aioxmpp.avatar.AvatarSet()
avatar_set.add_avatar_image('image/png', image_bytes=image_data)
await (self._client.summon(aioxmpp.avatar.AvatarService)
.publish_avatar_set(avatar_set))
log.info('Avatar set')
# Set some presence information
self._client.set_presence(
aioxmpp.PresenceState(available=True,
show=PresenceShow.CHAT),
self._status)
# Enable Carbons
await self._client.summon(aioxmpp.CarbonsClient).enable()
log.info('Message carbons enabled')
# Register the message handler
self._client.stream.register_message_callback(
aioxmpp.MessageType.CHAT,
None,
self._handle_message)
log.info('Message handler registered')
# Start our fetch-send loop
# NOTE: Originally, I wanted to use a cronjob and
# signal.signal(...) for this but you can't
# use async in event handlers
loop = asyncio.get_event_loop()
periodic = loop.create_task(self._periodic_requests())
log.info('Periodic ticker started')
await periodic
def __is_message_valid(self, msg):
'''
Returns True on messages we want to handle. False otherwise.
'''
if msg.type_ != aioxmpp.MessageType.CHAT:
return False
if not msg.body:
return False
if msg.from_.domain != self._jid.domain and self._same_domain:
return False
return True
def _handle_message(self, msg):
# Handle cases we don't want to deal with
if not self.__is_message_valid(msg):
return
# Send a deliverability receipt
receipt = aioxmpp.mdr.compose_receipt(msg)
self._client.enqueue(receipt)
cmd_parts = str(msg.body.any()).split(' ')
cmd = cmd_parts[0].lower()
if cmd == 'subscribe':
# Do we have a landkreis?
if len(cmd_parts) < 2:
self._client.enqueue(make_msg(
to=msg.from_,
body='Du hast keinen Landkreis angegeben'))
return
# Check if the entered Landkreis is valid
landkreis = ' '.join(cmd_parts[1:])
if not landkreis in self._channels:
self._client.enqueue(make_msg(
to=msg.from_,
body='Der angegebene Landkreis ist ungültig'))
return
if landkreis not in self._warn_clients.keys():
self._warn_clients[landkreis] = []
self._warn_clients[landkreis].append(str(msg.from_.bare()))
self._client.enqueue(make_msg(
to=msg.from_,
body=f'Du erhälst nun Nachrichten zu {landkreis} von mir'))
with open(self._client_store, 'w') as clients_file:
clients_file.write(json.dumps(self._warn_clients))
# Send all known warnings for the landkreis to the user
for warning in self._warnings0:
if landkreis in warning.landkreise:
body = format_warning(warning)
self._client.enqueue(make_msg(
to=msg.from_,
body=body))
elif cmd == 'unsubscribe':
# Do we have a landkreis?
if len(cmd_parts) < 2:
self._client.enqueue(make_msg(
to=msg.from_,
body='Du hast keinen Landkreis angegeben'))
return
landkreis = ' '.join(cmd_parts[1:])
if landkreis not in self._warn_clients:
self._client.enqueue(make_msg(
to=msg.from_,
body=f'Du hast {landkreis} nicht abonniert'))
return
if str(msg.from_.bare()) in self._warn_clients[landkreis]:
filter_ = lambda x: x != str(msg.from_.bare())
self._warn_clients[landkreis] = list(filter(filter_,
self._warn_clients[landkreis]))
self._client.enqueue(make_msg(
to=msg.from_,
body=f'Du erhälst keine Nachrichten zu {landkreis} mehr von mir'))
if len(self._warn_clients[landkreis]) == 0:
del self._warn_clients[landkreis]
else:
self._client.enqueue(make_msg(
to=msg.from_,
body=f'Du hast {landkreis} nicht abonniert'))
elif cmd == 'help':
body = '''Verfügbare Befehle:
subscribe <Landkreis> - Abonniere einen Landkreis
unsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis
help - Gebe diese Hilfe aus'''
self._client.enqueue(make_msg(
to=msg.from_,
body=body))
else:
self._client.enqueue(make_msg(
to=msg.from_,
body='Diesen Befehl kenne ich nicht... Mit "help" kannst du alle Befehle sehen, die ich kenne.'))
async def _periodic_requests(self):
'''
"Executes" every self._refresh_timeout seconds to fetch all
configured warnings and send them to the users, if there are
any new ones.
'''
while True:
log.debug('Refreshing warning list')
await self._fetch_warnings()
# Flush the warnings to disk
ids = [x.id for x in self._warnings0]
with open(self._warnings_file, 'w') as warnings_file:
warnings_file.write(json.dumps(ids))
await asyncio.sleep(self._refresh_timeout)
async def _fetch_warnings(self):
'''
Fetches all warnings and tries to find new ones
to send notifications.
'''
self._warnings1 = self._warnings0
self._warnings0 = []
for source in self._sources:
try:
req = requests.get(source)
self._warnings0 += parse_data(req.text)
except urllib3.exceptions.MaxRetryError:
log.warn('Connection timeout for request to %s', source)
continue
# Find new warnings and send the new ones
ids = [x.id for x in self._warnings1]
for warning in self._warnings0:
if warning.id in ids:
continue
# We need to use a set as a warning can apply to more than one
# Landkreis
if set(warning.landkreise).intersection(self._warn_clients.keys()):
body = format_warning(warning)
for landkreis in warning.landkreise:
for to in self._warn_clients.get(landkreis, []):
await self._client.send(make_msg(
aioxmpp.JID.fromstr(to),
body))
def _load_config(self, config_file):
# Load config
config = configparser.ConfigParser()
config.read(config_file)
# Configure sources
self._sources = sources_from_config(config)
self._data_dir = config['General'].get('DataDir', '/etc/janine/data')
self._refresh_timeout = int(config['General']['Timeout'])
self._same_domain = config['General'].get('SameDomain', 'True') == 'True'
# Persistent data
## Subscribed clients
self._client_store = os.path.join(self._data_dir, 'clients.json')
if os.path.exists(self._client_store):
with open(self._client_store, 'r') as clients_file:
self._warn_clients = json.loads(clients_file.read())
log.info('Clients read from disk')
else:
with open(self._client_store, 'w') as clients_file:
clients_file.write('{}')
os.chmod(self._client_store, S_IRUSR | S_IWUSR)
log.info('Clients file created with 0600')
## Warnings
self._warnings_file = os.path.join(self._data_dir, 'warnings.json')
if os.path.exists(self._warnings_file):
with open(self._warnings_file, 'r') as warnings_file:
self._warnings0 = list(map(stub_warning,
json.loads(warnings_file.read())))
log.info('Warnings read from disk')
## Landkreise
self._channels = []
self._channel_file = os.path.join(self._data_dir, 'channels.json')
if not os.path.exists(self._channel_file):
log.info('Requesting search channels')
req = requests.get(MiscDataSources.channels())
channels = json.loads(req.text)
self._channels = [channels[key].get('NAME', '') for key in channels]
try:
with open(self._channel_file, 'w') as channel_file:
channel_file.write(json.dumps(self._channels))
except Exception as err:
log.error('Failed to cache channel data:')
log.error(str(err))
else:
with open(self._channel_file, 'r') as channel_file:
self._channels = json.loads(channel_file.read())
log.info('Search channels read from disk')
# Bot Config
self._jid = aioxmpp.JID.fromstr(config['Bot']['JID'])
self._password = config['Bot']['Password']
self._avatar = config['Bot'].get('Avatar', None)
self._status = config['Bot'].get('Status', 'Warnt dich vor Katastrophen')
def main():
'''
Main function.
'''
parser = OptionParser()
parser.add_option('-d', '--debug', action='store_true', dest='debug', default=False)
(options, args) = parser.parse_args()
if options.debug:
log.setLevel(logging.DEBUG)
if len(args) != 0:
config_file = args[0]
else:
config_file = '/etc/janine/janine.conf'
bot = WarningBot(config_file)
loop = asyncio.get_event_loop()
loop.run_until_complete(bot.connect())
loop.close()
if __name__ == '__main__':
main()

View File

View File

@ -14,22 +14,8 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>. along with JANINE. If not, see <https://www.gnu.org/licenses/>.
''' '''
import datetime import datetime
import aioxmpp
def make_msg(to, body):
'''
Wrapper for creating a message object to enqueue or send.
'''
msg = aioxmpp.Message(
type_=aioxmpp.MessageType.CHAT,
to=to)
msg.body[None] = body
return msg
def pad_time_component(c): def pad_time_component(c):
''' '''
If we have 12:08, it gets turned into 12:8, so we need to pad If we have 12:08, it gets turned into 12:8, so we need to pad
@ -83,11 +69,3 @@ def find_one(func, array):
if func(e): if func(e):
return e return e
return None return None
def find_all(func, array):
'''
Utility function
Return all elements in array for which func returns True.
'''
return [e for e in array if func(e)]

View File

@ -0,0 +1,170 @@
'''
This file is part of JANINE.
JANINE is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
JANINE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>.
'''
import logging
import asyncio
from urllib3.exceptions import MaxRetryError
from mira.module import BaseModule
from mira.modules.janine.sources import MiscDataSources, WarningSources
from mira.modules.janine.warnings import parse_data, stub_warning
from mira.modules.janine.helpers import format_warning
import requests
import aioxmpp
logger = logging.getLogger('mira.modules.janine.janine')
NAME = 'janine'
class JanineModule(BaseModule):
__instance = None
@staticmethod
def get_instance(base, **kwargs):
if JanineModule.__instance == None:
JanineModule(base, **kwargs)
return JanineModule.__instance
def __init__(self, base, **kwargs):
if JanineModule.__instance != None:
raise Exception('Trying to init singleton twice')
super().__init__(base, **kwargs)
JanineModule.__instance = self
self._subcommand_table = {
'subscribe': self._subscribe,
'unsubscribe': self._unsubscribe,
'hilfe': self._help,
'*': self._any
}
self._channels = self._stm.get_data('channels')
if not self._channels:
# TODO: Move out of the constructor. Perform this asynchronously,
# and just refuse to process commands before we're done. Start the
# request loop afterwards.
logger.info('Channels do not exist. Downloading...')
req = requests.get(MiscDataSources.channels())
channels = req.json()
self._channels = [channels[key].get('NAME', '') for key in channels]
self._stm.set_data('channels', self._channels)
logger.info('Done')
self._warnings0 = list(map(stub_warning, self._stm.get_data('warnings')))
self._warnings1 = []
self._refresh_timeout = self.get_option('refresh_timeout', 15 * 60)
self._sources = list(map(WarningSources.source_by_name, self._config['sources']))
loop = asyncio.get_event_loop()
periodic = loop.create_task(self._periodic_ticker())
async def _periodic_ticker(self):
'''
"Executes" every self._refresh_timeout seconds to fetch all
configured warnings and send them to the users, if there are
any new ones.
'''
while True:
logger.debug('Refreshing warning list')
await self._request_warnings()
self._stm.set_data('warnings',
[x.id for x in self._warnings0])
await asyncio.sleep(self._refresh_timeout)
async def _request_warnings(self):
'''
Requests warnings from all configured warning sources and
sends new ones as notifications
'''
self._warnings1 = self._warnings0
self._warnings0 = []
for source in self._sources:
try:
req = requests.get(source)
self._warnings0 += parse_data(req.json())
except MaxRetryError:
logger.warn('Connection timeout for request to %s', source)
continue
# Find new warnings and send them out
ids = [x.id for x in self._warnings1]
for warning in self._warnings0:
if warning.id in ids:
continue
for jid, _ in self._sum.get_subscriptions_for_keywords(warning.landkreise):
body = format_warning(warning)
self.send_message(aioxmpp.JID.fromstr(jid), body)
async def _subscribe(self, cmd, msg):
if len(cmd) < 2:
self.send_message(msg.from_, 'Du hast keinen Landkreis angegeben')
return
landkreis = ' '.join(cmd[1:])
if not landkreis in self._channels:
self.send_message(msg.from_, 'Der angegebene Landkreis "%s" existiert nicht' % (landkreis))
return
bare = str(msg.from_.bare())
if self._sum.is_subscribed_to(bare, landkreis):
self.send_message(msg.from_,
'Du hast den "%s" bereits abonniert' % (landkreis))
return
self._sum.add_subscription_for(bare, landkreis)
for warning in self._warnings0:
if landkreis in warning.landkreise:
body = format_warning(warning)
self.send_message(msg.from_, body)
async def _unsubscribe(self, cmd, msg):
if len(cmd) < 2:
self.send_message(msg.from_, 'Du hast keinen Landkreis angegeben')
return
landkreis = ' '.join(cmd[1:])
bare = str(msg.from_.bare())
if not self._sum.is_subscribed_to(bare, landkreis):
self.send_message(msg.from_, 'Du hast "%s" nicht abonniert' % (landkreis))
return
self._sum.remove_subscription_for(bare, landkreis)
self.send_message(msg.from_, 'Du erhälst nun keine Nachrichten zu "%s" mehr' % (landkreis))
async def _help(self, cmd, msg):
body = '''Verfügbare Befehle:
subscribe <Landkreis> - Abonniere einen Landkreis
unsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis
help - Gebe diese Hilfe aus'''
self.send_message(msg.from_, body)
async def _any(self, cmd, msg):
if not cmd:
self.send_message(msg.from_,
'Ich bin die Jabber Anwendung für Notfallinformations- und -Nachrichten-Empfang')
else:
self.send_message(msg.from_,
'Unbekannter Befehl "%s". "janine hilfe" gibt alle bekannten Befehle aus' % (cmd[0]))
def get_instance(base, **kwargs):
return JanineModule.get_instance(base, **kwargs)

View File

@ -54,13 +54,3 @@ class MiscDataSources:
These are the valid names to retrieve warnings for These are the valid names to retrieve warnings for
''' '''
return 'https://warnung.bund.de/assets/json/converted_gemeinden.json' return 'https://warnung.bund.de/assets/json/converted_gemeinden.json'
def sources_from_config(config):
sources = []
for module in ('IHP', 'DWD', 'BIWAPP', 'MOWAS'):
option = config['General'].get(module, 'n')
if option == 'y':
sources.append(WarningSources.source_by_name(module))
return sources

View File

@ -0,0 +1,82 @@
'''
Copyright (C) 2021 Alexander "PapaTutuWawa"
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
from collections import namedtuple
from mira.modules.janine.helpers import find_one
Warning_ = namedtuple('Warning_', ['id',
'sent',
'effective_from',
'expires',
'urgency',
'sender',
'headline',
'description',
'instruction',
'landkreise'])
def stub_warning(id_):
'''
Returns a stubbed warning for loading warnings from disk.
The only real attribute is the @id_ .
'''
return Warning_(id=id_,
sent='',
effective_from='',
expires='',
urgency='',
sender='',
headline='',
description='',
instruction='',
landkreise=[])
def to_warning(data):
'''
Returns a Warning given the raw data
'''
info = find_one(lambda x: 'headline' in x.keys(), data['info'])
return Warning_(id=data['identifier'],
sent=data['sent'],
# Not all items have to have those
effective_from=info.get('effective', 'N/A'),
# Not all items have to have those
expires=info.get('expires', 'N/A'),
urgency=info['urgency'],
# Not all items have to have those
sender=info.get('senderName', 'N/A'),
headline=info['headline'],
description=info['description'],
instruction=info.get('instruction', ''),
landkreise=get_landkreise(data))
def get_landkreise(data):
'''
Returns the list of Landkreise relevant to the warning in @data
'''
info = find_one(lambda e: 'area' in e.keys(), data['info'])
geocode = find_one(lambda e: 'geocode' in e.keys(), info['area'])
# Note: Some items may have multiple Landkreise
return [e['valueName'] for e in geocode['geocode']]
def parse_data(data):
'''
Reads the remote response, parses it and returns a list of warnings.
'''
return [to_warning(el) for el in data]

View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name = 'janine', name = 'janine',
version = '0.4.0', version = '1.0.0',
description = 'An XMPP bot relaying data from the IHP, DWD, MOWAS and BIWAP', description = 'An XMPP bot relaying data from the IHP, DWD, MOWAS and BIWAP',
url = 'https://git.polynom.me/PapaTutuWawa/janine', url = 'https://git.polynom.me/PapaTutuWawa/janine',
author = 'Alexander "PapaTutuWawa"', author = 'Alexander "PapaTutuWawa"',
@ -13,10 +13,5 @@ setup(
'aioxmpp>=0.11.0', 'aioxmpp>=0.11.0',
'requests>=2.23.0' 'requests>=2.23.0'
], ],
zip_safe=True, zip_safe=True
entry_points={
'console_scripts': [
'janine = janine.janine:main'
]
}
) )