Compare commits

...

20 Commits

Author SHA1 Message Date
c353ed5eb7 Format and lint youtube.py 2025-01-13 22:02:19 +00:00
71ee104658 Ignore tooling caches 2025-01-13 21:59:25 +00:00
fad2d4f312 Lint using ruff 2025-01-13 21:58:40 +00:00
7d91041091 Format using black 2025-01-13 21:54:13 +00:00
4f76ad144f Improve pyproject.toml 2025-01-13 21:53:51 +00:00
2cf210bce6 Allow turning on the TV using a webhook 2025-01-13 22:51:02 +01:00
50d5e40a46 Add youtube.com to the Youtube plugin 2025-01-13 22:34:13 +01:00
5b010124d7 Hopefully exclude the contrib directory 2025-01-12 21:44:00 +00:00
ddb2730249 Fix vlc is_playing 2025-01-12 22:38:42 +01:00
597a10543d Integrate with CEC 2025-01-12 22:38:32 +01:00
fcfe97ad89 Show the loading spinner before doing anything in Player.Open 2025-01-12 22:01:53 +01:00
0f5756b064 Do not ignore the Youtube script 2025-01-12 16:18:33 +01:00
ba04f03d71 Add Youtube script 2025-01-12 16:18:01 +01:00
58e2b09b8a Add config field for script config 2025-01-12 16:17:39 +01:00
f590e752be Implement seeking for vlc 2025-01-12 13:56:33 +01:00
a90047d14c Fix exception when stopping mpv/vlc 2025-01-12 01:13:40 +00:00
f2c96389ad Pass args in start-microkodi.sh 2025-01-12 00:39:53 +00:00
ad231134af Remove the udev stuff 2025-01-12 00:25:06 +00:00
03f143cdcb Run via sway 2025-01-12 01:24:22 +01:00
b0d16ffd49 Add notifications 2025-01-12 01:24:14 +01:00
18 changed files with 423 additions and 144 deletions

6
.gitignore vendored
View File

@@ -1,7 +1,13 @@
# Python
**/*.egg-info
**/__pycache__/
.venv/
# Tooling
.mypy_cache/
.ruff_cache/
# Testing
config.json
scripts/
!scripts/youtube.py

12
contrib/sway.cfg Normal file
View File

@@ -0,0 +1,12 @@
# Prevent window resizing when we open the player apps
workspace_layout stacking
# Hide the window titlebar
default_border none
default_floating_border none
font pango:monospace 0
titlebar_padding 1
titlebar_border_thickness 0
# Start microkodi
exec bash <repo path>/start-microkodi.sh

View File

@@ -1,9 +1,11 @@
from dataclasses import dataclass
from pathlib import Path
import json
from typing import Any
from microkodi.helpers import recursive_dict_merge
@dataclass
class Config:
host: str
@@ -34,12 +36,15 @@ class Config:
# URL scheme -> netloc (or '*' for fallback) -> fully-qualified player class
players: dict[str, dict[str, str]]
card: str | None
connector: str | None
# The entire configuration file for use in user scripts
options: dict[str, Any]
# Enables the use of CEC
cec: bool
# Webhook to trigger to turn on power to the TV
tv_power_webhook: str | None
@property
def watch_connector(self) -> bool:
return self.card is not None and self.connector is not None
def load_config(config_path: Path | None) -> Config:
if config_path is None:
@@ -66,6 +71,7 @@ def load_config(config_path: Path | None) -> Config:
},
config_data.get("players", {}),
),
card=config_data.get("card"),
connector=config_data.get("connector"),
options=config_data.get("options", {}),
cec=config_data.get("cec", False),
tv_power_webhook=config_data.get("tv_power_webhook"),
)

View File

@@ -1,5 +1,23 @@
from dataclasses import dataclass
from typing import TypeVar, Callable
from typing import TypeVar, ParamSpec, Callable
P = ParamSpec("P")
T = TypeVar("T")
def after(func: Callable[[], None]) -> Callable[P, T]:
"""Runs @func after the decorated function exits."""
def decorator(f: Callable[P, T]) -> Callable[P, T]:
def inner(*args: P.args, **kwargs: P.kwargs) -> T:
ret = f(*args, **kwargs)
func()
return ret
return inner
return decorator
@dataclass
class KodiTime:
@@ -7,6 +25,10 @@ class KodiTime:
minutes: int
seconds: int
def to_seconds(self) -> int:
return self.hours * 3600 + self.minutes * 59 + self.seconds
def seconds_to_kodi_format(seconds: int) -> KodiTime:
"""Convert seconds into hours, minutes, and seconds as Kodi wants that."""
hours = seconds // 3600
@@ -20,13 +42,6 @@ def seconds_to_kodi_format(seconds: int) -> KodiTime:
)
T = TypeVar("T")
def find(l: list[T], pred: Callable[[T], bool]) -> T | None:
for i in l:
if pred(i):
return i
return None
def recursive_dict_merge(base: dict, update: dict) -> dict:
unique_keys = set([*base.keys(), *update.keys()])
out = {}

View File

@@ -4,12 +4,15 @@ from urllib.parse import urlparse
import logging
import base64
import sys
import importlib
from http.server import BaseHTTPRequestHandler
from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo
from microkodi.helpers import seconds_to_kodi_format
from microkodi.helpers import KodiTime, seconds_to_kodi_format, after
from microkodi.config import Config
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return {
@@ -18,6 +21,7 @@ def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
"id": id,
}
class JsonRpcObject:
logger: logging.Logger
@@ -70,9 +74,8 @@ class ApplicationRpcObject:
return "LOL"
else:
return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"]
}
return {prop: _get_property(prop) for prop in params["properties"]}
class PlayerRpcObject(JsonRpcObject):
@@ -99,20 +102,38 @@ class PlayerRpcObject(JsonRpcObject):
return self.open(params)
elif method == "PlayPause":
return self.play_pause(params)
elif method == "Seek":
return self.seek(params)
else:
return "Unknown method"
def seek(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to seek player that is not active")
return "ERR"
time_raw = params["value"]["time"]
kodi_time = KodiTime(
hours=time_raw["hours"],
minutes=time_raw["minutes"],
seconds=time_raw["seconds"],
)
self._active_program.seek(kodi_time)
return "Ok"
def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active:
return []
return [{"playerid":1,"playertype":"internal","type":"video"}]
return [{"playerid": 1, "playertype": "internal", "type": "video"}]
def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to toggle player that is not running")
return "OK"
if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"])
self.logger.warn(
"Trying to call PlayPause with unknown play: '%s'", params["play"]
)
return "Err"
playing = self._active_program.is_playing()
@@ -132,7 +153,13 @@ class PlayerRpcObject(JsonRpcObject):
self._active_program.stop("playerrpcobject.stop")
return "OK"
@after(lambda: I.get("DataBridge").set_loading(False))
def open(self, params: dict[str, Any]) -> Any:
# Turn on the TV
config: Config = I.get("Config")
I.get("TVHandler").turn_on_tv()
I.get("DataBridge").set_loading(True)
url = urlparse(params["item"]["file"])
# Handle plugins
@@ -142,41 +169,42 @@ class PlayerRpcObject(JsonRpcObject):
url = urlparse(url.query)
# Find out what player class to use
config: Config = I.get("Config")
scheme_configuration = config.players.get(url.scheme)
if scheme_configuration is None:
I.get("DataBridge").notification.emit(
f"No player available for {url.scheme}"
)
self.logger.warn("Client requested unknown scheme: '%s'", url.scheme)
return {
"error": "invalid protocol"
}
return {"error": "invalid protocol"}
player_class_name = scheme_configuration.get(url.netloc)
if player_class_name is None:
player_class_name = scheme_configuration.get("*")
if player_class_name is None:
I.get("DataBridge").notification.emit(
f"No player available for {url.netloc}"
)
self.logger.warn("No player was picked for url '%s'", url)
return {
"error": "invalid protocol"
}
return {"error": "invalid protocol"}
# Try to import the class
*module_parts, class_name = player_class_name.split(".")
module_name = ".".join(module_parts)
program_cls = None
if module_name not in sys.modules:
self.logger.debug("Trying to import %s to get class %s", module_name, class_name)
spec = importlib.util.find_spec(module)
self.logger.debug(
"Trying to import %s to get class %s", module_name, class_name
)
spec = importlib.util.find_spec(module_name)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
program_cls = getattr(sys.modules[module_name], class_name, None)
if program_cls is None:
I.get("DataBridge").notification.emit("Could not start player")
self.logger.warn("Class %s not found in module %s", class_name, module_name)
return {
"error": "invalid protocol"
}
return {"error": "invalid protocol"}
I.get("DataBridge").set_loading(True)
if self._active_program is not None:
if isinstance(self._active_program, program_cls):
self._active_program.stop("playerrpcobject.open")
@@ -200,9 +228,7 @@ class PlayerRpcObject(JsonRpcObject):
return name if self.is_active else ""
player_info = self._active_program.get_player_info()
item = {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
item = {prop: _get_property(prop, player_info) for prop in params["properties"]}
return {
"item": {
**item,
@@ -211,13 +237,19 @@ class PlayerRpcObject(JsonRpcObject):
}
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]:
def _get_property(
name: str, player_info: PlayerInfo
) -> str | bool | dict[str, Any]:
if name == "currentsubtitle":
return None
elif name == "partymode":
return False
elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0
return (
player_info.position / player_info.runtime
if player_info.runtime > 0
else 0.0
)
elif name == "playlistid":
return 0
elif name == "position":
@@ -257,10 +289,10 @@ class PlayerRpcObject(JsonRpcObject):
else:
self.logger.error("Unknown param '%s'", name)
return "Unknown"
player_info = self._active_program.get_player_info()
return {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return {prop: _get_property(prop, player_info) for prop in params["properties"]}
class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any:
@@ -273,15 +305,18 @@ class PlaylistRpcObject(JsonRpcObject):
def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid)
self.logger.warn(
"Asked to empty playlist %d but we're just pretending to do so", playlistid
)
return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}]
return [{"playlistid": 1, "type": "video"}]
def get_items(self, params: dict[str, Any]) -> Any:
return "ERR"
class GlobalMethodHandler:
objects: dict[str, JsonRpcObject]
@@ -306,7 +341,6 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path)
def do_POST(self):
logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc":
@@ -337,18 +371,24 @@ class JsonRpcHandler(BaseHTTPRequestHandler):
if authenticated:
auth: str | None = self.headers.get("Authorization")
if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method)
logger.warn(
"Client provided no Authorization header. Method: %s", method_name
)
self.send_error(401)
return
auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method)
logger.warn(
"Client provided no Basic authorization. Method: %s", method_name
)
self.send_error(401)
return
credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method)
logger.warn(
"Rejecting request due to wrong credentials. Method: %s", method_name
)
self.send_error(403)
return

View File

@@ -5,7 +5,6 @@ import threading
import argparse
from pathlib import Path
import importlib.util
import time
from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine
@@ -13,8 +12,8 @@ from PySide6.QtQml import QQmlApplicationEngine
from microkodi.jsonrpc import JsonRpcHandler, GlobalMethodHandler
from microkodi.ui.bridge import DataBridge
from microkodi.config import Config, load_config
from microkodi.cec_handler import TVHandler
from microkodi.repository import I
from microkodi.udev import is_display_connected, block_until_display_connected
def run_kodi_server():
@@ -24,6 +23,9 @@ def run_kodi_server():
method_handler = GlobalMethodHandler()
I.register("GlobalMethodHandler", method_handler)
# Setup CEC
I.register("TVHandler", TVHandler())
# Load extra plugins
if config.scripts:
logger.info("Loading scripts...")
@@ -48,22 +50,25 @@ def run_kodi_server():
init_method = getattr(module, "init")
init_method()
server = HTTPServer
httpd = HTTPServer((config.host, config.port,), JsonRpcHandler)
httpd = HTTPServer(
(
config.host,
config.port,
),
JsonRpcHandler,
)
logger.info("Starting server on %s:%i", config.host, config.port)
httpd.serve_forever()
logger.info("Shutting down server")
if __name__ == "__main__":
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", type=Path, help="Location of the config file")
parser.add_argument("--debug", action="store_true", default=False)
options = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if options.debug else logging.INFO)
logger = logging.getLogger("ui")
# Load the config
config = load_config(options.config)
@@ -92,25 +97,10 @@ if __name__ == "__main__":
sys.exit(-1)
engine.rootObjects()[0].setProperty("bridge", bridge)
if config.watch_connector:
logger.info("Will be watching display if it's gone")
exit_code = 0
while True:
exit_code = app.exec()
if not config.watch_connector:
break
# Exit if the display is still connected
if is_display_connected(config.card, config.connector):
break
logger.info("Display is gone. Waiting until it's back")
block_until_display_connected(config.card, config.connector)
logger.info("Display is back. Waiting 500ms...")
time.sleep(0.5)
exit_code = app.exec()
del engine
sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@@ -2,6 +2,7 @@ from threading import Thread
import subprocess
from typing import Callable
def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen:
def _run(process: subprocess.Popen):
process.wait()

View File

@@ -2,6 +2,9 @@ from dataclasses import dataclass
from urllib.parse import ParseResult
import logging
from microkodi.helpers import KodiTime
@dataclass
class PlayerInfo:
runtime: int
@@ -9,6 +12,7 @@ class PlayerInfo:
title: str
playing: bool
class Program:
logger: logging.Logger
@@ -38,3 +42,6 @@ class Program:
def get_player_info(self) -> PlayerInfo:
raise NotImplementedError()
def seek(self, timestamp: KodiTime):
raise NotImplementedError()

View File

@@ -1,8 +1,7 @@
import subprocess
from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse, urldefrag, unquote
from urllib.parse import ParseResult, urlunparse
import logging
from dataclasses import dataclass
import os
import json
import socket
@@ -12,6 +11,7 @@ from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
class MpvConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@@ -19,14 +19,23 @@ class MpvConfig:
def __init__(self):
self._ext_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext)
def register_ext_transformer(
self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("MpvConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
I.register("MpvConfig", MpvConfig())
class MpvProgram(Program):
_process: subprocess.Popen
@@ -45,9 +54,14 @@ class MpvProgram(Program):
def __mpv_command(self, args: list[str]) -> dict[str, Any]:
ipc_logger = logging.getLogger("mpv-ipc")
command = json.dumps({
"command": args,
}) + "\n"
command = (
json.dumps(
{
"command": args,
}
)
+ "\n"
)
ipc_logger.debug("-> %s", command)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
try:
@@ -61,7 +75,6 @@ class MpvProgram(Program):
ipc_logger.debug("<- %s", resp_raw)
return json.loads(resp_raw)
def __read_prop(self, prop: str, default: Any) -> Any:
return self.__mpv_command(["get_property", prop]).get("data", default)
@@ -108,16 +121,18 @@ class MpvProgram(Program):
def _when_mpv_exit(self):
self.logger.info("MPV has exited")
self._process = None
I.get("DataBridge").set_loading(False)
if self._process.returncode != 0:
I.get("DataBridge").notification.emit("mpv exited with an error")
self._process = None
def pause(self):
self.__mpv_command(["set_property", "pause", True])
def stop(self, src):
if self.is_active():
self._process.terminate()
self._process = None
def exit(self):
self.stop("mpvplayer.exit")

View File

@@ -1,5 +1,5 @@
from xml.dom.minidom import parseString
from typing import Callable
from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse
import subprocess
import logging
@@ -11,9 +11,11 @@ from microkodi.process import nonblocking_run
from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
from microkodi.helpers import KodiTime
EVENT_PLAYER_EXIT = "post-player-exit"
class VlcConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
@@ -28,18 +30,30 @@ class VlcConfig:
self._event_listeners = {}
self._domain_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("VlcConfig").debug("Registering transformer for .%s URLs", ext)
def register_ext_transformer(
self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for .%s URLs", ext
)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_ext_transformer(
self, ext: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
def register_domain_transformer(self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("VlcConfig").debug("Registering transformer for the %s domain", domain)
def register_domain_transformer(
self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]
):
logging.getLogger("VlcConfig").debug(
"Registering transformer for the %s domain", domain
)
self._domain_map[domain] = transformer
def get_domain_transformer(self, domain: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
def get_domain_transformer(
self, domain: str
) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._domain_map.get(domain)
def register_event_listener(self, event: str, callback: Callable[[], None]):
@@ -53,8 +67,11 @@ class VlcConfig:
for listener in self._event_listeners.get(event, []):
logger.debug("Running listener %s for event %s", listener.__name__, event)
listener()
I.register("VlcConfig", VlcConfig())
class VlcProgram(Program):
_process: subprocess.Popen
@@ -76,7 +93,7 @@ class VlcProgram(Program):
return self._process is not None and self._process.returncode is None
def is_playing(self) -> bool:
return self.get_player_info().is_playing
return self.get_player_info().playing
def resume(self):
self.__vlc_command("pl_pause")
@@ -122,6 +139,7 @@ class VlcProgram(Program):
"--http-port=9090",
f"--http-password={self._vlc_password}",
"--quiet",
"--play-and-exit",
*extra_args,
final_url,
]
@@ -130,15 +148,24 @@ class VlcProgram(Program):
def _when_vlc_exit(self):
self.logger.info("vlc has exited")
self._process = None
I.get("VlcConfig").run_event_listeners(EVENT_PLAYER_EXIT)
I.get("DataBridge").set_loading(False)
def __vlc_command(self, command: str) -> str | None:
if self._process.returncode != 0:
I.get("DataBridge").notification.emit("VLC exited with an error")
self._process = None
def __vlc_command(
self, command: str, params: dict[str, Any] | None = None
) -> str | None:
try:
req = requests.get(
f"http://127.0.0.1:9090/requests/status.xml?command={command}",
"http://127.0.0.1:9090/requests/status.xml",
auth=("", self._vlc_password),
params={
"command": command,
**(params or {}),
},
)
return req.text
except Exception as ex:
@@ -151,7 +178,14 @@ class VlcProgram(Program):
def stop(self, src):
if self.is_active():
self._process.terminate()
self._process = None
def seek(self, timestamp: KodiTime):
self.__vlc_command(
"seek",
{
"val": timestamp.to_seconds(),
},
)
def exit(self):
self.stop("mpvplayer.exit")
@@ -176,7 +210,9 @@ class VlcProgram(Program):
root = dom.getElementsByTagName("root")[0]
time = int(root.getElementsByTagName("time")[0].childNodes[0].data)
length = int(root.getElementsByTagName("length")[0].childNodes[0].data)
playing = root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
playing = (
root.getElementsByTagName("state")[0].childNodes[0].data == "playing"
)
except Exception as ex:
self.logger.warning("Failed to parse vlc API response: %s", ex)
self.logger.debug("Response body: %s", req.text)

View File

@@ -20,6 +20,10 @@ Window {
function onIsLoading(loading) {
isLoading = loading
}
function onNotification(text) {
notificationModel.append({"message": text})
}
}
Image {
@@ -67,4 +71,47 @@ Window {
anchors.rightMargin: 20
font.pixelSize: window.height * 0.1
}
ListModel {
id: notificationModel
}
Timer {
interval: 8 * 1000
running: notificationModel.count > 0
repeat: true
onTriggered: notificationModel.remove(0, 1)
}
Component {
id: notificationDelegate
Rectangle {
color: "#37474F"
width: 400
height: 100
radius: 10
Label {
text: message
color: "white"
font.pixelSize: 20
anchors.fill: parent
anchors.margins: 10
}
}
}
ListView {
anchors.right: wallpaperImage.right
anchors.top: wallpaperImage.top
anchors.topMargin: 20 * 2 + window.height * 0.1
anchors.rightMargin: 20
width: 400
height: window.height * 0.9 - 20 * 2
spacing: 50
model: notificationModel
delegate: notificationDelegate
}
}

View File

@@ -1,5 +1,6 @@
from typing import Any
class _Repository:
_data: dict[str, Any]
@@ -12,4 +13,5 @@ class _Repository:
def get(self, key: str) -> Any | None:
return self._data.get(key)
I = _Repository()
I = _Repository() # noqa:E741

45
microkodi/tv.py Normal file
View File

@@ -0,0 +1,45 @@
import logging
import time
import cec
import requests
from microkodi.config import Config
from microkodi.repository import I
class TVHandler:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger("TVHandler")
config: Config = I.get("Config")
if config.cec:
cec.init()
self.logger.debug("CEC initialised")
def enable_power_if_configured(self) -> bool:
config: Config = I.get("Config")
if config.tv_power_webhook is not None:
try:
requests.put(config.tv_power_webhook)
self.logger.debug("Triggered webhook to enable power to the TV")
except Exception:
self.logger.warn("Failed to enable power to the TV")
return config.tv_power_webhook is not None
def power_on_if_needed(self) -> bool:
config: Config = I.get("Config")
if config.cec:
tc = cec.Device(cec.CECDEVICE_TV)
if not tc.is_on():
tc.power_on()
return config.cec
def turn_on_tv(self):
if self.enable_power_if_configured():
self.logger.debug("Waiting 500ms for TV to get power...")
time.sleep(500)
self.power_on_if_needed()

View File

@@ -1,27 +0,0 @@
import logging
import pyudev
def is_display_connected(card: str, connector: str) -> bool:
logger = logging.getLogger("udev")
status_file = f"/sys/class/drm/{card}-{connector}/status"
logger.debug("Reading file %s", status_file)
with open(status_file, "r") as f:
result = f.read().strip()
logger.debug("Result: '%s'", result)
return result == "connected"
def block_until_display_connected(card: str, connector: str):
ctx = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(ctx)
monitor.filter_by("drm")
for device in iter(monitor.poll, None):
if not "DEVNAME" in device:
continue
if device.get("DEVNAME") != f"/dev/dri/{card}":
continue
if not is_display_connected(card, connector):
continue
break

View File

@@ -2,12 +2,15 @@ from datetime import datetime
from PySide6.QtCore import QObject, Signal, Slot
class DataBridge(QObject):
"""Bridge between the QML app and the Python "host"."""
# Indicates whether we're currently loading something or not
isLoading = Signal(bool, arguments=["loading"])
notification = Signal(str, arguments=["text"])
def __init__(self):
super().__init__()

View File

@@ -4,9 +4,22 @@ version = "0.1.0"
dependencies = [
"pyside6",
"requests",
"yt-dlp",
"pyudev"
"cec"
]
requires-python = ">= 3.11"
[project.optional-dependencies]
youtube = [
"yt-dlp"
]
dev = [
"mypy",
"black",
"ruff"
]
[tools.build]
packages = ["microkodi"]
[project.scripts]
microkodi = "microkodi.main:main"
[tool.setuptools.packages.find]
exclude = ["contrib"]

64
scripts/youtube.py Normal file
View File

@@ -0,0 +1,64 @@
from urllib.parse import ParseResult, urlunparse, urlparse
import logging
from microkodi.repository import I
from microkodi.config import Config
from microkodi.helpers import recursive_dict_merge
import yt_dlp
DEFAULT_CONFIG = {"format": "bestvideo[width<=1920]+bestaudio", "ytdlp_options": {}}
def youtube_url_transformer(url: ParseResult) -> tuple[list[str], str]:
logger = logging.getLogger("Youtube")
youtube_config = I.get("YoutubeConfig")
opts = {
"format": youtube_config["format"],
**youtube_config["ytdlp_options"],
}
logger.debug("Using config for yt-dlp: %s", opts)
with yt_dlp.YoutubeDL(opts) as ytdl:
info = ytdl.extract_info(urlunparse(url), download=False)
#user_agent = None
audio_url = None
video_url = None
for format in info["requested_formats"]:
if format["width"] is None:
audio_url = format["url"]
else:
#user_agent = format["http_headers"]["User-Agent"]
video_url = format["url"]
args = (
[
f"--input-slave={audio_url}",
]
if audio_url
else None
)
return args, urlparse(video_url)
def init():
# Create the config
config: Config = I.get("Config")
youtube_config = recursive_dict_merge(
DEFAULT_CONFIG,
config.options.get(
"me.polynom.microkodi.youtube",
{},
),
)
I.register("YoutubeConfig", youtube_config)
# Register the transformers
I.get("VlcConfig").register_domain_transformer("youtu.be", youtube_url_transformer)
I.get("VlcConfig").register_domain_transformer(
"youtube.com", youtube_url_transformer
)
I.get("VlcConfig").register_domain_transformer(
"www.youtube.com", youtube_url_transformer
)

4
start-microkodi.sh Executable file
View File

@@ -0,0 +1,4 @@
#!/bin/bash
cd `dirname $0`
source .venv/bin/activate
python3 microkodi/main.py -c ./config.json $@