Format using black

This commit is contained in:
PapaTutuWawa 2025-01-13 21:54:13 +00:00
parent 4f76ad144f
commit 7d91041091
11 changed files with 137 additions and 67 deletions

View File

@ -5,6 +5,7 @@ from typing import Any
from microkodi.helpers import recursive_dict_merge from microkodi.helpers import recursive_dict_merge
@dataclass @dataclass
class Config: class Config:
host: str host: str
@ -44,6 +45,7 @@ class Config:
# Webhook to trigger to turn on power to the TV # Webhook to trigger to turn on power to the TV
tv_power_webhook: str | None tv_power_webhook: str | None
def load_config(config_path: Path | None) -> Config: def load_config(config_path: Path | None) -> Config:
if config_path is None: if config_path is None:
config_data = {} config_data = {}

View File

@ -4,16 +4,22 @@ from functools import wraps
P = ParamSpec("P") P = ParamSpec("P")
T = TypeVar("T") T = TypeVar("T")
def after(func: Callable[[], None]) -> Callable[P, T]: def after(func: Callable[[], None]) -> Callable[P, T]:
"""Runs @func after the decorated function exits.""" """Runs @func after the decorated function exits."""
def decorator(f: Callable[P, T]) -> Callable[P, T]: def decorator(f: Callable[P, T]) -> Callable[P, T]:
def inner(*args: P.args, **kwargs: P.kwargs) -> T: def inner(*args: P.args, **kwargs: P.kwargs) -> T:
ret = f(*args, **kwargs) ret = f(*args, **kwargs)
func() func()
return ret return ret
return inner return inner
return decorator return decorator
@dataclass @dataclass
class KodiTime: class KodiTime:
hours: int hours: int
@ -23,6 +29,7 @@ class KodiTime:
def to_seconds(self) -> int: def to_seconds(self) -> int:
return self.hours * 3600 + self.minutes * 59 + self.seconds return self.hours * 3600 + self.minutes * 59 + self.seconds
def seconds_to_kodi_format(seconds: int) -> KodiTime: def seconds_to_kodi_format(seconds: int) -> KodiTime:
"""Convert seconds into hours, minutes, and seconds as Kodi wants that.""" """Convert seconds into hours, minutes, and seconds as Kodi wants that."""
hours = seconds // 3600 hours = seconds // 3600
@ -42,6 +49,7 @@ def find(l: list[T], pred: Callable[[T], bool]) -> T | None:
return i return i
return None return None
def recursive_dict_merge(base: dict, update: dict) -> dict: def recursive_dict_merge(base: dict, update: dict) -> dict:
unique_keys = set([*base.keys(), *update.keys()]) unique_keys = set([*base.keys(), *update.keys()])
out = {} out = {}

View File

@ -12,6 +12,7 @@ from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo from microkodi.programs import Program, PlayerInfo
from microkodi.helpers import KodiTime, seconds_to_kodi_format, after from microkodi.helpers import KodiTime, seconds_to_kodi_format, after
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]: def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return { return {
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -19,6 +20,7 @@ def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
"id": id, "id": id,
} }
class JsonRpcObject: class JsonRpcObject:
logger: logging.Logger logger: logging.Logger
@ -71,9 +73,8 @@ class ApplicationRpcObject:
return "LOL" return "LOL"
else: else:
return "Unknown" return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"] return {prop: _get_property(prop) for prop in params["properties"]}
}
class PlayerRpcObject(JsonRpcObject): class PlayerRpcObject(JsonRpcObject):
@ -122,14 +123,16 @@ class PlayerRpcObject(JsonRpcObject):
def get_active_players(self, params: dict[str, Any]) -> Any: def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active: if not self.is_active:
return [] return []
return [{"playerid":1,"playertype":"internal","type":"video"}] return [{"playerid": 1, "playertype": "internal", "type": "video"}]
def play_pause(self, params: dict[str, Any]) -> Any: def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active: if not self.is_active:
self.logger.warn("Trying to toggle player that is not running") self.logger.warn("Trying to toggle player that is not running")
return "OK" return "OK"
if params["play"] != "toggle": if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"]) self.logger.warn(
"Trying to call PlayPause with unknown play: '%s'", params["play"]
)
return "Err" return "Err"
playing = self._active_program.is_playing() playing = self._active_program.is_playing()
@ -167,27 +170,29 @@ class PlayerRpcObject(JsonRpcObject):
# Find out what player class to use # Find out what player class to use
scheme_configuration = config.players.get(url.scheme) scheme_configuration = config.players.get(url.scheme)
if scheme_configuration is None: if scheme_configuration is None:
I.get("DataBridge").notification.emit(f"No player available for {url.scheme}") I.get("DataBridge").notification.emit(
f"No player available for {url.scheme}"
)
self.logger.warn("Client requested unknown scheme: '%s'", url.scheme) self.logger.warn("Client requested unknown scheme: '%s'", url.scheme)
return { return {"error": "invalid protocol"}
"error": "invalid protocol"
}
player_class_name = scheme_configuration.get(url.netloc) player_class_name = scheme_configuration.get(url.netloc)
if player_class_name is None: if player_class_name is None:
player_class_name = scheme_configuration.get("*") player_class_name = scheme_configuration.get("*")
if player_class_name is None: if player_class_name is None:
I.get("DataBridge").notification.emit(f"No player available for {url.netloc}") I.get("DataBridge").notification.emit(
f"No player available for {url.netloc}"
)
self.logger.warn("No player was picked for url '%s'", url) self.logger.warn("No player was picked for url '%s'", url)
return { return {"error": "invalid protocol"}
"error": "invalid protocol"
}
# Try to import the class # Try to import the class
*module_parts, class_name = player_class_name.split(".") *module_parts, class_name = player_class_name.split(".")
module_name = ".".join(module_parts) module_name = ".".join(module_parts)
program_cls = None program_cls = None
if module_name not in sys.modules: if module_name not in sys.modules:
self.logger.debug("Trying to import %s to get class %s", module_name, class_name) self.logger.debug(
"Trying to import %s to get class %s", module_name, class_name
)
spec = importlib.util.find_spec(module) spec = importlib.util.find_spec(module)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module sys.modules[module_name] = module
@ -197,9 +202,7 @@ class PlayerRpcObject(JsonRpcObject):
if program_cls is None: if program_cls is None:
I.get("DataBridge").notification.emit("Could not start player") I.get("DataBridge").notification.emit("Could not start player")
self.logger.warn("Class %s not found in module %s", class_name, module_name) self.logger.warn("Class %s not found in module %s", class_name, module_name)
return { return {"error": "invalid protocol"}
"error": "invalid protocol"
}
if self._active_program is not None: if self._active_program is not None:
if isinstance(self._active_program, program_cls): if isinstance(self._active_program, program_cls):
@ -224,9 +227,7 @@ class PlayerRpcObject(JsonRpcObject):
return name if self.is_active else "" return name if self.is_active else ""
player_info = self._active_program.get_player_info() player_info = self._active_program.get_player_info()
item = { item = {prop: _get_property(prop, player_info) for prop in params["properties"]}
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return { return {
"item": { "item": {
**item, **item,
@ -235,13 +236,19 @@ class PlayerRpcObject(JsonRpcObject):
} }
def get_properties(self, params: dict[str, Any]) -> Any: def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]: def _get_property(
name: str, player_info: PlayerInfo
) -> str | bool | dict[str, Any]:
if name == "currentsubtitle": if name == "currentsubtitle":
return None return None
elif name == "partymode": elif name == "partymode":
return False return False
elif name == "percentage": elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0 return (
player_info.position / player_info.runtime
if player_info.runtime > 0
else 0.0
)
elif name == "playlistid": elif name == "playlistid":
return 0 return 0
elif name == "position": elif name == "position":
@ -281,10 +288,10 @@ class PlayerRpcObject(JsonRpcObject):
else: else:
self.logger.error("Unknown param '%s'", name) self.logger.error("Unknown param '%s'", name)
return "Unknown" return "Unknown"
player_info = self._active_program.get_player_info() player_info = self._active_program.get_player_info()
return { return {prop: _get_property(prop, player_info) for prop in params["properties"]}
prop: _get_property(prop, player_info) for prop in params["properties"]
}
class PlaylistRpcObject(JsonRpcObject): class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any: def handle(self, method: str, params: dict[str, Any]) -> Any:
@ -297,15 +304,18 @@ class PlaylistRpcObject(JsonRpcObject):
def clear(self, params: dict[str, Any]) -> Any: def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"] playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid) self.logger.warn(
"Asked to empty playlist %d but we're just pretending to do so", playlistid
)
return "OK" return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any: def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}] return [{"playlistid": 1, "type": "video"}]
def get_items(self, params: dict[str, Any]) -> Any: def get_items(self, params: dict[str, Any]) -> Any:
return "ERR" return "ERR"
class GlobalMethodHandler: class GlobalMethodHandler:
objects: dict[str, JsonRpcObject] objects: dict[str, JsonRpcObject]
@ -330,7 +340,6 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
logger = logging.getLogger("JsonRpcHandler") logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path) logger.debug("GET %s", self.path)
def do_POST(self): def do_POST(self):
logger = logging.getLogger("JsonRpcHandler") logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc": if self.path != "/jsonrpc":
@ -361,18 +370,24 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
if authenticated: if authenticated:
auth: str | None = self.headers.get("Authorization") auth: str | None = self.headers.get("Authorization")
if auth is None: if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method) logger.warn(
"Client provided no Authorization header. Method: %s", method
)
self.send_error(401) self.send_error(401)
return return
auth_type, auth_payload = auth.split(" ") auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic": if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method) logger.warn(
"Client provided no Basic authorization. Method: %s", method
)
self.send_error(401) self.send_error(401)
return return
credentials = base64.b64decode(auth_payload).decode("utf-8") credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234": if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method) logger.warn(
"Rejecting request due to wrong credentials. Method: %s", method
)
self.send_error(403) self.send_error(403)
return return

View File

@ -51,14 +51,19 @@ def run_kodi_server():
init_method = getattr(module, "init") init_method = getattr(module, "init")
init_method() init_method()
server = HTTPServer server = HTTPServer
httpd = HTTPServer((config.host, config.port,), JsonRpcHandler) httpd = HTTPServer(
(
config.host,
config.port,
),
JsonRpcHandler,
)
logger.info("Starting server on %s:%i", config.host, config.port) logger.info("Starting server on %s:%i", config.host, config.port)
httpd.serve_forever() httpd.serve_forever()
logger.info("Shutting down server") logger.info("Shutting down server")
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", type=Path, help="Location of the config file") parser.add_argument("--config", "-c", type=Path, help="Location of the config file")
@ -99,5 +104,6 @@ def main():
del engine del engine
sys.exit(exit_code) sys.exit(exit_code)
if __name__ == '__main__':
if __name__ == "__main__":
main() main()

View File

@ -2,6 +2,7 @@ from threading import Thread
import subprocess import subprocess
from typing import Callable from typing import Callable
def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen: def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen:
def _run(process: subprocess.Popen): def _run(process: subprocess.Popen):
process.wait() process.wait()

View File

@ -4,6 +4,7 @@ import logging
from microkodi.helpers import KodiTime from microkodi.helpers import KodiTime
@dataclass @dataclass
class PlayerInfo: class PlayerInfo:
runtime: int runtime: int
@ -11,6 +12,7 @@ class PlayerInfo:
title: str title: str
playing: bool playing: bool
class Program: class Program:
logger: logging.Logger logger: logging.Logger

View File

@ -12,6 +12,7 @@ from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I from microkodi.repository import I
class MpvConfig: class MpvConfig:
# Mapping of file extensions to a potential transformer function # Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] _ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@ -19,14 +20,23 @@ class MpvConfig:
def __init__(self): def __init__(self):
self._ext_map = {} self._ext_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): def register_ext_transformer(
logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext) self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("MpvConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext) return self._ext_map.get(ext)
I.register("MpvConfig", MpvConfig()) I.register("MpvConfig", MpvConfig())
class MpvProgram(Program): class MpvProgram(Program):
_process: subprocess.Popen _process: subprocess.Popen
@ -45,9 +55,14 @@ class MpvProgram(Program):
def __mpv_command(self, args: list[str]) -> dict[str, Any]: def __mpv_command(self, args: list[str]) -> dict[str, Any]:
ipc_logger = logging.getLogger("mpv-ipc") ipc_logger = logging.getLogger("mpv-ipc")
command = json.dumps({ command = (
json.dumps(
{
"command": args, "command": args,
}) + "\n" }
)
+ "\n"
)
ipc_logger.debug("-> %s", command) ipc_logger.debug("-> %s", command)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
try: try:
@ -61,7 +76,6 @@ class MpvProgram(Program):
ipc_logger.debug("<- %s", resp_raw) ipc_logger.debug("<- %s", resp_raw)
return json.loads(resp_raw) return json.loads(resp_raw)
def __read_prop(self, prop: str, default: Any) -> Any: def __read_prop(self, prop: str, default: Any) -> Any:
return self.__mpv_command(["get_property", prop]).get("data", default) return self.__mpv_command(["get_property", prop]).get("data", default)

View File

@ -15,6 +15,7 @@ from microkodi.helpers import KodiTime
EVENT_PLAYER_EXIT = "post-player-exit" EVENT_PLAYER_EXIT = "post-player-exit"
class VlcConfig: class VlcConfig:
# Mapping of file extensions to a potential transformer function # Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] _ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@ -29,18 +30,30 @@ class VlcConfig:
self._event_listeners = {} self._event_listeners = {}
self._domain_map = {} self._domain_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): def register_ext_transformer(
logging.getLogger("VlcConfig").debug("Registering transformer for .%s URLs", ext) self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext) return self._ext_map.get(ext)
def register_domain_transformer(self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): def register_domain_transformer(
logging.getLogger("VlcConfig").debug("Registering transformer for the %s domain", domain) self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for the %s domain", domain
)
self._domain_map[domain] = transformer self._domain_map[domain] = transformer
def get_domain_transformer(self, domain: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: def get_domain_transformer(
self, domain: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._domain_map.get(domain) return self._domain_map.get(domain)
def register_event_listener(self, event: str, callback: Callable[[], None]): def register_event_listener(self, event: str, callback: Callable[[], None]):
@ -54,8 +67,11 @@ class VlcConfig:
for listener in self._event_listeners.get(event, []): for listener in self._event_listeners.get(event, []):
logger.debug("Running listener %s for event %s", listener.__name__, event) logger.debug("Running listener %s for event %s", listener.__name__, event)
listener() listener()
I.register("VlcConfig", VlcConfig()) I.register("VlcConfig", VlcConfig())
class VlcProgram(Program): class VlcProgram(Program):
_process: subprocess.Popen _process: subprocess.Popen
@ -139,7 +155,9 @@ class VlcProgram(Program):
I.get("DataBridge").notification.emit("VLC exited with an error") I.get("DataBridge").notification.emit("VLC exited with an error")
self._process = None self._process = None
def __vlc_command(self, command: str, params: dict[str, Any] | None = None) -> str | None: def __vlc_command(
self, command: str, params: dict[str, Any] | None = None
) -> str | None:
try: try:
req = requests.get( req = requests.get(
f"http://127.0.0.1:9090/requests/status.xml", f"http://127.0.0.1:9090/requests/status.xml",
@ -166,7 +184,7 @@ class VlcProgram(Program):
"seek", "seek",
{ {
"val": timestamp.to_seconds(), "val": timestamp.to_seconds(),
} },
) )
def exit(self): def exit(self):
@ -192,7 +210,9 @@ class VlcProgram(Program):
root = dom.getElementsByTagName("root")[0] root = dom.getElementsByTagName("root")[0]
time = int(root.getElementsByTagName("time")[0].childNodes[0].data) time = int(root.getElementsByTagName("time")[0].childNodes[0].data)
length = int(root.getElementsByTagName("length")[0].childNodes[0].data) length = int(root.getElementsByTagName("length")[0].childNodes[0].data)
playing = root.getElementsByTagName("state")[0].childNodes[0].data == "playing" playing = (
root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
)
except Exception as ex: except Exception as ex:
self.logger.warning("Failed to parse vlc API response: %s", ex) self.logger.warning("Failed to parse vlc API response: %s", ex)
self.logger.debug("Response body: %s", req.text) self.logger.debug("Response body: %s", req.text)

View File

@ -1,5 +1,6 @@
from typing import Any from typing import Any
class _Repository: class _Repository:
_data: dict[str, Any] _data: dict[str, Any]
@ -12,4 +13,5 @@ class _Repository:
def get(self, key: str) -> Any | None: def get(self, key: str) -> Any | None:
return self._data.get(key) return self._data.get(key)
I = _Repository() I = _Repository()

View File

@ -37,7 +37,6 @@ class TVHandler:
tc.power_on() tc.power_on()
return config.cec return config.cec
def turn_on_tv(self): def turn_on_tv(self):
if self.enable_power_if_configured(): if self.enable_power_if_configured():
self.logger.debug("Waiting 500ms for TV to get power...") self.logger.debug("Waiting 500ms for TV to get power...")

View File

@ -2,6 +2,7 @@ from datetime import datetime
from PySide6.QtCore import QObject, Signal, Slot from PySide6.QtCore import QObject, Signal, Slot
class DataBridge(QObject): class DataBridge(QObject):
"""Bridge between the QML app and the Python "host".""" """Bridge between the QML app and the Python "host"."""