from pathlib import Path
import re
import tempfile
from typing import Optional

from lmm.cmd import run_sudo_cmd

# /proc/mounts encodes space, tab, newline and backslash inside a field as a
# backslash followed by three octal digits (e.g. "\040" for a space).
_OCTAL_ESCAPE = re.compile(r"\\([0-7]{3})")


def _parse_mount_point(line: str) -> Optional[str]:
    """Return the decoded mount-point field of a /proc/mounts line.

    Returns None for blank or malformed lines (fewer than two fields).
    """
    fields = line.split()
    if len(fields) < 2:
        return None
    return _OCTAL_ESCAPE.sub(lambda m: chr(int(m.group(1), 8)), fields[1])


def _is_same_or_parent(parent: str, path: str) -> bool:
    """Return True if *path* equals *parent* or lies underneath it.

    The comparison respects path-component boundaries, so "/mnt/data" is not
    considered a parent of "/mnt/data2".
    """
    parent = parent.rstrip("/")
    return path == parent or path.startswith(parent + "/")


def compute_workdirs_location(mount: Path) -> Path:
    """Pick the directory in which overlayfs workdirs for *mount* are created.

    The deepest non-root mount point listed in /proc/mounts that contains
    *mount* is selected and its ".temp" sub-directory is returned. When no
    such mount point exists, "/tmp" is returned instead.

    Args:
        mount: Path at which the overlay is going to be mounted. It is
            compared textually; it is not resolved or normalised here.

    Returns:
        The directory that should hold the temporary workdir.
    """
    target = str(mount)
    with open("/proc/mounts", "r") as f:
        candidates = [
            mount_point
            for line in f
            if (mount_point := _parse_mount_point(line)) is not None
            and mount_point != "/"
            and _is_same_or_parent(mount_point, target)
        ]

    if not candidates:
        return Path("/tmp")
    # The longest matching mount point is the most specific one.
    return Path(max(candidates, key=len)) / ".temp"


class OverlayFSMount:
    """An overlayfs mount assembled from lower directories and an optional upper one.

    When an upper directory is given, a temporary workdir is created at mount
    time and passed to overlayfs together with the upper directory. Without
    an upper directory only ``lowerdir`` is passed.
    """

    # Optional upper directory; None means no upperdir/workdir options.
    upper: Optional[Path]
    # Lower directories, joined with ":" in the order given.
    lower: list[Path | str]
    # Temporary workdir handle; None until create_workdir() has run.
    workdir: Optional[tempfile.TemporaryDirectory]
    # Where the overlay gets mounted.
    mount_path: Path
    # True between a successful mount() and the next unmount().
    _mounted: bool
    # Forwarded to run_sudo_cmd as ``cd`` (presumably the working directory
    # of the command -- confirm against lmm.cmd).
    cd: Optional[Path]

    def __init__(
        self,
        upper: Optional[Path],
        lower: list[Path | str],
        mount: Path,
        cd: Optional[Path] = None,
    ):
        """Store the mount configuration; nothing is mounted yet.

        Args:
            upper: Upper directory, or None to mount without one.
            lower: Lower directories (fewer than 500).
            mount: Mount point of the overlay.
            cd: Passed through to run_sudo_cmd when mounting.
        """
        self.upper = upper
        self.lower = lower
        self.mount_path = mount
        self._mounted = False
        self.cd = cd
        # Must always exist: unmount() (and therefore __del__) reads it even
        # when no upper directory was given and create_workdir() never ran.
        self.workdir = None

        assert len(lower) < 500, "overlayfs only supports up-to 500 lower directories"

    def create_workdir(self) -> Path:
        """Create the temporary overlayfs workdir and return its path.

        The directory is created inside the location chosen by
        compute_workdirs_location() for this mount point, which is created
        first if it does not exist. The handle is kept in ``self.workdir``.
        """
        # Ensure the temporary directory for workdirs exists
        workdir_location = compute_workdirs_location(self.mount_path)
        workdir_location.mkdir(parents=True, exist_ok=True)

        # Create the workdir
        self.workdir = tempfile.TemporaryDirectory(
            dir=workdir_location,
        )
        print(f"Temporary workdir {self.workdir.name} created")
        return Path(self.workdir.name)

    def mount(self) -> Optional[Path]:
        """Mount the overlay via ``mount -t overlay`` run through run_sudo_cmd.

        Returns:
            The mount path when run_sudo_cmd returns 0, otherwise None.
        """
        lower = ":".join(str(entry) for entry in self.lower)
        options = f"lowerdir={lower}"
        if self.upper is not None:
            # An upper directory is passed together with a workdir.
            workdir = self.create_workdir()
            options += f",upperdir={self.upper},workdir={workdir}"

        result = run_sudo_cmd(
            [
                "mount",
                "-t",
                "overlay",
                "overlay",
                "-o",
                options,
                str(self.mount_path),
            ],
            cd=self.cd,
        )
        print("Mount result:", result)
        if result != 0:
            return None

        self._mounted = True
        return self.mount_path

    def unmount(self):
        """Unmount the overlay if mounted and try to remove its workdir.

        Does nothing when the overlay is not currently marked as mounted.
        A PermissionError while removing the workdir is reported and ignored.
        """
        if not self._mounted:
            return

        print("Unmounting...")
        # Remove the mount
        run_sudo_cmd(
            [
                "umount",
                str(self.mount_path),
            ]
        )
        # Remove the temporary workdir
        if self.workdir is not None:
            try:
                # TODO: This fails because the workdir is owned by root
                self.workdir.cleanup()
            except PermissionError:
                print(f"Failed to clean temporary directory {self.workdir.name}")
        self._mounted = False

    def __del__(self):
        """Unmount on garbage collection so the mount does not outlive the object."""
        self.unmount()