Compare commits

..

26 Commits

Author SHA1 Message Date
c353ed5eb7 Format and lint youtube.py 2025-01-13 22:02:19 +00:00
71ee104658 Ignore tooling caches 2025-01-13 21:59:25 +00:00
fad2d4f312 Lint using ruff 2025-01-13 21:58:40 +00:00
7d91041091 Format using black 2025-01-13 21:54:13 +00:00
4f76ad144f Improve pyproject.toml 2025-01-13 21:53:51 +00:00
2cf210bce6 Allow turning on the TV using a webhook 2025-01-13 22:51:02 +01:00
50d5e40a46 Add youtube.com to the Youtube plugin 2025-01-13 22:34:13 +01:00
5b010124d7 Hopefully exclude the contrib directory 2025-01-12 21:44:00 +00:00
ddb2730249 Fix vlc is_playing 2025-01-12 22:38:42 +01:00
597a10543d Integrate with CEC 2025-01-12 22:38:32 +01:00
fcfe97ad89 Show the loading spinner before doing anything in Player.Open 2025-01-12 22:01:53 +01:00
0f5756b064 Do not ignore the Youtube script 2025-01-12 16:18:33 +01:00
ba04f03d71 Add Youtube script 2025-01-12 16:18:01 +01:00
58e2b09b8a Add config field for script config 2025-01-12 16:17:39 +01:00
f590e752be Implement seeking for vlc 2025-01-12 13:56:33 +01:00
a90047d14c Fix exception when stopping mpv/vlc 2025-01-12 01:13:40 +00:00
f2c96389ad Pass args in start-microkodi.sh 2025-01-12 00:39:53 +00:00
ad231134af Remove the udev stuff 2025-01-12 00:25:06 +00:00
03f143cdcb Run via sway 2025-01-12 01:24:22 +01:00
b0d16ffd49 Add notifications 2025-01-12 01:24:14 +01:00
934c4183aa Add udev stuff 2025-01-12 00:21:59 +01:00
a8ac092277 Improve look of the clock 2025-01-11 23:57:02 +01:00
a29a8c208b Fix typo in the vlc implementation 2025-01-11 23:56:41 +01:00
ec3563f6f4 Remove unused imports 2025-01-11 23:56:14 +01:00
2cc997797c Add a simple clock 2025-01-11 22:25:38 +00:00
9eaa40f809 Implement VLC as a player backend 2025-01-11 22:06:00 +00:00
17 changed files with 696 additions and 64 deletions

8
.gitignore vendored
View File

@@ -1,7 +1,13 @@
# Python
**/*.egg-info
**/__pycache__/
.venv/
# Tooling
.mypy_cache/
.ruff_cache/
# Testing
config.json
scripts/
scripts/
!scripts/youtube.py

12
contrib/sway.cfg Normal file
View File

@@ -0,0 +1,12 @@
# Prevent window resizing when we open the player apps
workspace_layout stacking
# Hide the window titlebar
default_border none
default_floating_border none
font pango:monospace 0
titlebar_padding 1
titlebar_border_thickness 0
# Start microkodi
exec bash <repo path>/start-microkodi.sh

View File

@@ -1,6 +1,10 @@
from dataclasses import dataclass
from pathlib import Path
import json
from typing import Any
from microkodi.helpers import recursive_dict_merge
@dataclass
class Config:
@@ -16,12 +20,32 @@ class Config:
# Extra args to pass to mpv
mpv_args: list[str]
# Path to the vlc binary
vlc: str
# Extra arguments to pass to vlc
vlc_args: list[str]
# Wallpapers to show
wallpapers: list[str]
# Additional scripts to load
scripts: dict[str, str]
# Player configuration
# URL scheme -> netloc (or '*' for fallback) -> fully-qualified player class
players: dict[str, dict[str, str]]
# The entire configuration file for use in user scripts
options: dict[str, Any]
# Enables the use of CEC
cec: bool
# Webhook to trigger to turn on power to the TV
tv_power_webhook: str | None
def load_config(config_path: Path | None) -> Config:
if config_path is None:
config_data = {}
@@ -35,6 +59,19 @@ def load_config(config_path: Path | None) -> Config:
mpv=config_data.get("mpv", "/usr/bin/mpv"),
mpv_ipc_sock=config_data.get("mpv_ipc_sock", "/tmp/mpv.sock"),
mpv_args=config_data.get("mpv_args", []),
vlc=config_data.get("vlc", "/usr/bin/vlc"),
vlc_args=config_data.get("vlc_args", []),
wallpapers=config_data.get("wallpapers", []),
scripts=config_data.get("scripts", {}),
players=recursive_dict_merge(
{
"https": {
"*": "microkodi.programs.vlc.VlcProgram",
},
},
config_data.get("players", {}),
),
options=config_data.get("options", {}),
cec=config_data.get("cec", False),
tv_power_webhook=config_data.get("tv_power_webhook"),
)

View File

@@ -1,4 +1,23 @@
from dataclasses import dataclass
from typing import TypeVar, ParamSpec, Callable
P = ParamSpec("P")
T = TypeVar("T")
def after(func: Callable[[], None]) -> Callable[P, T]:
"""Runs @func after the decorated function exits."""
def decorator(f: Callable[P, T]) -> Callable[P, T]:
def inner(*args: P.args, **kwargs: P.kwargs) -> T:
ret = f(*args, **kwargs)
func()
return ret
return inner
return decorator
@dataclass
class KodiTime:
@@ -6,6 +25,10 @@ class KodiTime:
minutes: int
seconds: int
def to_seconds(self) -> int:
return self.hours * 3600 + self.minutes * 59 + self.seconds
def seconds_to_kodi_format(seconds: int) -> KodiTime:
"""Convert seconds into hours, minutes, and seconds as Kodi wants that."""
hours = seconds // 3600
@@ -17,3 +40,19 @@ def seconds_to_kodi_format(seconds: int) -> KodiTime:
minutes=minutes,
seconds=seconds,
)
def recursive_dict_merge(base: dict, update: dict) -> dict:
unique_keys = set([*base.keys(), *update.keys()])
out = {}
for key in unique_keys:
if key in base and key not in update:
out[key] = base[key]
elif key not in base and key in update:
out[key] = update[key]
else:
if not isinstance(update[key], dict):
out[key] = update[key]
else:
out[key] = recursive_dict_merge(base[key], update[key])
return out

View File

@@ -3,13 +3,16 @@ from typing import Any
from urllib.parse import urlparse
import logging
import base64
import sys
import importlib
from http.server import BaseHTTPRequestHandler
from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo
from microkodi.programs.mpv import MpvProgram
from microkodi.helpers import seconds_to_kodi_format
from microkodi.helpers import KodiTime, seconds_to_kodi_format, after
from microkodi.config import Config
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return {
@@ -18,6 +21,7 @@ def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
"id": id,
}
class JsonRpcObject:
logger: logging.Logger
@@ -70,9 +74,8 @@ class ApplicationRpcObject:
return "LOL"
else:
return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"]
}
return {prop: _get_property(prop) for prop in params["properties"]}
class PlayerRpcObject(JsonRpcObject):
@@ -99,20 +102,38 @@ class PlayerRpcObject(JsonRpcObject):
return self.open(params)
elif method == "PlayPause":
return self.play_pause(params)
elif method == "Seek":
return self.seek(params)
else:
return "Unknown method"
def seek(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to seek player that is not active")
return "ERR"
time_raw = params["value"]["time"]
kodi_time = KodiTime(
hours=time_raw["hours"],
minutes=time_raw["minutes"],
seconds=time_raw["seconds"],
)
self._active_program.seek(kodi_time)
return "Ok"
def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active:
return []
return [{"playerid":1,"playertype":"internal","type":"video"}]
return [{"playerid": 1, "playertype": "internal", "type": "video"}]
def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to toggle player that is not running")
return "OK"
if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"])
self.logger.warn(
"Trying to call PlayPause with unknown play: '%s'", params["play"]
)
return "Err"
playing = self._active_program.is_playing()
@@ -132,10 +153,14 @@ class PlayerRpcObject(JsonRpcObject):
self._active_program.stop("playerrpcobject.stop")
return "OK"
@after(lambda: I.get("DataBridge").set_loading(False))
def open(self, params: dict[str, Any]) -> Any:
url = urlparse(params["item"]["file"])
# Turn on the TV
config: Config = I.get("Config")
I.get("TVHandler").turn_on_tv()
program_cls = None
I.get("DataBridge").set_loading(True)
url = urlparse(params["item"]["file"])
# Handle plugins
if url.scheme == "plugin":
@@ -143,15 +168,43 @@ class PlayerRpcObject(JsonRpcObject):
self.logger.debug("Rewriting provided URL to %s", url.query)
url = urlparse(url.query)
if url.scheme == "https":
program_cls = MpvProgram
# Find out what player class to use
scheme_configuration = config.players.get(url.scheme)
if scheme_configuration is None:
I.get("DataBridge").notification.emit(
f"No player available for {url.scheme}"
)
self.logger.warn("Client requested unknown scheme: '%s'", url.scheme)
return {"error": "invalid protocol"}
player_class_name = scheme_configuration.get(url.netloc)
if player_class_name is None:
player_class_name = scheme_configuration.get("*")
if player_class_name is None:
I.get("DataBridge").notification.emit(
f"No player available for {url.netloc}"
)
self.logger.warn("No player was picked for url '%s'", url)
return {"error": "invalid protocol"}
# Try to import the class
*module_parts, class_name = player_class_name.split(".")
module_name = ".".join(module_parts)
program_cls = None
if module_name not in sys.modules:
self.logger.debug(
"Trying to import %s to get class %s", module_name, class_name
)
spec = importlib.util.find_spec(module_name)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
program_cls = getattr(sys.modules[module_name], class_name, None)
if program_cls is None:
return {
"error": "invalid protocol"
}
I.get("DataBridge").notification.emit("Could not start player")
self.logger.warn("Class %s not found in module %s", class_name, module_name)
return {"error": "invalid protocol"}
I.get("DataBridge").set_loading(True)
if self._active_program is not None:
if isinstance(self._active_program, program_cls):
self._active_program.stop("playerrpcobject.open")
@@ -175,9 +228,7 @@ class PlayerRpcObject(JsonRpcObject):
return name if self.is_active else ""
player_info = self._active_program.get_player_info()
item = {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
item = {prop: _get_property(prop, player_info) for prop in params["properties"]}
return {
"item": {
**item,
@@ -186,13 +237,19 @@ class PlayerRpcObject(JsonRpcObject):
}
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]:
def _get_property(
name: str, player_info: PlayerInfo
) -> str | bool | dict[str, Any]:
if name == "currentsubtitle":
return None
elif name == "partymode":
return False
elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0
return (
player_info.position / player_info.runtime
if player_info.runtime > 0
else 0.0
)
elif name == "playlistid":
return 0
elif name == "position":
@@ -232,10 +289,10 @@ class PlayerRpcObject(JsonRpcObject):
else:
self.logger.error("Unknown param '%s'", name)
return "Unknown"
player_info = self._active_program.get_player_info()
return {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return {prop: _get_property(prop, player_info) for prop in params["properties"]}
class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any:
@@ -248,15 +305,18 @@ class PlaylistRpcObject(JsonRpcObject):
def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid)
self.logger.warn(
"Asked to empty playlist %d but we're just pretending to do so", playlistid
)
return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}]
return [{"playlistid": 1, "type": "video"}]
def get_items(self, params: dict[str, Any]) -> Any:
return "ERR"
class GlobalMethodHandler:
objects: dict[str, JsonRpcObject]
@@ -281,7 +341,6 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path)
def do_POST(self):
logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc":
@@ -312,18 +371,24 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
if authenticated:
auth: str | None = self.headers.get("Authorization")
if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method)
logger.warn(
"Client provided no Authorization header. Method: %s", method_name
)
self.send_error(401)
return
auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method)
logger.warn(
"Client provided no Basic authorization. Method: %s", method_name
)
self.send_error(401)
return
credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method)
logger.warn(
"Rejecting request due to wrong credentials. Method: %s", method_name
)
self.send_error(403)
return
@@ -342,4 +407,4 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
self.wfile.write(response_body)
self.wfile.flush()
self.log_request(code="200")
self.log_request(code="200")

View File

@@ -12,23 +12,26 @@ from PySide6.QtQml import QQmlApplicationEngine
from microkodi.jsonrpc import JsonRpcHandler, GlobalMethodHandler
from microkodi.ui.bridge import DataBridge
from microkodi.config import Config, load_config
from microkodi.cec_handler import TVHandler
from microkodi.repository import I
def run_kodi_server():
config: Config = I.get("Config")
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("JsonRPCServer")
method_handler = GlobalMethodHandler()
I.register("GlobalMethodHandler", method_handler)
# Setup CEC
I.register("TVHandler", TVHandler())
# Load extra plugins
if config.scripts:
logger.info("Loading scripts...")
for name, path in config.scripts.items():
logger.debug("Trying to load %s (%s)", name, path)
logger.debug("Trying to load %s (%s)", name, path)
if not Path(path).exists():
logger.warning("Failed to load %s: File does not exist", name)
continue
@@ -47,19 +50,26 @@ def run_kodi_server():
init_method = getattr(module, "init")
init_method()
server = HTTPServer
httpd = HTTPServer((config.host, config.port,), JsonRpcHandler)
httpd = HTTPServer(
(
config.host,
config.port,
),
JsonRpcHandler,
)
logger.info("Starting server on %s:%i", config.host, config.port)
httpd.serve_forever()
logger.info("Shutting down server")
if __name__ == "__main__":
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", type=Path, help="Location of the config file")
parser.add_argument("--debug", action="store_true", default=False)
options = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if options.debug else logging.INFO)
# Load the config
config = load_config(options.config)
I.register("Config", config)
@@ -78,6 +88,10 @@ if __name__ == "__main__":
"wallpapers",
config.wallpapers,
)
engine.rootContext().setContextProperty(
"initialTime",
bridge.currentTime(),
)
engine.loadFromModule("qml", "Main")
if not engine.rootObjects():
sys.exit(-1)
@@ -85,4 +99,8 @@ if __name__ == "__main__":
engine.rootObjects()[0].setProperty("bridge", bridge)
exit_code = app.exec()
del engine
sys.exit(exit_code)
sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@@ -2,12 +2,13 @@ from threading import Thread
import subprocess
from typing import Callable
def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen:
def _run(process: subprocess.Popen):
process.wait()
process.wait()
when_done()
process = subprocess.Popen(args)
t = Thread(target=_run, args=(process,))
t.start()
return process
return process

View File

@@ -2,6 +2,9 @@ from dataclasses import dataclass
from urllib.parse import ParseResult
import logging
from microkodi.helpers import KodiTime
@dataclass
class PlayerInfo:
runtime: int
@@ -9,6 +12,7 @@ class PlayerInfo:
title: str
playing: bool
class Program:
logger: logging.Logger
@@ -38,3 +42,6 @@ class Program:
def get_player_info(self) -> PlayerInfo:
raise NotImplementedError()
def seek(self, timestamp: KodiTime):
raise NotImplementedError()

View File

@@ -1,8 +1,7 @@
import subprocess
from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse, urldefrag, unquote
from urllib.parse import ParseResult, urlunparse
import logging
from dataclasses import dataclass
import os
import json
import socket
@@ -12,6 +11,7 @@ from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
class MpvConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@@ -19,15 +19,23 @@ class MpvConfig:
def __init__(self):
self._ext_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext)
def register_ext_transformer(
self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("MpvConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
I.register("MpvConfig", MpvConfig())
class MpvProgram(Program):
_process: subprocess.Popen
@@ -44,15 +52,16 @@ class MpvProgram(Program):
self._mpv_extra_args = config.mpv_args
self._mpv_ipc_sock = config.mpv_ipc_sock
def register_url_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
self.logger.debug("Registering transformer for .%s URLs", ext)
self._ext_map[ext] = transformer
def __mpv_command(self, args: list[str]) -> dict[str, Any]:
ipc_logger = logging.getLogger("mpv-ipc")
command = json.dumps({
"command": args,
}) + "\n"
command = (
json.dumps(
{
"command": args,
}
)
+ "\n"
)
ipc_logger.debug("-> %s", command)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
try:
@@ -66,7 +75,6 @@ class MpvProgram(Program):
ipc_logger.debug("<- %s", resp_raw)
return json.loads(resp_raw)
def __read_prop(self, prop: str, default: Any) -> Any:
return self.__mpv_command(["get_property", prop]).get("data", default)
@@ -113,16 +121,18 @@ class MpvProgram(Program):
def _when_mpv_exit(self):
self.logger.info("MPV has exited")
self._process = None
I.get("DataBridge").set_loading(False)
if self._process.returncode != 0:
I.get("DataBridge").notification.emit("mpv exited with an error")
self._process = None
def pause(self):
self.__mpv_command(["set_property", "pause", True])
def stop(self, src):
if self.is_active():
self._process.terminate()
self._process = None
def exit(self):
self.stop("mpvplayer.exit")
@@ -141,4 +151,4 @@ class MpvProgram(Program):
position=int(self.__read_prop("playback-time", "0")),
title=self.__read_prop("media-title", ""),
playing=not self.__read_prop("pause", False),
)
)

232
microkodi/programs/vlc.py Normal file
View File

@@ -0,0 +1,232 @@
from xml.dom.minidom import parseString
from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse
import subprocess
import logging
import secrets
import requests
from microkodi.process import nonblocking_run
from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
from microkodi.helpers import KodiTime
EVENT_PLAYER_EXIT = "post-player-exit"
class VlcConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
# Mapping of domains to a potential transformer function
_domain_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
_event_listeners: dict[str, Callable[[], None]]
def __init__(self):
self._ext_map = {}
self._event_listeners = {}
self._domain_map = {}
def register_ext_transformer(
self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer
def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
def register_domain_transformer(
self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for the %s domain", domain
)
self._domain_map[domain] = transformer
def get_domain_transformer(
self, domain: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._domain_map.get(domain)
def register_event_listener(self, event: str, callback: Callable[[], None]):
if event not in self._event_listeners:
self._event_listeners[event] = []
logging.getLogger("VlcConfig").debug("Registering event listener for %s", event)
self._event_listeners[event].append(callback)
def run_event_listeners(self, event: str):
logger = logging.getLogger("VlcConfig")
for listener in self._event_listeners.get(event, []):
logger.debug("Running listener %s for event %s", listener.__name__, event)
listener()
I.register("VlcConfig", VlcConfig())
class VlcProgram(Program):
_process: subprocess.Popen
_vlc_bin_path: str
_vlc_extra_args: list[str]
_vlc_password: str | None
def __init__(self):
super().__init__()
config: Config = I.get("Config")
self._process = None
self._vlc_bin_path = config.vlc
self._vlc_extra_args = config.vlc_args
self._vlc_password = None
def is_active(self) -> bool:
return self._process is not None and self._process.returncode is None
def is_playing(self) -> bool:
return self.get_player_info().playing
def resume(self):
self.__vlc_command("pl_pause")
def play(self, url: ParseResult):
self.stop("mpvplayer.play")
# Allow transforming the domain
extra_args = []
domain_transformer = I.get("VlcConfig").get_domain_transformer(url.netloc)
if domain_transformer is not None:
args, new_url = domain_transformer(url)
extra_args += args
url = new_url
# Allow preprocessing the URL
last_segment = url.path.split("/")[-1]
final_url = ""
if "." in last_segment:
*_, ext = last_segment.split(".")
transformer = I.get("VlcConfig").get_ext_transformer(ext)
if transformer is not None:
self.logger.info("Transforming URL due to extension: %s", ext)
args, final_url = transformer(url)
extra_args += args
self.logger.info("New url: %s", final_url)
else:
final_url = urlunparse(url)
else:
final_url = urlunparse(url)
# Start vlc
self._vlc_password = secrets.token_hex(32)
cmdline = [
self._vlc_bin_path,
*self._vlc_extra_args,
"-f",
"--qt-minimal-view",
"--no-osd",
"--intf=qt",
"--extraintf=http",
"--http-host=127.0.0.1",
"--http-port=9090",
f"--http-password={self._vlc_password}",
"--quiet",
"--play-and-exit",
*extra_args,
final_url,
]
self.logger.debug("Cmdline: %s", " ".join(cmdline))
self._process = nonblocking_run(cmdline, self._when_vlc_exit)
def _when_vlc_exit(self):
self.logger.info("vlc has exited")
I.get("VlcConfig").run_event_listeners(EVENT_PLAYER_EXIT)
I.get("DataBridge").set_loading(False)
if self._process.returncode != 0:
I.get("DataBridge").notification.emit("VLC exited with an error")
self._process = None
def __vlc_command(
self, command: str, params: dict[str, Any] | None = None
) -> str | None:
try:
req = requests.get(
"http://127.0.0.1:9090/requests/status.xml",
auth=("", self._vlc_password),
params={
"command": command,
**(params or {}),
},
)
return req.text
except Exception as ex:
self.logger.warning("Failed to command vlc API: %s", ex)
return None
def pause(self):
self.__vlc_command("pl_pause")
def stop(self, src):
if self.is_active():
self._process.terminate()
def seek(self, timestamp: KodiTime):
self.__vlc_command(
"seek",
{
"val": timestamp.to_seconds(),
},
)
def exit(self):
self.stop("mpvplayer.exit")
def get_player_info(self) -> PlayerInfo:
try:
req = requests.get(
"http://127.0.0.1:9090/requests/status.xml",
auth=("", self._vlc_password),
)
dom = parseString(req.text)
except Exception as ex:
self.logger.warning("Failed to query vlc API: %s", ex)
return PlayerInfo(
runtime=0,
position=0,
title="",
playing=False,
)
try:
root = dom.getElementsByTagName("root")[0]
time = int(root.getElementsByTagName("time")[0].childNodes[0].data)
length = int(root.getElementsByTagName("length")[0].childNodes[0].data)
playing = (
root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
)
except Exception as ex:
self.logger.warning("Failed to parse vlc API response: %s", ex)
self.logger.debug("Response body: %s", req.text)
return PlayerInfo(
runtime=0,
position=0,
title="",
playing=False,
)
return PlayerInfo(
runtime=length,
position=time,
# TODO
title="TODO: Find out",
playing=playing,
)

View File

@@ -12,6 +12,7 @@ Window {
property var wallpaperIndex: 0
property var isLoading: false
property var bridge
property var currentTime: initialTime
Connections {
target: bridge
@@ -19,9 +20,14 @@ Window {
function onIsLoading(loading) {
isLoading = loading
}
function onNotification(text) {
notificationModel.append({"message": text})
}
}
Image {
id: wallpaperImage
height: window.height
width: window.width
visible: true
@@ -44,9 +50,68 @@ Window {
}
Timer {
interval: 5 * 60 * 1000
running: true
interval: 30 * 1000
running: !isLoading
repeat: true
onTriggered: wallpaperIndex = (wallpaperIndex + 1) % wallpapers.length
}
Timer {
interval: 5 * 1000
running: true
repeat: true
onTriggered: currentTime = bridge.currentTime()
}
Label {
text: currentTime
anchors.right: wallpaperImage.right
anchors.top: wallpaperImage.top
anchors.topMargin: 20
anchors.rightMargin: 20
font.pixelSize: window.height * 0.1
}
ListModel {
id: notificationModel
}
Timer {
interval: 8 * 1000
running: notificationModel.count > 0
repeat: true
onTriggered: notificationModel.remove(0, 1)
}
Component {
id: notificationDelegate
Rectangle {
color: "#37474F"
width: 400
height: 100
radius: 10
Label {
text: message
color: "white"
font.pixelSize: 20
anchors.fill: parent
anchors.margins: 10
}
}
}
ListView {
anchors.right: wallpaperImage.right
anchors.top: wallpaperImage.top
anchors.topMargin: 20 * 2 + window.height * 0.1
anchors.rightMargin: 20
width: 400
height: window.height * 0.9 - 20 * 2
spacing: 50
model: notificationModel
delegate: notificationDelegate
}
}

View File

@@ -1,5 +1,6 @@
from typing import Any
class _Repository:
_data: dict[str, Any]
@@ -12,4 +13,5 @@ class _Repository:
def get(self, key: str) -> Any | None:
return self._data.get(key)
I = _Repository()
I = _Repository() # noqa:E741

45
microkodi/tv.py Normal file
View File

@@ -0,0 +1,45 @@
import logging
import time
import cec
import requests
from microkodi.config import Config
from microkodi.repository import I
class TVHandler:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger("TVHandler")
config: Config = I.get("Config")
if config.cec:
cec.init()
self.logger.debug("CEC initialised")
def enable_power_if_configured(self) -> bool:
config: Config = I.get("Config")
if config.tv_power_webhook is not None:
try:
requests.put(config.tv_power_webhook)
self.logger.debug("Triggered webhook to enable power to the TV")
except Exception:
self.logger.warn("Failed to enable power to the TV")
return config.tv_power_webhook is not None
def power_on_if_needed(self) -> bool:
config: Config = I.get("Config")
if config.cec:
tc = cec.Device(cec.CECDEVICE_TV)
if not tc.is_on():
tc.power_on()
return config.cec
def turn_on_tv(self):
if self.enable_power_if_configured():
self.logger.debug("Waiting 500ms for TV to get power...")
time.sleep(500)
self.power_on_if_needed()

View File

@@ -1,4 +1,7 @@
from PySide6.QtCore import QObject, Signal
from datetime import datetime
from PySide6.QtCore import QObject, Signal, Slot
class DataBridge(QObject):
"""Bridge between the QML app and the Python "host"."""
@@ -6,9 +9,15 @@ class DataBridge(QObject):
# Indicates whether we're currently loading something or not
isLoading = Signal(bool, arguments=["loading"])
notification = Signal(str, arguments=["text"])
def __init__(self):
super().__init__()
def set_loading(self, state: bool):
"""Set the loading state in the UI."""
self.isLoading.emit(state)
@Slot(result=str)
def currentTime(self) -> str:
return datetime.now().strftime("%H:%M")

View File

@@ -2,8 +2,24 @@
name = "microkodi"
version = "0.1.0"
dependencies = [
"pyside6"
"pyside6",
"requests",
"cec"
]
requires-python = ">= 3.11"
[project.optional-dependencies]
youtube = [
"yt-dlp"
]
dev = [
"mypy",
"black",
"ruff"
]
[tools.build]
packages = ["microkodi"]
[project.scripts]
microkodi = "microkodi.main:main"
[tool.setuptools.packages.find]
exclude = ["contrib"]

64
scripts/youtube.py Normal file
View File

@@ -0,0 +1,64 @@
from urllib.parse import ParseResult, urlunparse, urlparse
import logging
from microkodi.repository import I
from microkodi.config import Config
from microkodi.helpers import recursive_dict_merge
import yt_dlp
DEFAULT_CONFIG = {"format": "bestvideo[width<=1920]+bestaudio", "ytdlp_options": {}}
def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]:
logger = logging.getLogger("Youtube")
youtube_config = I.get("YoutubeConfig")
opts = {
"format": youtube_config["format"],
**youtube_config["ytdlp_options"],
}
logger.debug("Using config for yt-dlp: %s", opts)
with yt_dlp.YoutubeDL(opts) as ytdl:
info = ytdl.extract_info(urlunparse(url), download=False)
#user_agent = None
audio_url = None
video_url = None
for format in info["requested_formats"]:
if format["width"] is None:
audio_url = format["url"]
else:
#user_agent = format["http_headers"]["User-Agent"]
video_url = format["url"]
args = (
[
f"--input-slave={audio_url}",
]
if audio_url
else None
)
return args, urlparse(video_url)
def init():
# Create the config
config: Config = I.get("Config")
youtube_config = recursive_dict_merge(
DEFAULT_CONFIG,
config.options.get(
"me.polynom.microkodi.youtube",
{},
),
)
I.register("YoutubeConfig", youtube_config)
# Register the transformers
I.get("VlcConfig").register_domain_transformer("youtu.be", youtube_url_transformer)
I.get("VlcConfig").register_domain_transformer(
"youtube.com", youtube_url_transformer
)
I.get("VlcConfig").register_domain_transformer(
"www.youtube.com", youtube_url_transformer
)

4
start-microkodi.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
cd `dirname $0`
source .venv/bin/activate
python3 microkodi/main.py -c ./config.json $@