Compare commits

...

10 Commits
cec ... master

14 changed files with 246 additions and 96 deletions

5
.gitignore vendored
View File

@ -1,7 +1,12 @@
# Python
**/*.egg-info
**/__pycache__/
.venv/
# Tooling
.mypy_cache/
.ruff_cache/
# Testing
config.json
scripts/

View File

@ -5,6 +5,7 @@ from typing import Any
from microkodi.helpers import recursive_dict_merge
@dataclass
class Config:
host: str
@ -37,7 +38,14 @@ class Config:
# The entire configuration file for use in user scripts
options: dict[str, Any]
# Enables the use of CEC
cec: bool
# Webhook to trigger to turn on power to the TV
tv_power_webhook: str | None
def load_config(config_path: Path | None) -> Config:
if config_path is None:
config_data = {}
@ -64,4 +72,6 @@ def load_config(config_path: Path | None) -> Config:
config_data.get("players", {}),
),
options=config_data.get("options", {}),
cec=config_data.get("cec", False),
tv_power_webhook=config_data.get("tv_power_webhook"),
)

View File

@ -1,19 +1,24 @@
from dataclasses import dataclass
from typing import TypeVar, ParamSpec, Callable
from functools import wraps
P = ParamSpec("P")
T = TypeVar("T")
def after(func: Callable[[], None]) -> Callable[P, T]:
"""Runs @func after the decorated function exits."""
def decorator(f: Callable[P, T]) -> Callable[P, T]:
def inner(*args: P.args, **kwargs: P.kwargs) -> T:
ret = f(*args, **kwargs)
func()
return ret
return inner
return decorator
@dataclass
class KodiTime:
hours: int
@ -23,6 +28,7 @@ class KodiTime:
def to_seconds(self) -> int:
return self.hours * 3600 + self.minutes * 59 + self.seconds
def seconds_to_kodi_format(seconds: int) -> KodiTime:
"""Convert seconds into hours, minutes, and seconds as Kodi wants that."""
hours = seconds // 3600
@ -36,12 +42,6 @@ def seconds_to_kodi_format(seconds: int) -> KodiTime:
)
def find(l: list[T], pred: Callable[[T], bool]) -> T | None:
for i in l:
if pred(i):
return i
return None
def recursive_dict_merge(base: dict, update: dict) -> dict:
unique_keys = set([*base.keys(), *update.keys()])
out = {}
@ -55,4 +55,4 @@ def recursive_dict_merge(base: dict, update: dict) -> dict:
out[key] = update[key]
else:
out[key] = recursive_dict_merge(base[key], update[key])
return out
return out

View File

@ -1,16 +1,18 @@
import json
from typing import Any, Callable
from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse
import logging
import base64
import sys
import importlib
from http.server import BaseHTTPRequestHandler
from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo
from microkodi.helpers import KodiTime, seconds_to_kodi_format, after
from microkodi.config import Config
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return {
@ -19,6 +21,7 @@ def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
"id": id,
}
class JsonRpcObject:
logger: logging.Logger
@ -71,9 +74,8 @@ class ApplicationRpcObject:
return "LOL"
else:
return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"]
}
return {prop: _get_property(prop) for prop in params["properties"]}
class PlayerRpcObject(JsonRpcObject):
@ -122,14 +124,16 @@ class PlayerRpcObject(JsonRpcObject):
def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active:
return []
return [{"playerid":1,"playertype":"internal","type":"video"}]
return [{"playerid": 1, "playertype": "internal", "type": "video"}]
def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to toggle player that is not running")
return "OK"
if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"])
self.logger.warn(
"Trying to call PlayPause with unknown play: '%s'", params["play"]
)
return "Err"
playing = self._active_program.is_playing()
@ -151,6 +155,10 @@ class PlayerRpcObject(JsonRpcObject):
@after(lambda: I.get("DataBridge").set_loading(False))
def open(self, params: dict[str, Any]) -> Any:
# Turn on the TV
config: Config = I.get("Config")
I.get("TVHandler").turn_on_tv()
I.get("DataBridge").set_loading(True)
url = urlparse(params["item"]["file"])
@ -161,43 +169,42 @@ class PlayerRpcObject(JsonRpcObject):
url = urlparse(url.query)
# Find out what player class to use
config: Config = I.get("Config")
scheme_configuration = config.players.get(url.scheme)
if scheme_configuration is None:
I.get("DataBridge").notification.emit(f"No player available for {url.scheme}")
I.get("DataBridge").notification.emit(
f"No player available for {url.scheme}"
)
self.logger.warn("Client requested unknown scheme: '%s'", url.scheme)
return {
"error": "invalid protocol"
}
return {"error": "invalid protocol"}
player_class_name = scheme_configuration.get(url.netloc)
if player_class_name is None:
player_class_name = scheme_configuration.get("*")
if player_class_name is None:
I.get("DataBridge").notification.emit(f"No player available for {url.netloc}")
I.get("DataBridge").notification.emit(
f"No player available for {url.netloc}"
)
self.logger.warn("No player was picked for url '%s'", url)
return {
"error": "invalid protocol"
}
return {"error": "invalid protocol"}
# Try to import the class
*module_parts, class_name = player_class_name.split(".")
module_name = ".".join(module_parts)
program_cls = None
if module_name not in sys.modules:
self.logger.debug("Trying to import %s to get class %s", module_name, class_name)
spec = importlib.util.find_spec(module)
self.logger.debug(
"Trying to import %s to get class %s", module_name, class_name
)
spec = importlib.util.find_spec(module_name)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
program_cls = getattr(sys.modules[module_name], class_name, None)
program_cls = getattr(sys.modules[module_name], class_name, None)
if program_cls is None:
I.get("DataBridge").notification.emit("Could not start player")
self.logger.warn("Class %s not found in module %s", class_name, module_name)
return {
"error": "invalid protocol"
}
return {"error": "invalid protocol"}
if self._active_program is not None:
if isinstance(self._active_program, program_cls):
self._active_program.stop("playerrpcobject.open")
@ -221,9 +228,7 @@ class PlayerRpcObject(JsonRpcObject):
return name if self.is_active else ""
player_info = self._active_program.get_player_info()
item = {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
item = {prop: _get_property(prop, player_info) for prop in params["properties"]}
return {
"item": {
**item,
@ -232,13 +237,19 @@ class PlayerRpcObject(JsonRpcObject):
}
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]:
def _get_property(
name: str, player_info: PlayerInfo
) -> str | bool | dict[str, Any]:
if name == "currentsubtitle":
return None
elif name == "partymode":
return False
elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0
return (
player_info.position / player_info.runtime
if player_info.runtime > 0
else 0.0
)
elif name == "playlistid":
return 0
elif name == "position":
@ -278,10 +289,10 @@ class PlayerRpcObject(JsonRpcObject):
else:
self.logger.error("Unknown param '%s'", name)
return "Unknown"
player_info = self._active_program.get_player_info()
return {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return {prop: _get_property(prop, player_info) for prop in params["properties"]}
class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any:
@ -294,15 +305,18 @@ class PlaylistRpcObject(JsonRpcObject):
def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid)
self.logger.warn(
"Asked to empty playlist %d but we're just pretending to do so", playlistid
)
return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}]
return [{"playlistid": 1, "type": "video"}]
def get_items(self, params: dict[str, Any]) -> Any:
return "ERR"
class GlobalMethodHandler:
objects: dict[str, JsonRpcObject]
@ -327,7 +341,6 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path)
def do_POST(self):
logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc":
@ -358,18 +371,24 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
if authenticated:
auth: str | None = self.headers.get("Authorization")
if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method)
logger.warn(
"Client provided no Authorization header. Method: %s", method_name
)
self.send_error(401)
return
auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method)
logger.warn(
"Client provided no Basic authorization. Method: %s", method_name
)
self.send_error(401)
return
credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method)
logger.warn(
"Rejecting request due to wrong credentials. Method: %s", method_name
)
self.send_error(403)
return
@ -388,4 +407,4 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
self.wfile.write(response_body)
self.wfile.flush()
self.log_request(code="200")
self.log_request(code="200")

View File

@ -5,7 +5,6 @@ import threading
import argparse
from pathlib import Path
import importlib.util
import time
from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine
@ -13,6 +12,7 @@ from PySide6.QtQml import QQmlApplicationEngine
from microkodi.jsonrpc import JsonRpcHandler, GlobalMethodHandler
from microkodi.ui.bridge import DataBridge
from microkodi.config import Config, load_config
from microkodi.cec_handler import TVHandler
from microkodi.repository import I
@ -23,12 +23,15 @@ def run_kodi_server():
method_handler = GlobalMethodHandler()
I.register("GlobalMethodHandler", method_handler)
# Setup CEC
I.register("TVHandler", TVHandler())
# Load extra plugins
if config.scripts:
logger.info("Loading scripts...")
for name, path in config.scripts.items():
logger.debug("Trying to load %s (%s)", name, path)
logger.debug("Trying to load %s (%s)", name, path)
if not Path(path).exists():
logger.warning("Failed to load %s: File does not exist", name)
continue
@ -47,22 +50,25 @@ def run_kodi_server():
init_method = getattr(module, "init")
init_method()
server = HTTPServer
httpd = HTTPServer((config.host, config.port,), JsonRpcHandler)
httpd = HTTPServer(
(
config.host,
config.port,
),
JsonRpcHandler,
)
logger.info("Starting server on %s:%i", config.host, config.port)
httpd.serve_forever()
logger.info("Shutting down server")
if __name__ == "__main__":
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", type=Path, help="Location of the config file")
parser.add_argument("--debug", action="store_true", default=False)
options = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if options.debug else logging.INFO)
logger = logging.getLogger("ui")
# Load the config
config = load_config(options.config)
@ -93,4 +99,8 @@ if __name__ == "__main__":
engine.rootObjects()[0].setProperty("bridge", bridge)
exit_code = app.exec()
del engine
sys.exit(exit_code)
sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@ -2,12 +2,13 @@ from threading import Thread
import subprocess
from typing import Callable
def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen:
def _run(process: subprocess.Popen):
process.wait()
process.wait()
when_done()
process = subprocess.Popen(args)
t = Thread(target=_run, args=(process,))
t.start()
return process
return process

View File

@ -4,6 +4,7 @@ import logging
from microkodi.helpers import KodiTime
@dataclass
class PlayerInfo:
runtime: int
@ -11,6 +12,7 @@ class PlayerInfo:
title: str
playing: bool
class Program:
logger: logging.Logger
@ -42,4 +44,4 @@ class Program:
raise NotImplementedError()
def seek(self, timestamp: KodiTime):
raise NotImplementedError()
raise NotImplementedError()

View File

@ -1,8 +1,7 @@
import subprocess
from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse, urldefrag, unquote
from urllib.parse import ParseResult, urlunparse
import logging
from dataclasses import dataclass
import os
import json
import socket
@ -12,6 +11,7 @@ from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
class MpvConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@ -19,14 +19,23 @@ class MpvConfig:
def __init__(self):
self._ext_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext)
def register_ext_transformer(
self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("MpvConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
I.register("MpvConfig", MpvConfig())
class MpvProgram(Program):
_process: subprocess.Popen
@ -45,9 +54,14 @@ class MpvProgram(Program):
def __mpv_command(self, args: list[str]) -> dict[str, Any]:
ipc_logger = logging.getLogger("mpv-ipc")
command = json.dumps({
"command": args,
}) + "\n"
command = (
json.dumps(
{
"command": args,
}
)
+ "\n"
)
ipc_logger.debug("-> %s", command)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
try:
@ -61,7 +75,6 @@ class MpvProgram(Program):
ipc_logger.debug("<- %s", resp_raw)
return json.loads(resp_raw)
def __read_prop(self, prop: str, default: Any) -> Any:
return self.__mpv_command(["get_property", prop]).get("data", default)
@ -138,4 +151,4 @@ class MpvProgram(Program):
position=int(self.__read_prop("playback-time", "0")),
title=self.__read_prop("media-title", ""),
playing=not self.__read_prop("pause", False),
)
)

View File

@ -15,6 +15,7 @@ from microkodi.helpers import KodiTime
EVENT_PLAYER_EXIT = "post-player-exit"
class VlcConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@ -22,25 +23,37 @@ class VlcConfig:
# Mapping of domains to a potential transformer function
_domain_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
_event_listeners: dict[str, Callable[[], None]]
_event_listeners: dict[str, Callable[[], None]]
def __init__(self):
self._ext_map = {}
self._event_listeners = {}
self._domain_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("VlcConfig").debug("Registering transformer for .%s URLs", ext)
def register_ext_transformer(
self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
def register_domain_transformer(self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("VlcConfig").debug("Registering transformer for the %s domain", domain)
def register_domain_transformer(
self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for the %s domain", domain
)
self._domain_map[domain] = transformer
def get_domain_transformer(self, domain: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_domain_transformer(
self, domain: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._domain_map.get(domain)
def register_event_listener(self, event: str, callback: Callable[[], None]):
@ -54,8 +67,11 @@ class VlcConfig:
for listener in self._event_listeners.get(event, []):
logger.debug("Running listener %s for event %s", listener.__name__, event)
listener()
I.register("VlcConfig", VlcConfig())
class VlcProgram(Program):
_process: subprocess.Popen
@ -77,7 +93,7 @@ class VlcProgram(Program):
return self._process is not None and self._process.returncode is None
def is_playing(self) -> bool:
return self.get_player_info().is_playing
return self.get_player_info().playing
def resume(self):
self.__vlc_command("pl_pause")
@ -139,10 +155,12 @@ class VlcProgram(Program):
I.get("DataBridge").notification.emit("VLC exited with an error")
self._process = None
def __vlc_command(self, command: str, params: dict[str, Any] | None = None) -> str | None:
def __vlc_command(
self, command: str, params: dict[str, Any] | None = None
) -> str | None:
try:
req = requests.get(
f"http://127.0.0.1:9090/requests/status.xml",
"http://127.0.0.1:9090/requests/status.xml",
auth=("", self._vlc_password),
params={
"command": command,
@ -166,7 +184,7 @@ class VlcProgram(Program):
"seek",
{
"val": timestamp.to_seconds(),
}
},
)
def exit(self):
@ -192,7 +210,9 @@ class VlcProgram(Program):
root = dom.getElementsByTagName("root")[0]
time = int(root.getElementsByTagName("time")[0].childNodes[0].data)
length = int(root.getElementsByTagName("length")[0].childNodes[0].data)
playing = root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
playing = (
root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
)
except Exception as ex:
self.logger.warning("Failed to parse vlc API response: %s", ex)
self.logger.debug("Response body: %s", req.text)
@ -209,4 +229,4 @@ class VlcProgram(Program):
# TODO
title="TODO: Find out",
playing=playing,
)
)

View File

@ -1,5 +1,6 @@
from typing import Any
class _Repository:
_data: dict[str, Any]
@ -12,4 +13,5 @@ class _Repository:
def get(self, key: str) -> Any | None:
return self._data.get(key)
I = _Repository()
I = _Repository() # noqa:E741

45
microkodi/tv.py Normal file
View File

@ -0,0 +1,45 @@
import logging
import time
import cec
import requests
from microkodi.config import Config
from microkodi.repository import I
class TVHandler:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger("TVHandler")
config: Config = I.get("Config")
if config.cec:
cec.init()
self.logger.debug("CEC initialised")
def enable_power_if_configured(self) -> bool:
config: Config = I.get("Config")
if config.tv_power_webhook is not None:
try:
requests.put(config.tv_power_webhook)
self.logger.debug("Triggered webhook to enable power to the TV")
except Exception:
self.logger.warn("Failed to enable power to the TV")
return config.tv_power_webhook is not None
def power_on_if_needed(self) -> bool:
config: Config = I.get("Config")
if config.cec:
tc = cec.Device(cec.CECDEVICE_TV)
if not tc.is_on():
tc.power_on()
return config.cec
def turn_on_tv(self):
if self.enable_power_if_configured():
self.logger.debug("Waiting 500ms for TV to get power...")
time.sleep(500)
self.power_on_if_needed()

View File

@ -2,6 +2,7 @@ from datetime import datetime
from PySide6.QtCore import QObject, Signal, Slot
class DataBridge(QObject):
"""Bridge between the QML app and the Python "host"."""
@ -19,4 +20,4 @@ class DataBridge(QObject):
@Slot(result=str)
def currentTime(self) -> str:
return datetime.now().strftime("%H:%M")
return datetime.now().strftime("%H:%M")

View File

@ -4,8 +4,22 @@ version = "0.1.0"
dependencies = [
"pyside6",
"requests",
"cec"
]
requires-python = ">= 3.11"
[project.optional-dependencies]
youtube = [
"yt-dlp"
]
dev = [
"mypy",
"black",
"ruff"
]
[tools.build]
packages = ["microkodi"]
[project.scripts]
microkodi = "microkodi.main:main"
[tool.setuptools.packages.find]
exclude = ["contrib"]

View File

@ -7,10 +7,8 @@ from microkodi.helpers import recursive_dict_merge
import yt_dlp
DEFAULT_CONFIG = {
"format": "bestvideo[width<=1920]+bestaudio",
"ytdlp_options": {}
}
DEFAULT_CONFIG = {"format": "bestvideo[width<=1920]+bestaudio", "ytdlp_options": {}}
def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]:
logger = logging.getLogger("Youtube")
@ -24,21 +22,26 @@ def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]:
with yt_dlp.YoutubeDL(opts) as ytdl:
info = ytdl.extract_info(urlunparse(url), download=False)
user_agent = None
#user_agent = None
audio_url = None
video_url = None
for format in info["requested_formats"]:
if format["width"] is None:
audio_url = format["url"]
else:
user_agent = format["http_headers"]["User-Agent"]
#user_agent = format["http_headers"]["User-Agent"]
video_url = format["url"]
args = [
f'--input-slave={audio_url}',
] if audio_url else None
args = (
[
f"--input-slave={audio_url}",
]
if audio_url
else None
)
return args, urlparse(video_url)
def init():
# Create the config
config: Config = I.get("Config")
@ -53,4 +56,9 @@ def init():
# Register the transformers
I.get("VlcConfig").register_domain_transformer("youtu.be", youtube_url_transformer)
I.get("VlcConfig").register_domain_transformer("www.youtube.com", youtube_url_transformer)
I.get("VlcConfig").register_domain_transformer(
"youtube.com", youtube_url_transformer
)
I.get("VlcConfig").register_domain_transformer(
"www.youtube.com", youtube_url_transformer
)