microkodi/microkodi/programs/vlc.py

212 lines
7.1 KiB
Python

import logging
import secrets
import subprocess
from typing import Any, Callable
from urllib.parse import ParseResult, urlparse, urlunparse
from xml.dom.minidom import parseString

import requests

from microkodi.process import nonblocking_run
from microkodi.config import Config
from microkodi.programs import PlayerInfo, Program
from microkodi.repository import I
from microkodi.helpers import KodiTime
# Name of the event whose listeners are run (via VlcConfig.run_event_listeners)
# when the VLC process exits.
EVENT_PLAYER_EXIT = "post-player-exit"
class VlcConfig:
# Mapping of file extensions to a potential transformer function
_ext_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
# Mapping of domains to a potential transformer function
_domain_map: dict[str, Callable[[ParseResult], tuple[list[str], str]]]
_event_listeners: dict[str, Callable[[], None]]
def __init__(self):
self._ext_map = {}
self._event_listeners = {}
self._domain_map = {}
def register_ext_transformer(self, ext: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("VlcConfig").debug("Registering transformer for .%s URLs", ext)
self._ext_map[ext] = transformer
def get_ext_transformer(self, ext: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._ext_map.get(ext)
def register_domain_transformer(self, domain: str, transformer: Callable[[ParseResult], tuple[list[str], str]]):
logging.getLogger("VlcConfig").debug("Registering transformer for the %s domain", domain)
self._domain_map[domain] = transformer
def get_domain_transformer(self, domain: str) -> Callable[[ParseResult], tuple[list[str], str]] | None:
return self._domain_map.get(domain)
def register_event_listener(self, event: str, callback: Callable[[], None]):
if event not in self._event_listeners:
self._event_listeners[event] = []
logging.getLogger("VlcConfig").debug("Registering event listener for %s", event)
self._event_listeners[event].append(callback)
def run_event_listeners(self, event: str):
logger = logging.getLogger("VlcConfig")
for listener in self._event_listeners.get(event, []):
logger.debug("Running listener %s for event %s", listener.__name__, event)
listener()
# Register a VlcConfig instance under the "VlcConfig" key at import time;
# VlcProgram retrieves it through I.get("VlcConfig").
I.register("VlcConfig", VlcConfig())
class VlcProgram(Program):
    """Program implementation that plays media through a VLC subprocess.

    VLC is launched with its HTTP interface enabled on localhost and protected
    by a random password generated for each launch. Pause, seek and status
    queries are sent to that interface.
    """

    # Address of VLC's HTTP interface. The same values are passed to VLC on
    # the command line and used to build the request URL, so they cannot
    # drift apart.
    _HTTP_HOST = "127.0.0.1"
    _HTTP_PORT = 9090
    _STATUS_URL = f"http://{_HTTP_HOST}:{_HTTP_PORT}/requests/status.xml"
    # Upper bound (seconds) for a single HTTP request to VLC, so that an
    # unresponsive player cannot block the caller indefinitely.
    _HTTP_TIMEOUT = 2.0

    # Handle of the running VLC process, or None when nothing was started
    _process: subprocess.Popen | None
    _vlc_bin_path: str
    _vlc_extra_args: list[str]
    # Password of the HTTP interface of the most recently started VLC
    _vlc_password: str | None

    def __init__(self):
        """Read the VLC binary path and extra arguments from the Config."""
        super().__init__()
        config: Config = I.get("Config")
        self._process = None
        self._vlc_bin_path = config.vlc
        self._vlc_extra_args = config.vlc_args
        self._vlc_password = None

    def is_active(self) -> bool:
        """Return True while a started VLC process has no return code yet."""
        return self._process is not None and self._process.returncode is None

    def is_playing(self) -> bool:
        """Return whether VLC reports that it is currently playing."""
        return self.get_player_info().is_playing

    def resume(self):
        """Send the ``pl_pause`` command to VLC.

        NOTE(review): this is the same command pause() sends; per the VLC
        HTTP interface documentation it toggles the pause state — confirm
        that callers only invoke this while playback is paused.
        """
        self.__vlc_command("pl_pause")

    def play(self, url: ParseResult):
        """Stop any running VLC and start a new one playing ``url``.

        A transformer registered for the URL's domain runs first and may
        replace the URL and contribute command line arguments. Afterwards a
        transformer registered for the file extension of the last path
        segment may do the same. Without an extension transformer the URL is
        passed to VLC unchanged.
        """
        # NOTE(review): the "mpvplayer.*" tag looks copied from another
        # player; stop() ignores its argument, so it has no effect here.
        self.stop("mpvplayer.play")

        vlc_config = I.get("VlcConfig")
        extra_args: list[str] = []

        # Allow transforming the domain
        domain_transformer = vlc_config.get_domain_transformer(url.netloc)
        if domain_transformer is not None:
            args, new_url = domain_transformer(url)
            extra_args += args
            # Transformers are annotated as returning the URL as a string,
            # but the extension handling below needs a parsed URL. Accept
            # both forms instead of failing on ``url.path``.
            url = urlparse(new_url) if isinstance(new_url, str) else new_url

        # Allow preprocessing the URL based on the extension that follows the
        # last dot of the final path segment.
        last_segment = url.path.split("/")[-1]
        _, dot, ext = last_segment.rpartition(".")
        transformer = vlc_config.get_ext_transformer(ext) if dot else None
        if transformer is None:
            final_url = urlunparse(url)
        else:
            self.logger.info("Transforming URL due to extension: %s", ext)
            args, final_url = transformer(url)
            extra_args += args
            self.logger.info("New url: %s", final_url)

        # Start vlc
        self._vlc_password = secrets.token_hex(32)
        cmdline = [
            self._vlc_bin_path,
            *self._vlc_extra_args,
            "-f",
            "--qt-minimal-view",
            "--no-osd",
            "--intf=qt",
            "--extraintf=http",
            f"--http-host={self._HTTP_HOST}",
            f"--http-port={self._HTTP_PORT}",
            f"--http-password={self._vlc_password}",
            "--quiet",
            "--play-and-exit",
            *extra_args,
            final_url,
        ]
        # Keep the generated password out of the logs.
        loggable = [
            "--http-password=<redacted>" if arg.startswith("--http-password=") else arg
            for arg in cmdline
        ]
        self.logger.debug("Cmdline: %s", " ".join(loggable))
        self._process = nonblocking_run(cmdline, self._when_vlc_exit)

    def _when_vlc_exit(self):
        """Exit callback handed to nonblocking_run for the VLC process.

        Runs the EVENT_PLAYER_EXIT listeners, clears the loading state, emits
        a notification if the process ended with a non-zero return code and
        forgets the process handle.
        """
        self.logger.info("vlc has exited")
        I.get("VlcConfig").run_event_listeners(EVENT_PLAYER_EXIT)
        I.get("DataBridge").set_loading(False)
        # The handle may already have been cleared by an earlier invocation;
        # do not fail on it.
        # NOTE(review): this callback cannot tell which process exited. If
        # the exit of a VLC terminated by play() is reported after the new
        # instance was started, the new handle is inspected and cleared here
        # — confirm against nonblocking_run's behaviour.
        process = self._process
        if process is not None and process.returncode != 0:
            I.get("DataBridge").notification.emit("VLC exited with an error")
        self._process = None

    def __vlc_command(self, command: str, params: dict[str, Any] | None = None) -> str | None:
        """Send ``command`` to VLC's HTTP interface.

        Args:
            command: Value of the ``command`` query parameter.
            params: Additional query parameters for the command.

        Returns:
            The response body, or None if the request failed. Failures are
            logged as warnings and never raised.
        """
        query = {"command": command, **(params or {})}
        try:
            response = requests.get(
                self._STATUS_URL,
                auth=("", self._vlc_password),
                params=query,
                timeout=self._HTTP_TIMEOUT,
            )
            return response.text
        except Exception as ex:
            # Best effort: VLC may not be running or not be listening yet.
            self.logger.warning("Failed to command vlc API: %s", ex)
            return None

    def pause(self):
        """Send the ``pl_pause`` command to VLC."""
        self.__vlc_command("pl_pause")

    def stop(self, src):
        """Terminate the VLC process if it is active.

        ``src`` is accepted but not used by this implementation.
        """
        if self.is_active():
            self._process.terminate()

    def seek(self, timestamp: KodiTime):
        """Send a ``seek`` command with ``timestamp`` converted to seconds."""
        self.__vlc_command("seek", {"val": timestamp.to_seconds()})

    def exit(self):
        """Stop playback by terminating VLC."""
        self.stop("mpvplayer.exit")

    @staticmethod
    def _idle_player_info() -> PlayerInfo:
        """Return the PlayerInfo reported when VLC's status is unavailable."""
        return PlayerInfo(
            runtime=0,
            position=0,
            title="",
            playing=False,
        )

    def get_player_info(self) -> PlayerInfo:
        """Query VLC's status and return it as a PlayerInfo.

        ``runtime`` and ``position`` are taken from the ``length`` and
        ``time`` elements of the status document, and ``playing`` is True
        when its ``state`` element reads ``playing``. If the request fails or
        the response cannot be parsed, a warning is logged and an idle
        PlayerInfo (zero runtime and position, empty title, not playing) is
        returned.
        """
        try:
            req = requests.get(
                self._STATUS_URL,
                auth=("", self._vlc_password),
                timeout=self._HTTP_TIMEOUT,
            )
            dom = parseString(req.text)
        except Exception as ex:
            self.logger.warning("Failed to query vlc API: %s", ex)
            return self._idle_player_info()

        def text_of(parent, tag: str) -> str:
            # Text content of the first <tag> element below ``parent``
            return parent.getElementsByTagName(tag)[0].childNodes[0].data

        try:
            root = dom.getElementsByTagName("root")[0]
            position = int(text_of(root, "time"))
            length = int(text_of(root, "length"))
            playing = text_of(root, "state") == "playing"
        except Exception as ex:
            self.logger.warning("Failed to parse vlc API response: %s", ex)
            self.logger.debug("Response body: %s", req.text)
            return self._idle_player_info()

        return PlayerInfo(
            runtime=length,
            position=position,
            # TODO
            title="TODO: Find out",
            playing=playing,
        )