microkodi/microkodi/jsonrpc.py

394 lines
14 KiB
Python

import json
from typing import Any, Callable
from dataclasses import dataclass
from urllib.parse import urlparse
import logging
import base64
import sys
from http.server import BaseHTTPRequestHandler
from microkodi.repository import I
from microkodi.programs import Program, PlayerInfo
from microkodi.helpers import KodiTime, seconds_to_kodi_format, after
def jsonrpc_response(id: int, payload: dict[str, Any]) -> dict[str, Any]:
return {
"jsonrpc": "2.0",
"result": payload,
"id": id,
}
class JsonRpcObject:
logger: logging.Logger
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
def handle(self, method: str, params: dict[str, Any]) -> Any:
raise NotImplementedError()
class JSONRPCRpcObject:
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "Ping":
return "pong"
else:
return "Err"
class ApplicationRpcObject:
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "Ping":
return "pong"
elif method == "GetProperties":
return self.get_properties(params)
else:
return "Err"
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str) -> str | bool | dict[str, Any]:
if name == "muted":
return False
elif name == "version":
return {
"major": 20,
"minor": 5,
"revision": "20240501-8c8d7afa26",
"tag": "stable",
}
elif name == "volume":
return 0
elif name == "partymode":
return False
elif name == "index":
return 0
elif name == "isdefault":
return True
elif name == "language":
return "und"
elif name == "name":
return "LOL"
else:
return "Unknown"
return {
prop: _get_property(prop) for prop in params["properties"]
}
class PlayerRpcObject(JsonRpcObject):
_active_program: Program | None
def __init__(self):
super().__init__()
self._active_program = None
@property
def is_active(self) -> bool:
return self._active_program is not None and self._active_program.is_active()
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "GetProperties":
return self.get_properties(params)
elif method == "GetItem":
return self.get_item(params)
elif method == "GetActivePlayers":
return self.get_active_players(params)
elif method == "Stop":
return self.stop(params)
elif method == "Open":
return self.open(params)
elif method == "PlayPause":
return self.play_pause(params)
elif method == "Seek":
return self.seek(params)
else:
return "Unknown method"
def seek(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to seek player that is not active")
return "ERR"
time_raw = params["value"]["time"]
kodi_time = KodiTime(
hours=time_raw["hours"],
minutes=time_raw["minutes"],
seconds=time_raw["seconds"],
)
self._active_program.seek(kodi_time)
return "Ok"
def get_active_players(self, params: dict[str, Any]) -> Any:
if not self.is_active:
return []
return [{"playerid":1,"playertype":"internal","type":"video"}]
def play_pause(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to toggle player that is not running")
return "OK"
if params["play"] != "toggle":
self.logger.warn("Trying to call PlayPause with unknown play: '%s'", params["play"])
return "Err"
playing = self._active_program.is_playing()
if playing:
self._active_program.pause()
else:
self._active_program.resume()
return {
"speed": 1 if not playing else 0,
}
def stop(self, params: dict[str, Any]) -> Any:
if not self.is_active:
self.logger.warn("Trying to stop player that is not running")
return "OK"
self._active_program.stop("playerrpcobject.stop")
return "OK"
@after(lambda: I.get("DataBridge").set_loading(False))
def open(self, params: dict[str, Any]) -> Any:
# Turn on the TV
config: Config = I.get("Config")
I.get("TVHandler").turn_on_tv()
I.get("DataBridge").set_loading(True)
url = urlparse(params["item"]["file"])
# Handle plugins
if url.scheme == "plugin":
if url.netloc == "plugin.video.sendtokodi":
self.logger.debug("Rewriting provided URL to %s", url.query)
url = urlparse(url.query)
# Find out what player class to use
scheme_configuration = config.players.get(url.scheme)
if scheme_configuration is None:
I.get("DataBridge").notification.emit(f"No player available for {url.scheme}")
self.logger.warn("Client requested unknown scheme: '%s'", url.scheme)
return {
"error": "invalid protocol"
}
player_class_name = scheme_configuration.get(url.netloc)
if player_class_name is None:
player_class_name = scheme_configuration.get("*")
if player_class_name is None:
I.get("DataBridge").notification.emit(f"No player available for {url.netloc}")
self.logger.warn("No player was picked for url '%s'", url)
return {
"error": "invalid protocol"
}
# Try to import the class
*module_parts, class_name = player_class_name.split(".")
module_name = ".".join(module_parts)
program_cls = None
if module_name not in sys.modules:
self.logger.debug("Trying to import %s to get class %s", module_name, class_name)
spec = importlib.util.find_spec(module)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
program_cls = getattr(sys.modules[module_name], class_name, None)
if program_cls is None:
I.get("DataBridge").notification.emit("Could not start player")
self.logger.warn("Class %s not found in module %s", class_name, module_name)
return {
"error": "invalid protocol"
}
if self._active_program is not None:
if isinstance(self._active_program, program_cls):
self._active_program.stop("playerrpcobject.open")
else:
self._active_program.exit()
self._active_program = program_cls()
else:
self._active_program = program_cls()
self._active_program.play(url)
def get_item(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> Any:
if name == "title":
return player_info.title
elif name == "label":
return player_info.title
elif name == "art":
return {
"icon": "image://DefaultVideo.png/",
}
return name if self.is_active else ""
player_info = self._active_program.get_player_info()
item = {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
return {
"item": {
**item,
"label": player_info.title,
}
}
def get_properties(self, params: dict[str, Any]) -> Any:
def _get_property(name: str, player_info: PlayerInfo) -> str | bool | dict[str, Any]:
if name == "currentsubtitle":
return None
elif name == "partymode":
return False
elif name == "percentage":
return player_info.position / player_info.runtime if player_info.runtime > 0 else 0.0
elif name == "playlistid":
return 0
elif name == "position":
return 0
elif name == "repeat":
return "off"
elif name == "shuffled":
return False
elif name == "speed":
return 1 if player_info.playing else 0
elif name == "subtitleenabled":
return False
elif name == "subtitles":
return []
elif name == "time":
kodi_time = seconds_to_kodi_format(player_info.position)
return {
"hours": kodi_time.hours,
"milliseconds": 0,
"minutes": kodi_time.minutes,
"seconds": kodi_time.seconds,
}
elif name == "totaltime":
kodi_time = seconds_to_kodi_format(player_info.runtime)
return {
"hours": kodi_time.hours,
"milliseconds": 0,
"minutes": kodi_time.minutes,
"seconds": kodi_time.seconds,
}
elif name == "audiostreams":
return []
elif name == "currentaudiostream":
return {}
elif name == "isdefault":
return True
else:
self.logger.error("Unknown param '%s'", name)
return "Unknown"
player_info = self._active_program.get_player_info()
return {
prop: _get_property(prop, player_info) for prop in params["properties"]
}
class PlaylistRpcObject(JsonRpcObject):
def handle(self, method: str, params: dict[str, Any]) -> Any:
if method == "Clear":
return self.clear(params)
elif method == "GetPlaylists":
return self.get_playlists(params)
elif method == "GetItems":
return self.get_items(params)
def clear(self, params: dict[str, Any]) -> Any:
playlistid: int = params["playlistid"]
self.logger.warn("Asked to empty playlist %d but we're just pretending to do so", playlistid)
return "OK"
def get_playlists(self, params: dict[str, Any]) -> Any:
return [{"playlistid":1,"type":"video"}]
def get_items(self, params: dict[str, Any]) -> Any:
return "ERR"
class GlobalMethodHandler:
objects: dict[str, JsonRpcObject]
def __init__(self):
self.objects = {}
# Register default objects
self.register("JSONRPC", JSONRPCRpcObject())
self.register("Application", ApplicationRpcObject())
self.register("Player", PlayerRpcObject())
self.register("Playlist", PlaylistRpcObject())
def register(self, name: str, obj: JsonRpcObject):
self.objects[name] = obj
I.register(name, obj)
class JsonRpcHandler(BaseHTTPRequestHandler):
logger: logging.Logger
def do_GET(self):
logger = logging.getLogger("JsonRpcHandler")
logger.debug("GET %s", self.path)
def do_POST(self):
logger = logging.getLogger("JsonRpcHandler")
if self.path != "/jsonrpc":
logger.error("POST on invalid path %s", self.path)
self.send_error(404)
return
body = self.rfile.read(int(self.headers["Content-Length"])).decode()
payload = json.loads(body)
# Only allow version 2
if payload["jsonrpc"] != "2.0":
self.send_error(400)
return
object_name, method_name = payload["method"].split(".")
call_id: int = payload["id"]
params = payload.get("params") or {}
obj = I.get("GlobalMethodHandler").objects.get(object_name)
if obj is None:
logger.debug("Payload: %s", body)
self.send_error(404)
return
# TODO: Handle authentication
authenticated = False
if authenticated:
auth: str | None = self.headers.get("Authorization")
if auth is None:
logger.warn("Client provided no Authorization header. Method: %s", method)
self.send_error(401)
return
auth_type, auth_payload = auth.split(" ")
if auth_type != "Basic":
logger.warn("Client provided no Basic authorization. Method: %s", method)
self.send_error(401)
return
credentials = base64.b64decode(auth_payload).decode("utf-8")
if credentials != "kodi:1234":
logger.warn("Rejecting request due to wrong credentials. Method: %s", method)
self.send_error(403)
return
response = jsonrpc_response(
call_id,
obj.handle(method_name, params),
)
response_body = json.dumps(response).encode()
logger.debug("%s.%s: %s", object_name, method_name, payload)
logger.debug("-> %s", response_body)
self.send_response_only(200)
self.send_header("Content-Length", len(response_body))
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(response_body)
self.wfile.flush()
self.log_request(code="200")