Initial commit

This commit is contained in:
PapaTutuWawa 2025-01-11 15:58:09 +00:00
commit 312238b35a
14 changed files with 788 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
**/*.egg-info
**/__pycache__/
.venv/
# Testing
config.json
scripts/

0
microkodi/__init__.py Normal file
View File

40
microkodi/config.py Normal file
View File

@ -0,0 +1,40 @@
from dataclasses import dataclass
from pathlib import Path
import json
@dataclass
class Config:
host: str
port: int
# Path to the mpv binary
mpv: str
# Path to where the mpv IPC sock should be put
mpv_ipc_sock: str
# Extra args to pass to mpv
mpv_args: list[str]
# Wallpapers to show
wallpapers: list[str]
# Additional scripts to load
scripts: dict[str, str]
def load_config(config_path: Path | None) -> Config:
if config_path is None:
config_data = {}
else:
with open(config_path, "r") as f:
config_data = json.load(f)
return Config(
host=config_data.get("host", "0.0.0.0"),
port=config_data.get("port", 8080),
mpv=config_data.get("mpv", "/usr/bin/mpv"),
mpv_ipc_sock=config_data.get("mpv_ipc_sock", "/tmp/mpv.sock"),
mpv_args=config_data.get("mpv_args", []),
wallpapers=config_data.get("wallpapers", []),
scripts=config_data.get("scripts", {}),
)

19
microkodi/helpers.py Normal file
View File

@ -0,0 +1,19 @@
from dataclasses import dataclass
@dataclass
class KodiTime:
hours: int
minutes: int
seconds: int
def seconds_to_kodi_format(seconds: int) -> KodiTime:
"""Convert seconds into hours, minutes, and seconds as Kodi wants that."""
hours = seconds // 3600
seconds -= hours * 3600
minutes = seconds // 60
seconds -= minutes * 60
return KodiTime(
hours=hours,
minutes=minutes,
seconds=seconds,
)

345
microkodi/jsonrpc.py Normal file
View File

@ -0,0 +1,345 @@
import json
from typing import Any
from urllib.parse import urlparse
import logging
import base64
from http.server import BaseHTTPRequestHandler
from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo
from microkodi.programs.mpv import MpvProgram
from microkodi.helpers import seconds_to_kodi_format
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"result": payload,
"id": id,
}
class JsonRpcObject:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
def handle(self, method: str, params: dict[str, Any]) -> Any:
raise NotImplementedError()
class JSONRPCRpcObject:
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "Ping":
return "pong"
else:
return "Err"
class ApplicationRpcObject:
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "Ping":
return "pong"
elif method == "GetProperties":
return self.get_properties(params)
else:
return "Err"
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str) -> str | bool | dict[str, Any]:
if name == "muted":
return False
elif name == "version":
return {
"major": 20,
"minor": 5,
"revision": "20240501-8c8d7afa26",
"tag": "stable",
}
elif name == "volume":
return 0
elif name == "partymode":
return False
elif name == "index":
return 0
elif name == "isdefault":
return True
elif name == "language":
return "und"
elif name == "name":
return "LOL"
else:
return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"]
}
class PlayerRpcObject(JsonRpcObject):
_active_program: Program | None
def __init__(self):
super().__init__()
self._active_program = None
@property
def is_active(self) -> bool:
return self._active_program is not None and self._active_program.is_active()
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "GetProperties":
return self.get_properties(params)
elif method == "GetItem":
return self.get_item(params)
elif method == "GetActivePlayers":
return self.get_active_players(params)
elif method == "Stop":
return self.stop(params)
elif method == "Open":
return self.open(params)
elif method == "PlayPause":
return self.play_pause(params)
else:
return "Unknown method"
def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active:
return []
return [{"playerid":1,"playertype":"internal","type":"video"}]
def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to toggle player that is not running")
return "OK"
if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"])
return "Err"
playing = self._active_program.is_playing()
if playing:
self._active_program.pause()
else:
self._active_program.resume()
return {
"speed": 1 if not playing else 0,
}
def stop(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to stop player that is not running")
return "OK"
self._active_program.stop("playerrpcobject.stop")
return "OK"
def open(self, params: dict[str, Any]) -> Any:
url = urlparse(params["item"]["file"])
program_cls = None
# Handle plugins
if url.scheme == "plugin":
if url.netloc == "plugin.video.sendtokodi":
self.logger.debug("Rewriting provided URL to %s", url.query)
url = urlparse(url.query)
if url.scheme == "https":
program_cls = MpvProgram
if program_cls is None:
return {
"error": "invalid protocol"
}
I.get("DataBridge").set_loading(True)
if self._active_program is not None:
if isinstance(self._active_program, program_cls):
self._active_program.stop("playerrpcobject.open")
else:
self._active_program.exit()
self._active_program = program_cls()
else:
self._active_program = program_cls()
self._active_program.play(url)
def get_item(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> Any:
if name == "title":
return player_info.title
elif name == "label":
return player_info.title
elif name == "art":
return {
"icon": "image://DefaultVideo.png/",
}
return name if self.is_active else ""
player_info = self._active_program.get_player_info()
item = {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return {
"item": {
**item,
"label": player_info.title,
}
}
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]:
if name == "currentsubtitle":
return None
elif name == "partymode":
return False
elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0
elif name == "playlistid":
return 0
elif name == "position":
return 0
elif name == "repeat":
return "off"
elif name == "shuffled":
return False
elif name == "speed":
return 1 if player_info.playing else 0
elif name == "subtitleenabled":
return False
elif name == "subtitles":
return []
elif name == "time":
kodi_time = seconds_to_kodi_format(player_info.position)
return {
"hours": kodi_time.hours,
"milliseconds": 0,
"minutes": kodi_time.minutes,
"seconds": kodi_time.seconds,
}
elif name == "totaltime":
kodi_time = seconds_to_kodi_format(player_info.runtime)
return {
"hours": kodi_time.hours,
"milliseconds": 0,
"minutes": kodi_time.minutes,
"seconds": kodi_time.seconds,
}
elif name == "audiostreams":
return []
elif name == "currentaudiostream":
return {}
elif name == "isdefault":
return True
else:
self.logger.error("Unknown param '%s'", name)
return "Unknown"
player_info = self._active_program.get_player_info()
return {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "Clear":
return self.clear(params)
elif method == "GetPlaylists":
return self.get_playlists(params)
elif method == "GetItems":
return self.get_items(params)
def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid)
return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}]
def get_items(self, params: dict[str, Any]) -> Any:
return "ERR"
class GlobalMethodHandler:
objects: dict[str, JsonRpcObject]
def __init__(self):
self.objects = {}
# Register default objects
self.register("JSONRPC", JSONRPCRpcObject())
self.register("Application", ApplicationRpcObject())
self.register("Player", PlayerRpcObject())
self.register("Playlist", PlaylistRpcObject())
def register(self, name: str, obj: JsonRpcObject):
self.objects[name] = obj
I.register(name, obj)
class JsonRpcHandler(BaseHTTPRequestHandler):
logger: logging.Logger
def do_GET(self):
logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path)
def do_POST(self):
logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc":
logger.error("POST on invalid path %s", self.path)
self.send_error(404)
return
body = self.rfile.read(int(self.headers["Content-Length"])).decode()
payload = json.loads(body)
# Only allow version 2
if payload["jsonrpc"] != "2.0":
self.send_error(400)
return
object_name, method_name = payload["method"].split(".")
call_id: int = payload["id"]
params = payload.get("params") or {}
obj = I.get("GlobalMethodHandler").objects.get(object_name)
if obj is None:
logger.debug("Payload: %s", body)
self.send_error(404)
return
# TODO: Handle authentication
authenticated = False
if authenticated:
auth: str | None = self.headers.get("Authorization")
if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method)
self.send_error(401)
return
auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method)
self.send_error(401)
return
credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method)
self.send_error(403)
return
response = jsonrpc_response(
call_id,
obj.handle(method_name, params),
)
response_body = json.dumps(response).encode()
logger.debug("%s.%s: %s", object_name, method_name, payload)
logger.debug("-> %s", response_body)
self.send_response_only(200)
self.send_header("Content-Length", len(response_body))
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(response_body)
self.wfile.flush()
self.log_request(code="200")

88
microkodi/main.py Normal file
View File

@ -0,0 +1,88 @@
from http.server import HTTPServer
import logging
import sys
import threading
import argparse
from pathlib import Path
import importlib.util
from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine
from microkodi.jsonrpc import JsonRpcHandler, GlobalMethodHandler
from microkodi.ui.bridge import DataBridge
from microkodi.config import Config, load_config
from microkodi.repository import I
def run_kodi_server():
config: Config = I.get("Config")
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("JsonRPCServer")
method_handler = GlobalMethodHandler()
I.register("GlobalMethodHandler", method_handler)
# Load extra plugins
if config.scripts:
logger.info("Loading scripts...")
for name, path in config.scripts.items():
logger.debug("Trying to load %s (%s)", name, path)
if not Path(path).exists():
logger.warning("Failed to load %s: File does not exist", name)
continue
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
# Inject special globals
setattr(module, "I", I)
setattr(module, "logger", logging.getLogger(name))
spec.loader.exec_module(module)
if not hasattr(module, "init"):
logger.warning("Failed to load %s: No init function", name)
continue
init_method = getattr(module, "init")
init_method()
server = HTTPServer
httpd = HTTPServer((config.host, config.port,), JsonRpcHandler)
logger.info("Starting server on %s:%i", config.host, config.port)
httpd.serve_forever()
logger.info("Shutting down server")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config", "-c", type=Path, help="Location of the config file")
options = parser.parse_args()
# Load the config
config = load_config(options.config)
I.register("Config", config)
bridge = DataBridge()
I.register("DataBridge", bridge)
# Start the server
server_thread = threading.Thread(target=run_kodi_server, daemon=True)
server_thread.start()
# Setup the UI
app = QGuiApplication(sys.argv)
engine = QQmlApplicationEngine()
engine.addImportPath(sys.path[0])
engine.rootContext().setContextProperty(
"wallpapers",
config.wallpapers,
)
engine.loadFromModule("qml", "Main")
if not engine.rootObjects():
sys.exit(-1)
engine.rootObjects()[0].setProperty("bridge", bridge)
exit_code = app.exec()
del engine
sys.exit(exit_code)

13
microkodi/process.py Normal file
View File

@ -0,0 +1,13 @@
from threading import Thread
import subprocess
from typing import Callable
def nonblocking_run(args: list[str], when_done: Callable[[], None]) -> subprocess.Popen:
def _run(process: subprocess.Popen):
process.wait()
when_done()
process = subprocess.Popen(args)
t = Thread(target=_run, args=(process,))
t.start()
return process

View File

@ -0,0 +1,40 @@
from dataclasses import dataclass
from urllib.parse import ParseResult
import logging
@dataclass
class PlayerInfo:
runtime: int
position: int
title: str
playing: bool
class Program:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
def is_active(self) -> bool:
raise NotImplementedError()
def is_playing(self) -> bool:
raise NotImplementedError()
def play(self, url: ParseResult):
raise NotImplementedError()
def resume(self):
raise NotImplementedError()
def pause(self):
raise NotImplementedError()
def stop(self):
raise NotImplementedError()
def exit(self):
raise NotImplementedError()
def get_player_info(self) -> PlayerInfo:
raise NotImplementedError()

144
microkodi/programs/mpv.py Normal file
View File

@ -0,0 +1,144 @@
import subprocess
from typing import Any, Callable
from urllib.parse import ParseResult, urlunparse, urldefrag, unquote
import logging
from dataclasses import dataclass
import os
import json
import socket
from microkodi.process import nonblocking_run
from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
class MpvConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
def __init__(self):
self._ext_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("MpvConfig").debug("Registering transformer for .%s URLs", ext)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
I.register("MpvConfig", MpvConfig())
class MpvProgram(Program):
_process: subprocess.Popen
_mpv_bin_path: str
_mpv_extra_args: list[str]
_mpv_ipc_sock: str
def __init__(self):
super().__init__()
config: Config = I.get("Config")
self._process = None
self._mpv_bin_path = config.mpv
self._mpv_extra_args = config.mpv_args
self._mpv_ipc_sock = config.mpv_ipc_sock
def register_url_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
self.logger.debug("Registering transformer for .%s URLs", ext)
self._ext_map[ext] = transformer
def __mpv_command(self, args: list[str]) -> dict[str, Any]:
ipc_logger = logging.getLogger("mpv-ipc")
command = json.dumps({
"command": args,
}) + "\n"
ipc_logger.debug("-> %s", command)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
try:
client.connect(self._mpv_ipc_sock)
except ConnectionRefusedError:
ipc_logger.warn("Connection refused")
return {}
client.send(command.encode("utf-8"))
resp_raw = client.recv(4096).decode()
ipc_logger.debug("<- %s", resp_raw)
return json.loads(resp_raw)
def __read_prop(self, prop: str, default: Any) -> Any:
return self.__mpv_command(["get_property", prop]).get("data", default)
def is_active(self) -> bool:
return self._process is not None and self._process.returncode is None
def is_playing(self) -> bool:
return not self.__read_prop("pause", False)
def resume(self):
self.__mpv_command(["set_property", "pause", False])
def play(self, url: ParseResult):
self.stop("mpvplayer.play")
# Allow preprocessing the URL
last_segment = url.path.split("/")[-1]
final_url = ""
extra_args = []
if "." in last_segment:
*_, ext = last_segment.split(".")
transformer = I.get("MpvConfig").get_ext_transformer(ext)
if transformer is not None:
self.logger.info("Transforming URL due to extension: %s", ext)
extra_args, final_url = transformer(url)
self.logger.info("New url: %s", final_url)
else:
final_url = urlunparse(url)
else:
final_url = urlunparse(url)
# Start mpv
cmdline = [
self._mpv_bin_path,
f"--input-ipc-server={self._mpv_ipc_sock}",
"--fs",
"--quiet",
*self._mpv_extra_args,
*extra_args,
final_url,
]
self.logger.debug("Cmdline: %s", " ".join(cmdline))
self._process = nonblocking_run(cmdline, self._when_mpv_exit)
def _when_mpv_exit(self):
self.logger.info("MPV has exited")
self._process = None
I.get("DataBridge").set_loading(False)
def pause(self):
self.__mpv_command(["set_property", "pause", True])
def stop(self, src):
if self.is_active():
self._process.terminate()
self._process = None
def exit(self):
self.stop("mpvplayer.exit")
def get_player_info(self) -> PlayerInfo:
if not os.path.exists(self._mpv_ipc_sock):
return PlayerInfo(
runtime=0,
position=0,
title="",
playing=False,
)
return PlayerInfo(
runtime=int(self.__read_prop("duration", "0")),
position=int(self.__read_prop("playback-time", "0")),
title=self.__read_prop("media-title", ""),
playing=not self.__read_prop("pause", False),
)

52
microkodi/qml/Main.qml Normal file
View File

@ -0,0 +1,52 @@
import QtQuick
import QtQuick.Controls
import QtQuick.Layouts
Window {
id: window
visible: true
title: "Hello World"
visibility: Window.Maximized
flags: Qt.FramelessWindowHint
property var wallpaperIndex: 0
property var isLoading: false
property var bridge
Connections {
target: bridge
function onIsLoading(loading) {
isLoading = loading
}
}
Image {
height: window.height
width: window.width
visible: true
source: wallpapers[wallpaperIndex]
fillMode: Image.PreserveAspectCrop
}
Popup {
visible: isLoading
x: Math.round((parent.width - width) / 2)
y: Math.round((parent.height - height) / 2)
width: 100
height: 100
background: BusyIndicator {
running: isLoading
width: 100
height: 100
}
}
Timer {
interval: 5 * 60 * 1000
running: true
repeat: true
onTriggered: wallpaperIndex = (wallpaperIndex + 1) % wallpapers.length
}
}

2
microkodi/qml/qmldir Normal file
View File

@ -0,0 +1,2 @@
module qml
Main 254.0 Main.qml

15
microkodi/repository.py Normal file
View File

@ -0,0 +1,15 @@
from typing import Any
class _Repository:
_data: dict[str, Any]
def __init__(self):
self._data = {}
def register(self, key: str, value: Any):
self._data[key] = value
def get(self, key: str) -> Any | None:
return self._data.get(key)
I = _Repository()

14
microkodi/ui/bridge.py Normal file
View File

@ -0,0 +1,14 @@
from PySide6.QtCore import QObject, Signal
class DataBridge(QObject):
"""Bridge between the QML app and the Python "host"."""
# Indicates whether we're currently loading something or not
isLoading = Signal(bool, arguments=["loading"])
def __init__(self):
super().__init__()
def set_loading(self, state: bool):
"""Set the loading state in the UI."""
self.isLoading.emit(state)

9
pyproject.toml Normal file
View File

@ -0,0 +1,9 @@
[project]
name = "microkodi"
version = "0.1.0"
dependencies = [
"pyside6"
]
[tools.build]
packages = ["microkodi"]