fix: Format using black

This commit is contained in:
PapaTutuWawa 2021-06-15 20:24:00 +02:00
parent b8be375ddb
commit 79280d9d59
4 changed files with 156 additions and 124 deletions

View File

@ -1,4 +1,4 @@
'''
"""
This file is part of JANINE.
JANINE is free software: you can redistribute it and/or modify
@ -13,58 +13,62 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>.
'''
"""
import datetime
def pad_time_component(c):
'''
"""
If we have 12:08, it gets turned into 12:8, so we need to pad
the components with a leading zero, if there is none.
'''
"""
if len(c) != 2:
return f'0{c}'
return f"0{c}"
return c
def format_time(time_str):
'''
"""
Reformat ISO style time data to a more
readable format.
'''
"""
try:
date = datetime.datetime.fromisoformat(time_str)
except ValueError:
return time_str
return f'{date.day}.{date.month}.{date.year} {pad_time_component(date.hour)}:{pad_time_component(date.minute)}'
return f"{date.day}.{date.month}.{date.year} {pad_time_component(date.hour)}:{pad_time_component(date.minute)}"
def format_warning(warning):
'''
"""
Send a warning to all the recipients
'''
"""
# Reformat the message a bit
effective_time = format_time(warning.effective_from)
expiry_time = format_time(warning.expires)
body = f'''*{warning.headline}*
body = f"""*{warning.headline}*
({effective_time} bis {expiry_time})
{warning.description}'''
{warning.description}"""
if warning.instruction:
body = f'''{body}
body = f"""{body}
{warning.instruction}'''
{warning.instruction}"""
# Smells like script injection, but okay
body = body.replace('<br>', '\n')
body = body.replace('<br/>', '\n')
body = body.replace("<br>", "\n")
body = body.replace("<br/>", "\n")
return body
def find_one(func, array):
'''
"""
Utility function
Return the first element in array for which func returns True.
'''
"""
for e in array:
if func(e):
return e

View File

@ -1,4 +1,4 @@
'''
"""
This file is part of JANINE.
JANINE is free software: you can redistribute it and/or modify
@ -13,7 +13,7 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>.
'''
"""
import logging
import asyncio
from urllib3.exceptions import MaxRetryError
@ -26,9 +26,10 @@ from janine.modules.janine.helpers import format_warning
import requests
import aioxmpp
logger = logging.getLogger('mira.modules.janine.janine')
logger = logging.getLogger("mira.modules.janine.janine")
NAME = "janine"
NAME = 'janine'
class JanineModule(BaseModule):
__instance = None
@ -42,57 +43,58 @@ class JanineModule(BaseModule):
def __init__(self, base, **kwargs):
if JanineModule.__instance != None:
raise Exception('Trying to init singleton twice')
raise Exception("Trying to init singleton twice")
super().__init__(base, **kwargs)
JanineModule.__instance = self
self._subcommand_table = {
'subscribe': self._subscribe,
'unsubscribe': self._unsubscribe,
'hilfe': self._help,
'*': self._any
"subscribe": self._subscribe,
"unsubscribe": self._unsubscribe,
"hilfe": self._help,
"*": self._any,
}
self._channels = self._stm.get_data('channels')
self._channels = self._stm.get_data("channels")
if not self._channels:
# TODO: Move out of the constructor. Perform this asynchronously,
# and just refuse to process commands before we're done. Start the
# request loop afterwards.
logger.info('Channels do not exist. Downloading...')
logger.info("Channels do not exist. Downloading...")
req = requests.get(MiscDataSources.channels())
channels = req.json()
self._channels = [channels[key].get('NAME', '') for key in channels]
self._stm.set_data('channels', self._channels)
logger.info('Done')
self._channels = [channels[key].get("NAME", "") for key in channels]
self._stm.set_data("channels", self._channels)
logger.info("Done")
self._warnings0 = list(map(stub_warning, self._stm.get_data('warnings')))
self._warnings0 = list(map(stub_warning, self._stm.get_data("warnings")))
self._warnings1 = []
self._refresh_timeout = self.get_option('refresh_timeout', 15 * 60)
self._sources = list(map(WarningSources.source_by_name, self._config['sources']))
self._refresh_timeout = self.get_option("refresh_timeout", 15 * 60)
self._sources = list(
map(WarningSources.source_by_name, self._config["sources"])
)
loop = asyncio.get_event_loop()
periodic = loop.create_task(self._periodic_ticker())
async def _periodic_ticker(self):
'''
"""
"Executes" every self._refresh_timeout seconds to fetch all
configured warnings and send them to the users, if there are
any new ones.
'''
"""
while True:
logger.debug('Refreshing warning list')
logger.debug("Refreshing warning list")
await self._request_warnings()
self._stm.set_data('warnings',
[x.id for x in self._warnings0])
self._stm.set_data("warnings", [x.id for x in self._warnings0])
await asyncio.sleep(self._refresh_timeout)
async def _request_warnings(self):
'''
"""
Requests warnings from all configured warning sources and
sends new ones as notifications
'''
"""
self._warnings1 = self._warnings0
self._warnings0 = []
for source in self._sources:
@ -100,7 +102,7 @@ class JanineModule(BaseModule):
req = requests.get(source)
self._warnings0 += parse_data(req.json())
except MaxRetryError:
logger.warn('Connection timeout for request to %s', source)
logger.warn("Connection timeout for request to %s", source)
continue
# Find new warnings and send them out
@ -115,18 +117,21 @@ class JanineModule(BaseModule):
async def _subscribe(self, cmd, msg):
if len(cmd) < 2:
self.send_message(msg.from_, 'Du hast keinen Landkreis angegeben')
self.send_message(msg.from_, "Du hast keinen Landkreis angegeben")
return
landkreis = ' '.join(cmd[1:])
landkreis = " ".join(cmd[1:])
if not landkreis in self._channels:
self.send_message(msg.from_, 'Der angegebene Landkreis "%s" existiert nicht' % (landkreis))
self.send_message(
msg.from_, 'Der angegebene Landkreis "%s" existiert nicht' % (landkreis)
)
return
bare = str(msg.from_.bare())
if self._sum.is_subscribed_to(bare, landkreis):
self.send_message(msg.from_,
'Du hast den "%s" bereits abonniert' % (landkreis))
self.send_message(
msg.from_, 'Du hast den "%s" bereits abonniert' % (landkreis)
)
return
self._sum.add_subscription_for(bare, landkreis)
@ -137,33 +142,40 @@ class JanineModule(BaseModule):
async def _unsubscribe(self, cmd, msg):
if len(cmd) < 2:
self.send_message(msg.from_, 'Du hast keinen Landkreis angegeben')
self.send_message(msg.from_, "Du hast keinen Landkreis angegeben")
return
landkreis = ' '.join(cmd[1:])
landkreis = " ".join(cmd[1:])
bare = str(msg.from_.bare())
if not self._sum.is_subscribed_to(bare, landkreis):
self.send_message(msg.from_, 'Du hast "%s" nicht abonniert' % (landkreis))
return
self._sum.remove_subscription_for(bare, landkreis)
self.send_message(msg.from_, 'Du erhälst nun keine Nachrichten zu "%s" mehr' % (landkreis))
self.send_message(
msg.from_, 'Du erhälst nun keine Nachrichten zu "%s" mehr' % (landkreis)
)
async def _help(self, cmd, msg):
body = '''Verfügbare Befehle:
body = """Verfügbare Befehle:
subscribe <Landkreis> - Abonniere einen Landkreis
unsubscribe <Landkreis> - Entferne das Abonnement zu einem Landkreis
help - Gebe diese Hilfe aus'''
help - Gebe diese Hilfe aus"""
self.send_message(msg.from_, body)
async def _any(self, cmd, msg):
if not cmd:
self.send_message(msg.from_,
'Ich bin die Jabber Anwendung für Notfallinformations- und -Nachrichten-Empfang')
self.send_message(
msg.from_,
"Ich bin die Jabber Anwendung für Notfallinformations- und -Nachrichten-Empfang",
)
else:
self.send_message(msg.from_,
'Unbekannter Befehl "%s". "janine hilfe" gibt alle bekannten Befehle aus' % (cmd[0]))
self.send_message(
msg.from_,
'Unbekannter Befehl "%s". "janine hilfe" gibt alle bekannten Befehle aus'
% (cmd[0]),
)
def get_instance(base, **kwargs):

View File

@ -1,4 +1,4 @@
'''
"""
This file is part of JANINE.
JANINE is free software: you can redistribute it and/or modify
@ -13,44 +13,48 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with JANINE. If not, see <https://www.gnu.org/licenses/>.
'''
"""
class WarningSources:
'''
"""
A collection of sources of the BBK
'''
"""
@staticmethod
def bbk_dwd():
return 'https://warnung.bund.de/bbk.dwd/unwetter.json'
return "https://warnung.bund.de/bbk.dwd/unwetter.json"
@staticmethod
def bbk_mowas():
return 'https://warnung.bund.de/bbk.mowas/gefahrendurchsagen.json'
return "https://warnung.bund.de/bbk.mowas/gefahrendurchsagen.json"
@staticmethod
def bbk_biwapp():
return 'https://warnung.bund.de/bbk.biwapp/warnmeldungen.json'
return "https://warnung.bund.de/bbk.biwapp/warnmeldungen.json"
@staticmethod
def bbk_ihp():
return 'https://warnung.bund.de/bbk.lhp/hochwassermeldungen.json'
return "https://warnung.bund.de/bbk.lhp/hochwassermeldungen.json"
@staticmethod
def source_by_name(name):
return {
'IHP': WarningSources.bbk_ihp(),
'DWD': WarningSources.bbk_dwd(),
'MOWAS': WarningSources.bbk_mowas(),
'BIWAPP': WarningSources.bbk_biwapp()
}[name]
"IHP": WarningSources.bbk_ihp(),
"DWD": WarningSources.bbk_dwd(),
"MOWAS": WarningSources.bbk_mowas(),
"BIWAPP": WarningSources.bbk_biwapp(),
}[name]
class MiscDataSources:
'''
"""
A collection of other data sources for various use cases
'''
"""
@staticmethod
def channels():
'''
"""
These are the valid names to retrieve warnings for
'''
return 'https://warnung.bund.de/assets/json/converted_gemeinden.json'
"""
return "https://warnung.bund.de/assets/json/converted_gemeinden.json"

View File

@ -1,4 +1,4 @@
'''
"""
Copyright (C) 2021 Alexander "PapaTutuWawa"
This program is free software: you can redistribute it and/or modify
@ -13,70 +13,82 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
"""
from collections import namedtuple
from janine.modules.janine.helpers import find_one
Warning_ = namedtuple('Warning_', ['id',
'sent',
'effective_from',
'expires',
'urgency',
'sender',
'headline',
'description',
'instruction',
'landkreise'])
Warning_ = namedtuple(
"Warning_",
[
"id",
"sent",
"effective_from",
"expires",
"urgency",
"sender",
"headline",
"description",
"instruction",
"landkreise",
],
)
def stub_warning(id_):
'''
"""
Returns a stubbed warning for loading warnings from disk.
The only real attribute is the @id_ .
'''
return Warning_(id=id_,
sent='',
effective_from='',
expires='',
urgency='',
sender='',
headline='',
description='',
instruction='',
landkreise=[])
"""
return Warning_(
id=id_,
sent="",
effective_from="",
expires="",
urgency="",
sender="",
headline="",
description="",
instruction="",
landkreise=[],
)
def to_warning(data):
'''
"""
Returns a Warning given the raw data
'''
info = find_one(lambda x: 'headline' in x.keys(), data['info'])
return Warning_(id=data['identifier'],
sent=data['sent'],
# Not all items have to have those
effective_from=info.get('effective', 'N/A'),
# Not all items have to have those
expires=info.get('expires', 'N/A'),
urgency=info['urgency'],
# Not all items have to have those
sender=info.get('senderName', 'N/A'),
headline=info['headline'],
description=info['description'],
instruction=info.get('instruction', ''),
landkreise=get_landkreise(data))
"""
info = find_one(lambda x: "headline" in x.keys(), data["info"])
return Warning_(
id=data["identifier"],
sent=data["sent"],
# Not all items have to have those
effective_from=info.get("effective", "N/A"),
# Not all items have to have those
expires=info.get("expires", "N/A"),
urgency=info["urgency"],
# Not all items have to have those
sender=info.get("senderName", "N/A"),
headline=info["headline"],
description=info["description"],
instruction=info.get("instruction", ""),
landkreise=get_landkreise(data),
)
def get_landkreise(data):
'''
"""
Returns the list of Landkreise relevant to the warning in @data
'''
info = find_one(lambda e: 'area' in e.keys(), data['info'])
geocode = find_one(lambda e: 'geocode' in e.keys(), info['area'])
"""
info = find_one(lambda e: "area" in e.keys(), data["info"])
geocode = find_one(lambda e: "geocode" in e.keys(), info["area"])
# Note: Some items may have multiple Landkreise
return [e['valueName'] for e in geocode['geocode']]
return [e["valueName"] for e in geocode["geocode"]]
def parse_data(data):
'''
"""
Reads the remote response, parses it and returns a list of warnings.
'''
"""
return [to_warning(el) for el in data]