Compare commits

..

10 Commits
cec ... master

15 changed files with 236 additions and 111 deletions

5
.gitignore vendored
View File

@ -1,7 +1,12 @@
# Python
**/*.egg-info **/*.egg-info
**/__pycache__/ **/__pycache__/
.venv/ .venv/
# Tooling
.mypy_cache/
.ruff_cache/
# Testing # Testing
config.json config.json
scripts/ scripts/

View File

@ -1,10 +0,0 @@
import cec
class CECHandler:
def __init__(self):
cec.init()
def power_on_if_needed(self):
tc = cec.Device(cec.CECDEVICE_TV)
if not tc.is_on():
tc.power_on()

View File

@ -5,6 +5,7 @@ from typing import Any
from microkodi.helpers import recursive_dict_merge from microkodi.helpers import recursive_dict_merge
@dataclass @dataclass
class Config: class Config:
host: str host: str
@ -40,7 +41,11 @@ class Config:
# Enables the use of CEC # Enables the use of CEC
cec: bool cec: bool
# Webhook to trigger to turn on power to the TV
tv_power_webhook: str | None
def load_config(config_path: Path | None) -> Config: def load_config(config_path: Path | None) -> Config:
if config_path is None: if config_path is None:
config_data = {} config_data = {}
@ -68,4 +73,5 @@ def load_config(config_path: Path | None) -> Config:
), ),
options=config_data.get("options", {}), options=config_data.get("options", {}),
cec=config_data.get("cec", False), cec=config_data.get("cec", False),
tv_power_webhook=config_data.get("tv_power_webhook"),
) )

View File

@ -1,19 +1,24 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import TypeVar, ParamSpec, Callable from typing import TypeVar, ParamSpec, Callable
from functools import wraps
P = ParamSpec("P") P = ParamSpec("P")
T = TypeVar("T") T = TypeVar("T")
def after(func: Callable[[], None]) -> Callable[P, T]: def after(func: Callable[[], None]) -> Callable[P, T]:
"""Runs @func after the decorated function exits.""" """Runs @func after the decorated function exits."""
def decorator(f: Callable[P, T]) -> Callable[P, T]: def decorator(f: Callable[P, T]) -> Callable[P, T]:
def inner(*args: P.args, **kwargs: P.kwargs) -> T: def inner(*args: P.args, **kwargs: P.kwargs) -> T:
ret = f(*args, **kwargs) ret = f(*args, **kwargs)
func() func()
return ret return ret
return inner return inner
return decorator return decorator
@dataclass @dataclass
class KodiTime: class KodiTime:
hours: int hours: int
@ -23,6 +28,7 @@ class KodiTime:
def to_seconds(self) -> int: def to_seconds(self) -> int:
return self.hours * 3600 + self.minutes * 59 + self.seconds return self.hours * 3600 + self.minutes * 59 + self.seconds
def seconds_to_kodi_format(seconds: int) -> KodiTime: def seconds_to_kodi_format(seconds: int) -> KodiTime:
"""Convert seconds into hours, minutes, and seconds as Kodi wants that.""" """Convert seconds into hours, minutes, and seconds as Kodi wants that."""
hours = seconds // 3600 hours = seconds // 3600
@ -36,12 +42,6 @@ def seconds_to_kodi_format(seconds: int) -> KodiTime:
) )
def find(l: list[T], pred: Callable[[T], bool]) -> T | None:
for i in l:
if pred(i):
return i
return None
def recursive_dict_merge(base: dict, update: dict) -> dict: def recursive_dict_merge(base: dict, update: dict) -> dict:
unique_keys = set([*base.keys(), *update.keys()]) unique_keys = set([*base.keys(), *update.keys()])
out = {} out = {}
@ -55,4 +55,4 @@ def recursive_dict_merge(base: dict, update: dict) -> dict:
out[key] = update[key] out[key] = update[key]
else: else:
out[key] = recursive_dict_merge(base[key], update[key]) out[key] = recursive_dict_merge(base[key], update[key])
return out return out

View File

@ -1,16 +1,18 @@
import json import json
from typing import Any, Callable from typing import Any
from dataclasses import dataclass
from urllib.parse import urlparse from urllib.parse import urlparse
import logging import logging
import base64 import base64
import sys import sys
import importlib
from http.server import BaseHTTPRequestHandler from http.server import BaseHTTPRequestHandler
from microkodi.repository import I from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo from microkodi.programs import Program, PlayerInfo
from microkodi.helpers import KodiTime, seconds_to_kodi_format, after from microkodi.helpers import KodiTime, seconds_to_kodi_format, after
from microkodi.config import Config
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]: def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return { return {
@ -19,6 +21,7 @@ def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
"id": id, "id": id,
} }
class JsonRpcObject: class JsonRpcObject:
logger: logging.Logger logger: logging.Logger
@ -71,9 +74,8 @@ class ApplicationRpcObject:
return "LOL" return "LOL"
else: else:
return "Unknown" return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"] return {prop: _get_property(prop) for prop in params["properties"]}
}
class PlayerRpcObject(JsonRpcObject): class PlayerRpcObject(JsonRpcObject):
@ -122,14 +124,16 @@ class PlayerRpcObject(JsonRpcObject):
def get_active_players(self, params: dict[str, Any]) -> Any: def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active: if not self.is_active:
return [] return []
return [{"playerid":1,"playertype":"internal","type":"video"}] return [{"playerid": 1, "playertype": "internal", "type": "video"}]
def play_pause(self, params: dict[str, Any]) -> Any: def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active: if not self.is_active:
self.logger.warn("Trying to toggle player that is not running") self.logger.warn("Trying to toggle player that is not running")
return "OK" return "OK"
if params["play"] != "toggle": if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"]) self.logger.warn(
"Trying to call PlayPause with unknown play: '%s'", params["play"]
)
return "Err" return "Err"
playing = self._active_program.is_playing() playing = self._active_program.is_playing()
@ -153,8 +157,7 @@ class PlayerRpcObject(JsonRpcObject):
def open(self, params: dict[str, Any]) -> Any: def open(self, params: dict[str, Any]) -> Any:
# Turn on the TV # Turn on the TV
config: Config = I.get("Config") config: Config = I.get("Config")
if config.cec: I.get("TVHandler").turn_on_tv()
I.get("CECHandler").power_on_if_needed()
I.get("DataBridge").set_loading(True) I.get("DataBridge").set_loading(True)
url = urlparse(params["item"]["file"]) url = urlparse(params["item"]["file"])
@ -168,40 +171,40 @@ class PlayerRpcObject(JsonRpcObject):
# Find out what player class to use # Find out what player class to use
scheme_configuration = config.players.get(url.scheme) scheme_configuration = config.players.get(url.scheme)
if scheme_configuration is None: if scheme_configuration is None:
I.get("DataBridge").notification.emit(f"No player available for {url.scheme}") I.get("DataBridge").notification.emit(
f"No player available for {url.scheme}"
)
self.logger.warn("Client requested unknown scheme: '%s'", url.scheme) self.logger.warn("Client requested unknown scheme: '%s'", url.scheme)
return { return {"error": "invalid protocol"}
"error": "invalid protocol"
}
player_class_name = scheme_configuration.get(url.netloc) player_class_name = scheme_configuration.get(url.netloc)
if player_class_name is None: if player_class_name is None:
player_class_name = scheme_configuration.get("*") player_class_name = scheme_configuration.get("*")
if player_class_name is None: if player_class_name is None:
I.get("DataBridge").notification.emit(f"No player available for {url.netloc}") I.get("DataBridge").notification.emit(
f"No player available for {url.netloc}"
)
self.logger.warn("No player was picked for url '%s'", url) self.logger.warn("No player was picked for url '%s'", url)
return { return {"error": "invalid protocol"}
"error": "invalid protocol"
}
# Try to import the class # Try to import the class
*module_parts, class_name = player_class_name.split(".") *module_parts, class_name = player_class_name.split(".")
module_name = ".".join(module_parts) module_name = ".".join(module_parts)
program_cls = None program_cls = None
if module_name not in sys.modules: if module_name not in sys.modules:
self.logger.debug("Trying to import %s to get class %s", module_name, class_name) self.logger.debug(
spec = importlib.util.find_spec(module) "Trying to import %s to get class %s", module_name, class_name
)
spec = importlib.util.find_spec(module_name)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module sys.modules[module_name] = module
spec.loader.exec_module(module) spec.loader.exec_module(module)
program_cls = getattr(sys.modules[module_name], class_name, None) program_cls = getattr(sys.modules[module_name], class_name, None)
if program_cls is None: if program_cls is None:
I.get("DataBridge").notification.emit("Could not start player") I.get("DataBridge").notification.emit("Could not start player")
self.logger.warn("Class %s not found in module %s", class_name, module_name) self.logger.warn("Class %s not found in module %s", class_name, module_name)
return { return {"error": "invalid protocol"}
"error": "invalid protocol"
}
if self._active_program is not None: if self._active_program is not None:
if isinstance(self._active_program, program_cls): if isinstance(self._active_program, program_cls):
self._active_program.stop("playerrpcobject.open") self._active_program.stop("playerrpcobject.open")
@ -225,9 +228,7 @@ class PlayerRpcObject(JsonRpcObject):
return name if self.is_active else "" return name if self.is_active else ""
player_info = self._active_program.get_player_info() player_info = self._active_program.get_player_info()
item = { item = {prop: _get_property(prop, player_info) for prop in params["properties"]}
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return { return {
"item": { "item": {
**item, **item,
@ -236,13 +237,19 @@ class PlayerRpcObject(JsonRpcObject):
} }
def get_properties(self, params: dict[str, Any]) -> Any: def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]: def _get_property(
name: str, player_info: PlayerInfo
) -> str | bool | dict[str, Any]:
if name == "currentsubtitle": if name == "currentsubtitle":
return None return None
elif name == "partymode": elif name == "partymode":
return False return False
elif name == "percentage": elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0 return (
player_info.position / player_info.runtime
if player_info.runtime > 0
else 0.0
)
elif name == "playlistid": elif name == "playlistid":
return 0 return 0
elif name == "position": elif name == "position":
@ -282,10 +289,10 @@ class PlayerRpcObject(JsonRpcObject):
else: else:
self.logger.error("Unknown param '%s'", name) self.logger.error("Unknown param '%s'", name)
return "Unknown" return "Unknown"
player_info = self._active_program.get_player_info() player_info = self._active_program.get_player_info()
return { return {prop: _get_property(prop, player_info) for prop in params["properties"]}
prop: _get_property(prop, player_info) for prop in params["properties"]
}
class PlaylistRpcObject(JsonRpcObject): class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any: def handle(self, method: str, params: dict[str, Any]) -> Any:
@ -298,15 +305,18 @@ class PlaylistRpcObject(JsonRpcObject):
def clear(self, params: dict[str, Any]) -> Any: def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"] playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid) self.logger.warn(
"Asked to empty playlist %d but we're just pretending to do so", playlistid
)
return "OK" return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any: def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}] return [{"playlistid": 1, "type": "video"}]
def get_items(self, params: dict[str, Any]) -> Any: def get_items(self, params: dict[str, Any]) -> Any:
return "ERR" return "ERR"
class GlobalMethodHandler: class GlobalMethodHandler:
objects: dict[str, JsonRpcObject] objects: dict[str, JsonRpcObject]
@ -331,7 +341,6 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
logger = logging.getLogger("JsonRpcHandler") logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path) logger.debug("GET %s", self.path)
def do_POST(self): def do_POST(self):
logger = logging.getLogger("JsonRpcHandler") logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc": if self.path != "/jsonrpc":
@ -362,18 +371,24 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
if authenticated: if authenticated:
auth: str | None = self.headers.get("Authorization") auth: str | None = self.headers.get("Authorization")
if auth is None: if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method) logger.warn(
"Client provided no Authorization header. Method: %s", method_name
)
self.send_error(401) self.send_error(401)
return return
auth_type, auth_payload = auth.split(" ") auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic": if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method) logger.warn(
"Client provided no Basic authorization. Method: %s", method_name
)
self.send_error(401) self.send_error(401)
return return
credentials = base64.b64decode(auth_payload).decode("utf-8") credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234": if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method) logger.warn(
"Rejecting request due to wrong credentials. Method: %s", method_name
)
self.send_error(403) self.send_error(403)
return return
@ -392,4 +407,4 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
self.wfile.write(response_body) self.wfile.write(response_body)
self.wfile.flush() self.wfile.flush()
self.log_request(code="200") self.log_request(code="200")

View File

@ -5,7 +5,6 @@ import threading
import argparse import argparse
from pathlib import Path from pathlib import Path
import importlib.util import importlib.util
import time
from PySide6.QtGui import QGuiApplication from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine from PySide6.QtQml import QQmlApplicationEngine
@ -13,7 +12,7 @@ from PySide6.QtQml import QQmlApplicationEngine
from microkodi.jsonrpc import JsonRpcHandler, GlobalMethodHandler from microkodi.jsonrpc import JsonRpcHandler, GlobalMethodHandler
from microkodi.ui.bridge import DataBridge from microkodi.ui.bridge import DataBridge
from microkodi.config import Config, load_config from microkodi.config import Config, load_config
from microkodi.cec_handler import CECHandler from microkodi.cec_handler import TVHandler
from microkodi.repository import I from microkodi.repository import I
@ -25,16 +24,14 @@ def run_kodi_server():
I.register("GlobalMethodHandler", method_handler) I.register("GlobalMethodHandler", method_handler)
# Setup CEC # Setup CEC
if config.cec: I.register("TVHandler", TVHandler())
I.register("CECHandler", CECHandler())
logger.info("Enabling CEC support")
# Load extra plugins # Load extra plugins
if config.scripts: if config.scripts:
logger.info("Loading scripts...") logger.info("Loading scripts...")
for name, path in config.scripts.items(): for name, path in config.scripts.items():
logger.debug("Trying to load %s (%s)", name, path) logger.debug("Trying to load %s (%s)", name, path)
if not Path(path).exists(): if not Path(path).exists():
logger.warning("Failed to load %s: File does not exist", name) logger.warning("Failed to load %s: File does not exist", name)
continue continue
@ -53,22 +50,25 @@ def run_kodi_server():
init_method = getattr(module, "init") init_method = getattr(module, "init")
init_method() init_method()
httpd = HTTPServer(
(
server = HTTPServer config.host,
httpd = HTTPServer((config.host, config.port,), JsonRpcHandler) config.port,
),
JsonRpcHandler,
)
logger.info("Starting server on %s:%i", config.host, config.port) logger.info("Starting server on %s:%i", config.host, config.port)
httpd.serve_forever() httpd.serve_forever()
logger.info("Shutting down server") logger.info("Shutting down server")
if __name__ == "__main__":
def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", type=Path, help="Location of the config file") parser.add_argument("--config", "-c", type=Path, help="Location of the config file")
parser.add_argument("--debug", action="store_true", default=False) parser.add_argument("--debug", action="store_true", default=False)
options = parser.parse_args() options = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if options.debug else logging.INFO) logging.basicConfig(level=logging.DEBUG if options.debug else logging.INFO)
logger = logging.getLogger("ui")
# Load the config # Load the config
config = load_config(options.config) config = load_config(options.config)
@ -99,4 +99,8 @@ if __name__ == "__main__":
engine.rootObjects()[0].setProperty("bridge", bridge) engine.rootObjects()[0].setProperty("bridge", bridge)
exit_code = app.exec() exit_code = app.exec()
del engine del engine
sys.exit(exit_code) sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@ -2,12 +2,13 @@ from threading import Thread
import subprocess import subprocess
from typing import Callable from typing import Callable
def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen: def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen:
def _run(process: subprocess.Popen): def _run(process: subprocess.Popen):
process.wait() process.wait()
when_done() when_done()
process = subprocess.Popen(args) process = subprocess.Popen(args)
t = Thread(target=_run, args=(process,)) t = Thread(target=_run, args=(process,))
t.start() t.start()
return process return process

View File

@ -4,6 +4,7 @@ import logging
from microkodi.helpers import KodiTime from microkodi.helpers import KodiTime
@dataclass @dataclass
class PlayerInfo: class PlayerInfo:
runtime: int runtime: int
@ -11,6 +12,7 @@ class PlayerInfo:
title: str title: str
playing: bool playing: bool
class Program: class Program:
logger: logging.Logger logger: logging.Logger
@ -42,4 +44,4 @@ class Program:
raise NotImplementedError() raise NotImplementedError()
def seek(self, timestamp: KodiTime): def seek(self, timestamp: KodiTime):
raise NotImplementedError() raise NotImplementedError()

View File

@ -1,8 +1,7 @@
import subprocess import subprocess
from typing import Any, Callable from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse, urldefrag, unquote from urllib.parse import ParseResult, urlunparse
import logging import logging
from dataclasses import dataclass
import os import os
import json import json
import socket import socket
@ -12,6 +11,7 @@ from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I from microkodi.repository import I
class MpvConfig: class MpvConfig:
# Mapping of file extensions to a potential transformer function # Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] _ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@ -19,14 +19,23 @@ class MpvConfig:
def __init__(self): def __init__(self):
self._ext_map = {} self._ext_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): def register_ext_transformer(
logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext) self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("MpvConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext) return self._ext_map.get(ext)
I.register("MpvConfig", MpvConfig()) I.register("MpvConfig", MpvConfig())
class MpvProgram(Program): class MpvProgram(Program):
_process: subprocess.Popen _process: subprocess.Popen
@ -45,9 +54,14 @@ class MpvProgram(Program):
def __mpv_command(self, args: list[str]) -> dict[str, Any]: def __mpv_command(self, args: list[str]) -> dict[str, Any]:
ipc_logger = logging.getLogger("mpv-ipc") ipc_logger = logging.getLogger("mpv-ipc")
command = json.dumps({ command = (
"command": args, json.dumps(
}) + "\n" {
"command": args,
}
)
+ "\n"
)
ipc_logger.debug("-> %s", command) ipc_logger.debug("-> %s", command)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
try: try:
@ -61,7 +75,6 @@ class MpvProgram(Program):
ipc_logger.debug("<- %s", resp_raw) ipc_logger.debug("<- %s", resp_raw)
return json.loads(resp_raw) return json.loads(resp_raw)
def __read_prop(self, prop: str, default: Any) -> Any: def __read_prop(self, prop: str, default: Any) -> Any:
return self.__mpv_command(["get_property", prop]).get("data", default) return self.__mpv_command(["get_property", prop]).get("data", default)
@ -138,4 +151,4 @@ class MpvProgram(Program):
position=int(self.__read_prop("playback-time", "0")), position=int(self.__read_prop("playback-time", "0")),
title=self.__read_prop("media-title", ""), title=self.__read_prop("media-title", ""),
playing=not self.__read_prop("pause", False), playing=not self.__read_prop("pause", False),
) )

View File

@ -15,6 +15,7 @@ from microkodi.helpers import KodiTime
EVENT_PLAYER_EXIT = "post-player-exit" EVENT_PLAYER_EXIT = "post-player-exit"
class VlcConfig: class VlcConfig:
# Mapping of file extensions to a potential transformer function # Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] _ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@ -22,25 +23,37 @@ class VlcConfig:
# Mapping of domains to a potential transformer function # Mapping of domains to a potential transformer function
_domain_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]] _domain_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
_event_listeners: dict[str, Callable[[], None]] _event_listeners: dict[str, Callable[[], None]]
def __init__(self): def __init__(self):
self._ext_map = {} self._ext_map = {}
self._event_listeners = {} self._event_listeners = {}
self._domain_map = {} self._domain_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): def register_ext_transformer(
logging.getLogger("VlcConfig").debug("Registering transformer for .%s URLs", ext) self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext) return self._ext_map.get(ext)
def register_domain_transformer(self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]): def register_domain_transformer(
logging.getLogger("VlcConfig").debug("Registering transformer for the %s domain", domain) self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for the %s domain", domain
)
self._domain_map[domain] = transformer self._domain_map[domain] = transformer
def get_domain_transformer(self, domain: str) -> Callable[[ParseResult], tuple[list[str], str]] | None: def get_domain_transformer(
self, domain: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._domain_map.get(domain) return self._domain_map.get(domain)
def register_event_listener(self, event: str, callback: Callable[[], None]): def register_event_listener(self, event: str, callback: Callable[[], None]):
@ -54,8 +67,11 @@ class VlcConfig:
for listener in self._event_listeners.get(event, []): for listener in self._event_listeners.get(event, []):
logger.debug("Running listener %s for event %s", listener.__name__, event) logger.debug("Running listener %s for event %s", listener.__name__, event)
listener() listener()
I.register("VlcConfig", VlcConfig()) I.register("VlcConfig", VlcConfig())
class VlcProgram(Program): class VlcProgram(Program):
_process: subprocess.Popen _process: subprocess.Popen
@ -139,10 +155,12 @@ class VlcProgram(Program):
I.get("DataBridge").notification.emit("VLC exited with an error") I.get("DataBridge").notification.emit("VLC exited with an error")
self._process = None self._process = None
def __vlc_command(self, command: str, params: dict[str, Any] | None = None) -> str | None: def __vlc_command(
self, command: str, params: dict[str, Any] | None = None
) -> str | None:
try: try:
req = requests.get( req = requests.get(
f"http://127.0.0.1:9090/requests/status.xml", "http://127.0.0.1:9090/requests/status.xml",
auth=("", self._vlc_password), auth=("", self._vlc_password),
params={ params={
"command": command, "command": command,
@ -166,7 +184,7 @@ class VlcProgram(Program):
"seek", "seek",
{ {
"val": timestamp.to_seconds(), "val": timestamp.to_seconds(),
} },
) )
def exit(self): def exit(self):
@ -192,7 +210,9 @@ class VlcProgram(Program):
root = dom.getElementsByTagName("root")[0] root = dom.getElementsByTagName("root")[0]
time = int(root.getElementsByTagName("time")[0].childNodes[0].data) time = int(root.getElementsByTagName("time")[0].childNodes[0].data)
length = int(root.getElementsByTagName("length")[0].childNodes[0].data) length = int(root.getElementsByTagName("length")[0].childNodes[0].data)
playing = root.getElementsByTagName("state")[0].childNodes[0].data == "playing" playing = (
root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
)
except Exception as ex: except Exception as ex:
self.logger.warning("Failed to parse vlc API response: %s", ex) self.logger.warning("Failed to parse vlc API response: %s", ex)
self.logger.debug("Response body: %s", req.text) self.logger.debug("Response body: %s", req.text)
@ -209,4 +229,4 @@ class VlcProgram(Program):
# TODO # TODO
title="TODO: Find out", title="TODO: Find out",
playing=playing, playing=playing,
) )

View File

@ -1,5 +1,6 @@
from typing import Any from typing import Any
class _Repository: class _Repository:
_data: dict[str, Any] _data: dict[str, Any]
@ -12,4 +13,5 @@ class _Repository:
def get(self, key: str) -> Any | None: def get(self, key: str) -> Any | None:
return self._data.get(key) return self._data.get(key)
I = _Repository()
I = _Repository() # noqa:E741

45
microkodi/tv.py Normal file
View File

@ -0,0 +1,45 @@
import logging
import time
import cec
import requests
from microkodi.config import Config
from microkodi.repository import I
class TVHandler:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger("TVHandler")
config: Config = I.get("Config")
if config.cec:
cec.init()
self.logger.debug("CEC initialised")
def enable_power_if_configured(self) -> bool:
config: Config = I.get("Config")
if config.tv_power_webhook is not None:
try:
requests.put(config.tv_power_webhook)
self.logger.debug("Triggered webhook to enable power to the TV")
except Exception:
self.logger.warn("Failed to enable power to the TV")
return config.tv_power_webhook is not None
def power_on_if_needed(self) -> bool:
config: Config = I.get("Config")
if config.cec:
tc = cec.Device(cec.CECDEVICE_TV)
if not tc.is_on():
tc.power_on()
return config.cec
def turn_on_tv(self):
if self.enable_power_if_configured():
self.logger.debug("Waiting 500ms for TV to get power...")
time.sleep(500)
self.power_on_if_needed()

View File

@ -2,6 +2,7 @@ from datetime import datetime
from PySide6.QtCore import QObject, Signal, Slot from PySide6.QtCore import QObject, Signal, Slot
class DataBridge(QObject): class DataBridge(QObject):
"""Bridge between the QML app and the Python "host".""" """Bridge between the QML app and the Python "host"."""
@ -19,4 +20,4 @@ class DataBridge(QObject):
@Slot(result=str) @Slot(result=str)
def currentTime(self) -> str: def currentTime(self) -> str:
return datetime.now().strftime("%H:%M") return datetime.now().strftime("%H:%M")

View File

@ -4,9 +4,22 @@ version = "0.1.0"
dependencies = [ dependencies = [
"pyside6", "pyside6",
"requests", "requests",
"yt-dlp",
"cec" "cec"
] ]
requires-python = ">= 3.11"
[tools.build] [project.optional-dependencies]
packages = ["microkodi"] youtube = [
"yt-dlp"
]
dev = [
"mypy",
"black",
"ruff"
]
[project.scripts]
microkodi = "microkodi.main:main"
[tool.setuptools.packages.find]
exclude = ["contrib"]

View File

@ -7,10 +7,8 @@ from microkodi.helpers import recursive_dict_merge
import yt_dlp import yt_dlp
DEFAULT_CONFIG = { DEFAULT_CONFIG = {"format": "bestvideo[width<=1920]+bestaudio", "ytdlp_options": {}}
"format": "bestvideo[width<=1920]+bestaudio",
"ytdlp_options": {}
}
def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]: def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]:
logger = logging.getLogger("Youtube") logger = logging.getLogger("Youtube")
@ -24,21 +22,26 @@ def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]:
with yt_dlp.YoutubeDL(opts) as ytdl: with yt_dlp.YoutubeDL(opts) as ytdl:
info = ytdl.extract_info(urlunparse(url), download=False) info = ytdl.extract_info(urlunparse(url), download=False)
user_agent = None #user_agent = None
audio_url = None audio_url = None
video_url = None video_url = None
for format in info["requested_formats"]: for format in info["requested_formats"]:
if format["width"] is None: if format["width"] is None:
audio_url = format["url"] audio_url = format["url"]
else: else:
user_agent = format["http_headers"]["User-Agent"] #user_agent = format["http_headers"]["User-Agent"]
video_url = format["url"] video_url = format["url"]
args = [ args = (
f'--input-slave={audio_url}', [
] if audio_url else None f"--input-slave={audio_url}",
]
if audio_url
else None
)
return args, urlparse(video_url) return args, urlparse(video_url)
def init(): def init():
# Create the config # Create the config
config: Config = I.get("Config") config: Config = I.get("Config")
@ -53,4 +56,9 @@ def init():
# Register the transformers # Register the transformers
I.get("VlcConfig").register_domain_transformer("youtu.be", youtube_url_transformer) I.get("VlcConfig").register_domain_transformer("youtu.be", youtube_url_transformer)
I.get("VlcConfig").register_domain_transformer("www.youtube.com", youtube_url_transformer) I.get("VlcConfig").register_domain_transformer(
"youtube.com", youtube_url_transformer
)
I.get("VlcConfig").register_domain_transformer(
"www.youtube.com", youtube_url_transformer
)