linux-mod-manager/lmm/overlayfs.py

143 lines
3.9 KiB
Python

from pathlib import Path
import tempfile
from typing import Optional
from lmm.config import LMMConfig
from lmm.cmd import run_sudo_cmd, run_cmd
def compute_workdirs_location(mount: Path) -> Path:
with open("/proc/mounts", "r") as f:
mounts = f.read().split("\n")
mounted_dirs = [
m for line in mounts if line != "" and (m := line.strip().split(" ")[1]) != "/"
]
mounted_dirs.sort()
closest_mount = None
for m in mounted_dirs:
if str(mount).startswith(m) and len(m) > len(closest_mount or ""):
closest_mount = m
continue
return Path("/tmp") if closest_mount is None else Path(closest_mount) / ".temp"
class OverlayFSMount:
upper: Optional[Path]
lower: list[Path | str]
workdir: Optional[tempfile.TemporaryDirectory]
mount_path: Path
_mounted: bool
cd: Optional[Path]
def __init__(
self,
upper: Optional[Path],
lower: list[Path | str],
mount: Path,
cd: Optional[Path] = None,
):
self.upper = upper
self.lower = lower
self.mount_path = mount
self._mounted = False
self.cd = cd
assert len(lower) < 500, "overlayfs only supports up-to 500 lower directories"
def create_workdir(self) -> Path:
# Ensure the temporary directory for workdirs exists
workdir_location = compute_workdirs_location(self.mount_path)
if not workdir_location.exists():
workdir_location.mkdir(parents=True, exist_ok=True)
# Create the workdir
self.workdir = tempfile.TemporaryDirectory(
dir=workdir_location,
)
print(f"Temporary workdir {self.workdir.name} created")
return Path(self.workdir.name)
def mount(self) -> Optional[Path]:
lower = ":".join([str(l) for l in self.lower])
options = f"lowerdir={lower}"
if self.upper is not None:
workdir = self.create_workdir()
options += f",upperdir={self.upper},workdir={workdir}"
if LMMConfig.use_fuse:
result = run_cmd(
[
LMMConfig.overlayfs_command,
"-o",
options,
str(self.mount_path),
],
cd=self.cd,
)
else:
result = run_sudo_cmd(
[
"mount",
"-t",
"overlay",
"overlay",
"-o",
options,
str(self.mount_path),
],
cd=self.cd,
)
print("Mount result:", result)
if not result == 0:
return None
self._mounted = True
return self.mount_path
def unmount(self) -> bool:
if self._mounted:
print("Unmounting...")
# Remove the mount
if LMMConfig.use_fuse:
returncode = run_cmd(
[
LMMConfig.fusermount_command,
"-u",
str(self.mount_path),
]
) == 0
else:
returncode = run_sudo_cmd(
[
"umount",
str(self.mount_path),
]
)
if returncode != 0:
print("Unmounting failed")
return False
# Remove the temporary workdir
if self.workdir is not None:
try:
# TODO: This fails because the workdir is owned by root
self.workdir.cleanup()
except PermissionError:
print(f"Failed to clean temporary directory {self.workdir.name}")
self._mounted = False
return True
def __del__(self):
self.unmount()