diff --git a/janine/modules/janine/helpers.py b/janine/modules/janine/helpers.py index d8e95ea..e4698a0 100644 --- a/janine/modules/janine/helpers.py +++ b/janine/modules/janine/helpers.py @@ -1,4 +1,4 @@ -''' +""" This file is part of JANINE. JANINE is free software: you can redistribute it and/or modify @@ -13,58 +13,62 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with JANINE. If not, see . -''' +""" import datetime + def pad_time_component(c): - ''' + """ If we have 12:08, it gets turned into 12:8, so we need to pad the components with a leading zero, if there is none. - ''' + """ if len(c) != 2: - return f'0{c}' + return f"0{c}" return c + def format_time(time_str): - ''' + """ Reformat ISO style time data to a more readable format. - ''' + """ try: date = datetime.datetime.fromisoformat(time_str) except ValueError: return time_str - return f'{date.day}.{date.month}.{date.year} {pad_time_component(date.hour)}:{pad_time_component(date.minute)}' + return f"{date.day}.{date.month}.{date.year} {pad_time_component(date.hour)}:{pad_time_component(date.minute)}" + def format_warning(warning): - ''' + """ Send a warning to all the recipients - ''' + """ # Reformat the message a bit effective_time = format_time(warning.effective_from) expiry_time = format_time(warning.expires) - body = f'''*{warning.headline}* + body = f"""*{warning.headline}* ({effective_time} bis {expiry_time}) -{warning.description}''' +{warning.description}""" if warning.instruction: - body = f'''{body} + body = f"""{body} -{warning.instruction}''' +{warning.instruction}""" # Smells like script injection, but okay - body = body.replace('
', '\n') - body = body.replace('
', '\n') + body = body.replace("
", "\n") + body = body.replace("
", "\n") return body + def find_one(func, array): - ''' + """ Utility function Return the first element in array for which func returns True. - ''' + """ for e in array: if func(e): return e diff --git a/janine/modules/janine/janine.py b/janine/modules/janine/janine.py index 552030f..8fddde0 100644 --- a/janine/modules/janine/janine.py +++ b/janine/modules/janine/janine.py @@ -1,4 +1,4 @@ -''' +""" This file is part of JANINE. JANINE is free software: you can redistribute it and/or modify @@ -13,7 +13,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with JANINE. If not, see . -''' +""" import logging import asyncio from urllib3.exceptions import MaxRetryError @@ -26,9 +26,10 @@ from janine.modules.janine.helpers import format_warning import requests import aioxmpp -logger = logging.getLogger('mira.modules.janine.janine') +logger = logging.getLogger("mira.modules.janine.janine") + +NAME = "janine" -NAME = 'janine' class JanineModule(BaseModule): __instance = None @@ -39,60 +40,61 @@ class JanineModule(BaseModule): JanineModule(base, **kwargs) return JanineModule.__instance - + def __init__(self, base, **kwargs): if JanineModule.__instance != None: - raise Exception('Trying to init singleton twice') + raise Exception("Trying to init singleton twice") super().__init__(base, **kwargs) JanineModule.__instance = self self._subcommand_table = { - 'subscribe': self._subscribe, - 'unsubscribe': self._unsubscribe, - 'hilfe': self._help, - '*': self._any + "subscribe": self._subscribe, + "unsubscribe": self._unsubscribe, + "hilfe": self._help, + "*": self._any, } - self._channels = self._stm.get_data('channels') + self._channels = self._stm.get_data("channels") if not self._channels: # TODO: Move out of the constructor. Perform this asynchronously, # and just refuse to process commands before we're done. Start the # request loop afterwards. - logger.info('Channels do not exist. Downloading...') + logger.info("Channels do not exist. Downloading...") req = requests.get(MiscDataSources.channels()) channels = req.json() - self._channels = [channels[key].get('NAME', '') for key in channels] - self._stm.set_data('channels', self._channels) - logger.info('Done') + self._channels = [channels[key].get("NAME", "") for key in channels] + self._stm.set_data("channels", self._channels) + logger.info("Done") - self._warnings0 = list(map(stub_warning, self._stm.get_data('warnings'))) + self._warnings0 = list(map(stub_warning, self._stm.get_data("warnings"))) self._warnings1 = [] - self._refresh_timeout = self.get_option('refresh_timeout', 15 * 60) - self._sources = list(map(WarningSources.source_by_name, self._config['sources'])) + self._refresh_timeout = self.get_option("refresh_timeout", 15 * 60) + self._sources = list( + map(WarningSources.source_by_name, self._config["sources"]) + ) loop = asyncio.get_event_loop() periodic = loop.create_task(self._periodic_ticker()) async def _periodic_ticker(self): - ''' + """ "Executes" every self._refresh_timeout seconds to fetch all configured warnings and send them to the users, if there are any new ones. - ''' + """ while True: - logger.debug('Refreshing warning list') + logger.debug("Refreshing warning list") await self._request_warnings() - self._stm.set_data('warnings', - [x.id for x in self._warnings0]) + self._stm.set_data("warnings", [x.id for x in self._warnings0]) await asyncio.sleep(self._refresh_timeout) - + async def _request_warnings(self): - ''' + """ Requests warnings from all configured warning sources and sends new ones as notifications - ''' + """ self._warnings1 = self._warnings0 self._warnings0 = [] for source in self._sources: @@ -100,7 +102,7 @@ class JanineModule(BaseModule): req = requests.get(source) self._warnings0 += parse_data(req.json()) except MaxRetryError: - logger.warn('Connection timeout for request to %s', source) + logger.warn("Connection timeout for request to %s", source) continue # Find new warnings and send them out @@ -115,18 +117,21 @@ class JanineModule(BaseModule): async def _subscribe(self, cmd, msg): if len(cmd) < 2: - self.send_message(msg.from_, 'Du hast keinen Landkreis angegeben') + self.send_message(msg.from_, "Du hast keinen Landkreis angegeben") return - landkreis = ' '.join(cmd[1:]) + landkreis = " ".join(cmd[1:]) if not landkreis in self._channels: - self.send_message(msg.from_, 'Der angegebene Landkreis "%s" existiert nicht' % (landkreis)) + self.send_message( + msg.from_, 'Der angegebene Landkreis "%s" existiert nicht' % (landkreis) + ) return bare = str(msg.from_.bare()) if self._sum.is_subscribed_to(bare, landkreis): - self.send_message(msg.from_, - 'Du hast den "%s" bereits abonniert' % (landkreis)) + self.send_message( + msg.from_, 'Du hast den "%s" bereits abonniert' % (landkreis) + ) return self._sum.add_subscription_for(bare, landkreis) @@ -137,33 +142,40 @@ class JanineModule(BaseModule): async def _unsubscribe(self, cmd, msg): if len(cmd) < 2: - self.send_message(msg.from_, 'Du hast keinen Landkreis angegeben') + self.send_message(msg.from_, "Du hast keinen Landkreis angegeben") return - landkreis = ' '.join(cmd[1:]) + landkreis = " ".join(cmd[1:]) bare = str(msg.from_.bare()) if not self._sum.is_subscribed_to(bare, landkreis): self.send_message(msg.from_, 'Du hast "%s" nicht abonniert' % (landkreis)) return self._sum.remove_subscription_for(bare, landkreis) - self.send_message(msg.from_, 'Du erhälst nun keine Nachrichten zu "%s" mehr' % (landkreis)) + self.send_message( + msg.from_, 'Du erhälst nun keine Nachrichten zu "%s" mehr' % (landkreis) + ) async def _help(self, cmd, msg): - body = '''Verfügbare Befehle: + body = """Verfügbare Befehle: subscribe - Abonniere einen Landkreis unsubscribe - Entferne das Abonnement zu einem Landkreis -help - Gebe diese Hilfe aus''' +help - Gebe diese Hilfe aus""" self.send_message(msg.from_, body) async def _any(self, cmd, msg): if not cmd: - self.send_message(msg.from_, - 'Ich bin die Jabber Anwendung für Notfallinformations- und -Nachrichten-Empfang') + self.send_message( + msg.from_, + "Ich bin die Jabber Anwendung für Notfallinformations- und -Nachrichten-Empfang", + ) else: - self.send_message(msg.from_, - 'Unbekannter Befehl "%s". "janine hilfe" gibt alle bekannten Befehle aus' % (cmd[0])) + self.send_message( + msg.from_, + 'Unbekannter Befehl "%s". "janine hilfe" gibt alle bekannten Befehle aus' + % (cmd[0]), + ) def get_instance(base, **kwargs): diff --git a/janine/modules/janine/sources.py b/janine/modules/janine/sources.py index 85c59c4..cf9abae 100644 --- a/janine/modules/janine/sources.py +++ b/janine/modules/janine/sources.py @@ -1,4 +1,4 @@ -''' +""" This file is part of JANINE. JANINE is free software: you can redistribute it and/or modify @@ -13,44 +13,48 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with JANINE. If not, see . -''' +""" + class WarningSources: - ''' + """ A collection of sources of the BBK - ''' + """ + @staticmethod def bbk_dwd(): - return 'https://warnung.bund.de/bbk.dwd/unwetter.json' + return "https://warnung.bund.de/bbk.dwd/unwetter.json" @staticmethod def bbk_mowas(): - return 'https://warnung.bund.de/bbk.mowas/gefahrendurchsagen.json' + return "https://warnung.bund.de/bbk.mowas/gefahrendurchsagen.json" @staticmethod def bbk_biwapp(): - return 'https://warnung.bund.de/bbk.biwapp/warnmeldungen.json' + return "https://warnung.bund.de/bbk.biwapp/warnmeldungen.json" @staticmethod def bbk_ihp(): - return 'https://warnung.bund.de/bbk.lhp/hochwassermeldungen.json' + return "https://warnung.bund.de/bbk.lhp/hochwassermeldungen.json" @staticmethod def source_by_name(name): return { - 'IHP': WarningSources.bbk_ihp(), - 'DWD': WarningSources.bbk_dwd(), - 'MOWAS': WarningSources.bbk_mowas(), - 'BIWAPP': WarningSources.bbk_biwapp() - }[name] + "IHP": WarningSources.bbk_ihp(), + "DWD": WarningSources.bbk_dwd(), + "MOWAS": WarningSources.bbk_mowas(), + "BIWAPP": WarningSources.bbk_biwapp(), + }[name] + class MiscDataSources: - ''' + """ A collection of other data sources for various use cases - ''' + """ + @staticmethod def channels(): - ''' + """ These are the valid names to retrieve warnings for - ''' - return 'https://warnung.bund.de/assets/json/converted_gemeinden.json' + """ + return "https://warnung.bund.de/assets/json/converted_gemeinden.json" diff --git a/janine/modules/janine/warnings.py b/janine/modules/janine/warnings.py index b3ab96f..74cb3a2 100644 --- a/janine/modules/janine/warnings.py +++ b/janine/modules/janine/warnings.py @@ -1,4 +1,4 @@ -''' +""" Copyright (C) 2021 Alexander "PapaTutuWawa" This program is free software: you can redistribute it and/or modify @@ -13,70 +13,82 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . -''' +""" from collections import namedtuple from janine.modules.janine.helpers import find_one -Warning_ = namedtuple('Warning_', ['id', - 'sent', - 'effective_from', - 'expires', - 'urgency', - 'sender', - 'headline', - 'description', - 'instruction', - 'landkreise']) +Warning_ = namedtuple( + "Warning_", + [ + "id", + "sent", + "effective_from", + "expires", + "urgency", + "sender", + "headline", + "description", + "instruction", + "landkreise", + ], +) def stub_warning(id_): - ''' + """ Returns a stubbed warning for loading warnings from disk. The only real attribute is the @id_ . - ''' - return Warning_(id=id_, - sent='', - effective_from='', - expires='', - urgency='', - sender='', - headline='', - description='', - instruction='', - landkreise=[]) + """ + return Warning_( + id=id_, + sent="", + effective_from="", + expires="", + urgency="", + sender="", + headline="", + description="", + instruction="", + landkreise=[], + ) + def to_warning(data): - ''' + """ Returns a Warning given the raw data - ''' - info = find_one(lambda x: 'headline' in x.keys(), data['info']) - return Warning_(id=data['identifier'], - sent=data['sent'], - # Not all items have to have those - effective_from=info.get('effective', 'N/A'), - # Not all items have to have those - expires=info.get('expires', 'N/A'), - urgency=info['urgency'], - # Not all items have to have those - sender=info.get('senderName', 'N/A'), - headline=info['headline'], - description=info['description'], - instruction=info.get('instruction', ''), - landkreise=get_landkreise(data)) + """ + info = find_one(lambda x: "headline" in x.keys(), data["info"]) + return Warning_( + id=data["identifier"], + sent=data["sent"], + # Not all items have to have those + effective_from=info.get("effective", "N/A"), + # Not all items have to have those + expires=info.get("expires", "N/A"), + urgency=info["urgency"], + # Not all items have to have those + sender=info.get("senderName", "N/A"), + headline=info["headline"], + description=info["description"], + instruction=info.get("instruction", ""), + landkreise=get_landkreise(data), + ) + def get_landkreise(data): - ''' + """ Returns the list of Landkreise relevant to the warning in @data - ''' - info = find_one(lambda e: 'area' in e.keys(), data['info']) - geocode = find_one(lambda e: 'geocode' in e.keys(), info['area']) + """ + info = find_one(lambda e: "area" in e.keys(), data["info"]) + geocode = find_one(lambda e: "geocode" in e.keys(), info["area"]) # Note: Some items may have multiple Landkreise - return [e['valueName'] for e in geocode['geocode']] + return [e["valueName"] for e in geocode["geocode"]] + def parse_data(data): - ''' + """ Reads the remote response, parses it and returns a list of warnings. - ''' + """ return [to_warning(el) for el in data]