diff --git a/microkodi/config.py b/microkodi/config.py index 2b614c3..0307620 100644 --- a/microkodi/config.py +++ b/microkodi/config.py @@ -5,6 +5,7 @@ from typing import Any from microkodi.helpers import recursive_dict_merge + @dataclass class Config: host: str @@ -43,7 +44,8 @@ class Config: # Webhook to trigger to turn on power to the TV tv_power_webhook: str | None - + + def load_config(config_path: Path | None) -> Config: if config_path is None: config_data = {} diff --git a/microkodi/helpers.py b/microkodi/helpers.py index e231295..639622f 100644 --- a/microkodi/helpers.py +++ b/microkodi/helpers.py @@ -4,16 +4,22 @@ from functools import wraps P = ParamSpec("P") T = TypeVar("T") + + def after(func: Callable[[], None]) -> Callable[P, T]: """Runs @func after the decorated function exits.""" + def decorator(f: Callable[P, T]) -> Callable[P, T]: def inner(*args: P.args, **kwargs: P.kwargs) -> T: ret = f(*args, **kwargs) func() return ret + return inner + return decorator + @dataclass class KodiTime: hours: int @@ -23,6 +29,7 @@ class KodiTime: def to_seconds(self) -> int: return self.hours * 3600 + self.minutes * 59 + self.seconds + def seconds_to_kodi_format(seconds: int) -> KodiTime: """Convert seconds into hours, minutes, and seconds as Kodi wants that.""" hours = seconds // 3600 @@ -42,6 +49,7 @@ def find(l: list[T], pred: Callable[[T], bool]) -> T | None: return i return None + def recursive_dict_merge(base: dict, update: dict) -> dict: unique_keys = set([*base.keys(), *update.keys()]) out = {} @@ -55,4 +63,4 @@ def recursive_dict_merge(base: dict, update: dict) -> dict: out[key] = update[key] else: out[key] = recursive_dict_merge(base[key], update[key]) - return out \ No newline at end of file + return out diff --git a/microkodi/jsonrpc.py b/microkodi/jsonrpc.py index 3a90b98..20029c3 100644 --- a/microkodi/jsonrpc.py +++ b/microkodi/jsonrpc.py @@ -12,6 +12,7 @@ from microkodi.repository import I from microkodi.programs import Program, PlayerInfo from microkodi.helpers import KodiTime, seconds_to_kodi_format, after + def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]: return { "jsonrpc": "2.0", @@ -19,6 +20,7 @@ def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]: "id": id, } + class JsonRpcObject: logger: logging.Logger @@ -71,9 +73,8 @@ class ApplicationRpcObject: return "LOL" else: return "Unknown" - return { - prop: _get_property(prop) for prop in params["properties"] - } + + return {prop: _get_property(prop) for prop in params["properties"]} class PlayerRpcObject(JsonRpcObject): @@ -122,14 +123,16 @@ class PlayerRpcObject(JsonRpcObject): def get_active_players(self, params: dict[str, Any]) -> Any: if not self.is_active: return [] - return [{"playerid":1,"playertype":"internal","type":"video"}] + return [{"playerid": 1, "playertype": "internal", "type": "video"}] def play_pause(self, params: dict[str, Any]) -> Any: if not self.is_active: self.logger.warn("Trying to toggle player that is not running") return "OK" if params["play"] != "toggle": - self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"]) + self.logger.warn( + "Trying to call PlayPause with unknown play: '%s'", params["play"] + ) return "Err" playing = self._active_program.is_playing() @@ -167,40 +170,40 @@ class PlayerRpcObject(JsonRpcObject): # Find out what player class to use scheme_configuration = config.players.get(url.scheme) if scheme_configuration is None: - I.get("DataBridge").notification.emit(f"No player available for {url.scheme}") + I.get("DataBridge").notification.emit( + f"No player available for {url.scheme}" + ) self.logger.warn("Client requested unknown scheme: '%s'", url.scheme) - return { - "error": "invalid protocol" - } + return {"error": "invalid protocol"} player_class_name = scheme_configuration.get(url.netloc) if player_class_name is None: player_class_name = scheme_configuration.get("*") if player_class_name is None: - I.get("DataBridge").notification.emit(f"No player available for {url.netloc}") + I.get("DataBridge").notification.emit( + f"No player available for {url.netloc}" + ) self.logger.warn("No player was picked for url '%s'", url) - return { - "error": "invalid protocol" - } + return {"error": "invalid protocol"} # Try to import the class *module_parts, class_name = player_class_name.split(".") module_name = ".".join(module_parts) program_cls = None if module_name not in sys.modules: - self.logger.debug("Trying to import %s to get class %s", module_name, class_name) + self.logger.debug( + "Trying to import %s to get class %s", module_name, class_name + ) spec = importlib.util.find_spec(module) module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module spec.loader.exec_module(module) - program_cls = getattr(sys.modules[module_name], class_name, None) + program_cls = getattr(sys.modules[module_name], class_name, None) if program_cls is None: I.get("DataBridge").notification.emit("Could not start player") self.logger.warn("Class %s not found in module %s", class_name, module_name) - return { - "error": "invalid protocol" - } - + return {"error": "invalid protocol"} + if self._active_program is not None: if isinstance(self._active_program, program_cls): self._active_program.stop("playerrpcobject.open") @@ -224,9 +227,7 @@ class PlayerRpcObject(JsonRpcObject): return name if self.is_active else "" player_info = self._active_program.get_player_info() - item = { - prop: _get_property(prop, player_info) for prop in params["properties"] - } + item = {prop: _get_property(prop, player_info) for prop in params["properties"]} return { "item": { **item, @@ -235,13 +236,19 @@ class PlayerRpcObject(JsonRpcObject): } def get_properties(self, params: dict[str, Any]) -> Any: - def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]: + def _get_property( + name: str, player_info: PlayerInfo + ) -> str | bool | dict[str, Any]: if name == "currentsubtitle": return None elif name == "partymode": return False elif name == "percentage": - return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0 + return ( + player_info.position / player_info.runtime + if player_info.runtime > 0 + else 0.0 + ) elif name == "playlistid": return 0 elif name == "position": @@ -281,10 +288,10 @@ class PlayerRpcObject(JsonRpcObject): else: self.logger.error("Unknown param '%s'", name) return "Unknown" + player_info = self._active_program.get_player_info() - return { - prop: _get_property(prop, player_info) for prop in params["properties"] - } + return {prop: _get_property(prop, player_info) for prop in params["properties"]} + class PlaylistRpcObject(JsonRpcObject): def handle(self, method: str, params: dict[str, Any]) -> Any: @@ -297,15 +304,18 @@ class PlaylistRpcObject(JsonRpcObject): def clear(self, params: dict[str, Any]) -> Any: playlistid: int = params["playlistid"] - self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid) + self.logger.warn( + "Asked to empty playlist %d but we're just pretending to do so", playlistid + ) return "OK" def get_playlists(self, params: dict[str, Any]) -> Any: - return [{"playlistid":1,"type":"video"}] + return [{"playlistid": 1, "type": "video"}] def get_items(self, params: dict[str, Any]) -> Any: return "ERR" + class GlobalMethodHandler: objects: dict[str, JsonRpcObject] @@ -330,7 +340,6 @@ class JsonRpcHandler(BaseHTTPRequestHandler): logger = logging.getLogger("JsonRpcHandler") logger.debug("GET %s", self.path) - def do_POST(self): logger = logging.getLogger("JsonRpcHandler") if self.path != "/jsonrpc": @@ -361,18 +370,24 @@ class JsonRpcHandler(BaseHTTPRequestHandler): if authenticated: auth: str | None = self.headers.get("Authorization") if auth is None: - logger.warn("Client provided no Authorization header. Method: %s", method) + logger.warn( + "Client provided no Authorization header. Method: %s", method + ) self.send_error(401) return auth_type, auth_payload = auth.split(" ") if auth_type != "Basic": - logger.warn("Client provided no Basic authorization. Method: %s", method) + logger.warn( + "Client provided no Basic authorization. Method: %s", method + ) self.send_error(401) return credentials = base64.b64decode(auth_payload).decode("utf-8") if credentials != "kodi:1234": - logger.warn("Rejecting request due to wrong credentials. Method: %s", method) + logger.warn( + "Rejecting request due to wrong credentials. Method: %s", method + ) self.send_error(403) return @@ -391,4 +406,4 @@ class JsonRpcHandler(BaseHTTPRequestHandler): self.wfile.write(response_body) self.wfile.flush() - self.log_request(code="200") \ No newline at end of file + self.log_request(code="200") diff --git a/microkodi/main.py b/microkodi/main.py index d8ff12a..a39b9f5 100644 --- a/microkodi/main.py +++ b/microkodi/main.py @@ -32,7 +32,7 @@ def run_kodi_server(): logger.info("Loading scripts...") for name, path in config.scripts.items(): - logger.debug("Trying to load %s (%s)", name, path) + logger.debug("Trying to load %s (%s)", name, path) if not Path(path).exists(): logger.warning("Failed to load %s: File does not exist", name) continue @@ -51,14 +51,19 @@ def run_kodi_server(): init_method = getattr(module, "init") init_method() - - server = HTTPServer - httpd = HTTPServer((config.host, config.port,), JsonRpcHandler) + httpd = HTTPServer( + ( + config.host, + config.port, + ), + JsonRpcHandler, + ) logger.info("Starting server on %s:%i", config.host, config.port) httpd.serve_forever() logger.info("Shutting down server") + def main(): parser = argparse.ArgumentParser() parser.add_argument("--config", "-c", type=Path, help="Location of the config file") @@ -99,5 +104,6 @@ def main(): del engine sys.exit(exit_code) -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/microkodi/process.py b/microkodi/process.py index a6320fe..cdb4bab 100644 --- a/microkodi/process.py +++ b/microkodi/process.py @@ -2,12 +2,13 @@ from threading import Thread import subprocess from typing import Callable + def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen: def _run(process: subprocess.Popen): - process.wait() + process.wait() when_done() process = subprocess.Popen(args) t = Thread(target=_run, args=(process,)) t.start() - return process \ No newline at end of file + return process diff --git a/microkodi/programs/__init__.py b/microkodi/programs/__init__.py index 8aa854b..525267d 100644 --- a/microkodi/programs/__init__.py +++ b/microkodi/programs/__init__.py @@ -4,6 +4,7 @@ import logging from microkodi.helpers import KodiTime + @dataclass class PlayerInfo: runtime: int @@ -11,6 +12,7 @@ class PlayerInfo: title: str playing: bool + class Program: logger: logging.Logger @@ -42,4 +44,4 @@ class Program: raise NotImplementedError() def seek(self, timestamp: KodiTime): - raise NotImplementedError() \ No newline at end of file + raise NotImplementedError() diff --git a/microkodi/programs/mpv.py b/microkodi/programs/mpv.py index ff0f026..d61891d 100644 --- a/microkodi/programs/mpv.py +++ b/microkodi/programs/mpv.py @@ -12,6 +12,7 @@ from microkodi.config import Config from microkodi.programs import PlayerInfo, Program from microkodi.repository import I + class MpvConfig: # Mapping of file extensions to a potential transformer function _ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] @@ -19,14 +20,23 @@ class MpvConfig: def __init__(self): self._ext_map = {} - def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): - logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext) + def register_ext_transformer( + self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]] + ): + logging.getLogger("MpvConfig").debug( + "Registering transformer for .%s URLs", ext + ) self._ext_map[ext] = transformer - def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: + def get_ext_transformer( + self, ext: str + ) -> Callable[[ParseResult], tuple[list[str], str]] | None: return self._ext_map.get(ext) + + I.register("MpvConfig", MpvConfig()) + class MpvProgram(Program): _process: subprocess.Popen @@ -45,9 +55,14 @@ class MpvProgram(Program): def __mpv_command(self, args: list[str]) -> dict[str, Any]: ipc_logger = logging.getLogger("mpv-ipc") - command = json.dumps({ - "command": args, - }) + "\n" + command = ( + json.dumps( + { + "command": args, + } + ) + + "\n" + ) ipc_logger.debug("-> %s", command) with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: try: @@ -61,7 +76,6 @@ class MpvProgram(Program): ipc_logger.debug("<- %s", resp_raw) return json.loads(resp_raw) - def __read_prop(self, prop: str, default: Any) -> Any: return self.__mpv_command(["get_property", prop]).get("data", default) @@ -138,4 +152,4 @@ class MpvProgram(Program): position=int(self.__read_prop("playback-time", "0")), title=self.__read_prop("media-title", ""), playing=not self.__read_prop("pause", False), - ) \ No newline at end of file + ) diff --git a/microkodi/programs/vlc.py b/microkodi/programs/vlc.py index 4ab9500..f15aea4 100644 --- a/microkodi/programs/vlc.py +++ b/microkodi/programs/vlc.py @@ -15,6 +15,7 @@ from microkodi.helpers import KodiTime EVENT_PLAYER_EXIT = "post-player-exit" + class VlcConfig: # Mapping of file extensions to a potential transformer function _ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] @@ -22,25 +23,37 @@ class VlcConfig: # Mapping of domains to a potential transformer function _domain_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] - _event_listeners: dict[str, Callable[[], None]] + _event_listeners: dict[str, Callable[[], None]] def __init__(self): self._ext_map = {} self._event_listeners = {} self._domain_map = {} - def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): - logging.getLogger("VlcConfig").debug("Registering transformer for .%s URLs", ext) + def register_ext_transformer( + self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]] + ): + logging.getLogger("VlcConfig").debug( + "Registering transformer for .%s URLs", ext + ) self._ext_map[ext] = transformer - def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: + def get_ext_transformer( + self, ext: str + ) -> Callable[[ParseResult], tuple[list[str], str]] | None: return self._ext_map.get(ext) - def register_domain_transformer(self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): - logging.getLogger("VlcConfig").debug("Registering transformer for the %s domain", domain) + def register_domain_transformer( + self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]] + ): + logging.getLogger("VlcConfig").debug( + "Registering transformer for the %s domain", domain + ) self._domain_map[domain] = transformer - def get_domain_transformer(self, domain: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: + def get_domain_transformer( + self, domain: str + ) -> Callable[[ParseResult], tuple[list[str], str]] | None: return self._domain_map.get(domain) def register_event_listener(self, event: str, callback: Callable[[], None]): @@ -54,8 +67,11 @@ class VlcConfig: for listener in self._event_listeners.get(event, []): logger.debug("Running listener %s for event %s", listener.__name__, event) listener() + + I.register("VlcConfig", VlcConfig()) + class VlcProgram(Program): _process: subprocess.Popen @@ -139,7 +155,9 @@ class VlcProgram(Program): I.get("DataBridge").notification.emit("VLC exited with an error") self._process = None - def __vlc_command(self, command: str, params: dict[str, Any] | None = None) -> str | None: + def __vlc_command( + self, command: str, params: dict[str, Any] | None = None + ) -> str | None: try: req = requests.get( f"http://127.0.0.1:9090/requests/status.xml", @@ -166,7 +184,7 @@ class VlcProgram(Program): "seek", { "val": timestamp.to_seconds(), - } + }, ) def exit(self): @@ -192,7 +210,9 @@ class VlcProgram(Program): root = dom.getElementsByTagName("root")[0] time = int(root.getElementsByTagName("time")[0].childNodes[0].data) length = int(root.getElementsByTagName("length")[0].childNodes[0].data) - playing = root.getElementsByTagName("state")[0].childNodes[0].data == "playing" + playing = ( + root.getElementsByTagName("state")[0].childNodes[0].data == "playing" + ) except Exception as ex: self.logger.warning("Failed to parse vlc API response: %s", ex) self.logger.debug("Response body: %s", req.text) @@ -209,4 +229,4 @@ class VlcProgram(Program): # TODO title="TODO: Find out", playing=playing, - ) \ No newline at end of file + ) diff --git a/microkodi/repository.py b/microkodi/repository.py index 7778804..d2bd0e1 100644 --- a/microkodi/repository.py +++ b/microkodi/repository.py @@ -1,5 +1,6 @@ from typing import Any + class _Repository: _data: dict[str, Any] @@ -12,4 +13,5 @@ class _Repository: def get(self, key: str) -> Any | None: return self._data.get(key) -I = _Repository() \ No newline at end of file + +I = _Repository() diff --git a/microkodi/tv.py b/microkodi/tv.py index 068d70b..32d2192 100644 --- a/microkodi/tv.py +++ b/microkodi/tv.py @@ -37,10 +37,9 @@ class TVHandler: tc.power_on() return config.cec - def turn_on_tv(self): if self.enable_power_if_configured(): self.logger.debug("Waiting 500ms for TV to get power...") time.sleep(500) - self.power_on_if_needed() \ No newline at end of file + self.power_on_if_needed() diff --git a/microkodi/ui/bridge.py b/microkodi/ui/bridge.py index 53b4766..4bfd00f 100644 --- a/microkodi/ui/bridge.py +++ b/microkodi/ui/bridge.py @@ -2,6 +2,7 @@ from datetime import datetime from PySide6.QtCore import QObject, Signal, Slot + class DataBridge(QObject): """Bridge between the QML app and the Python "host".""" @@ -19,4 +20,4 @@ class DataBridge(QObject): @Slot(result=str) def currentTime(self) -> str: - return datetime.now().strftime("%H:%M") \ No newline at end of file + return datetime.now().strftime("%H:%M")