#!/usr/bin/env python
# File: scripts/rdify (executable Python script)
# Script to ramdiskify things
import argparse
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
def eprint(*values, **options):
    """Print *values* to standard error.

    Accepts the same positional and keyword arguments as the built-in
    ``print``; the output stream is always ``sys.stderr``, so ``file``
    must not be passed by the caller.
    """
    print(*values, file=sys.stderr, **options)
class RamDiskItem():
def __init__(self, target, persist):
self.target = Path(target)
self.persist = Path(persist)
self.ramdisk = Path("/dev/shm/ramdisk")
self.full_rd_path = self.ramdisk.joinpath(self.target.name)
self.full_persist_path = self.persist.joinpath(self.target.name)
self.rsync_destination = self.full_persist_path if self.target.is_file() else self.persist
def check_state(self):
path_states = (
self.target.is_symlink(), # Is the target a symlink?
self.target.resolve().exists(), # Does the symlink resolve to something that exists?
self.target.resolve() == self.full_rd_path, # Does the symlink point to the ramdisk location?
self.full_persist_path.exists() # Does the persistent copy exist?
)
match path_states:
case (False, True, False, False):
return "UNINITIALIZED" # Normal file/dir, not ramdiskified
case (True, True, True, True):
return "RAMDISKIFIED" # Symlink to ramdisk, persistent copy exists
case (True, True, True, False):
return "NONPERSISTENT" # Symlink to ramdisk, persistent copy exists
case (True, False, _, True):
return "PERSIST_ONLY" # Symlink broken, persistent copy exists
case (True, False, _, False):
return "UNRECOVERABLE" # Symlink broken, persistent copy missing
case (False, False, _, _):
return "MISSING_TARGET" # Target missing
case _:
eprint("Unhandled state:", path_states)
sys.exit(1)
def init(self):
match self.check_state():
case "UNINITIALIZED":
self.persist.mkdir(parents=True, exist_ok=True)
self.ramdisk.mkdir(parents=True, exist_ok=True)
if self.target.is_file():
shutil.copy2(self.target, self.ramdisk)
else:
shutil.copytree(self.target, self.full_rd_path, dirs_exist_ok=True)
shutil.move(self.target, self.persist)
os.symlink(self.full_rd_path, self.target)
case "RAMDISKIFIED":
pass
case "NONPERSISTENT":
self.persist.mkdir(parents=True, exist_ok=True)
self.sync()
case "PERSIST_ONLY":
if self.full_persist_path.is_file():
shutil.copy2(self.full_persist_path, self.ramdisk)
else:
shutil.copytree(self.full_persist_path, self.full_rd_path, dirs_exist_ok=True)
case _:
eprint(f"Cannot initialize: {self.check_state()}")
def sync(self):
if self.check_state() != "RAMDISKIFIED":
self.init()
# TODO: actually look at output from this...
subprocess.run(["rsync", "-aP", "--delete", self.full_rd_path, self.rsync_destination])
def loop_sync(self, delay):
while True:
try:
self.sync()
time.sleep(delay)
except KeyboardInterrupt:
self.sync()
eprint("Interrupt received, exiting...")
return
def cleanup(self):
if self.check_state() != "RAMDISKIFIED":
self.init()
self.sync()
self.target.unlink()
shutil.move(self.full_persist_path, self.target.parent)
if self.full_rd_path.is_file():
self.full_rd_path.unlink()
else:
shutil.rmtree(self.full_rd_path)
def main():
    """Parse the command line and run the requested action on the target.

    Exactly one action flag is required.  ``--diagnose`` only prints the
    detected state; the other actions call the matching ``RamDiskItem``
    method.  A ``--loop_sync`` delay that is not positive is rejected with a
    usage error before anything on disk is touched.
    """
    parser = argparse.ArgumentParser()

    # Arguments to specify the action to take
    action = parser.add_mutually_exclusive_group(required=True)
    action.add_argument("-i", "--init", action="store_true",
                        help="Initialize the target file/directory into ramdisk")
    action.add_argument("-s", "--sync", action="store_true",
                        help="Perform a single sync of the target from ramdisk to persistent storage")
    action.add_argument("-l", "--loop_sync", type=int,
                        help="Starts a repeating sync that loops every LOOP_SYNC seconds")
    action.add_argument("-c", "--cleanup", action="store_true",
                        help="Clean up an existing ramdiskified target")
    action.add_argument("-d", "--diagnose", action="store_true",
                        help="Check the current state")

    # Arguments to specify what to act on
    parser.add_argument("target", help="The target file/directory to ramdiskify")
    parser.add_argument("persist", nargs="?", default=".persist",
                        help="The location to store the persistent copy while ramdiskified, defaults to '.persist'")

    args = parser.parse_args()

    # A delay of 0 used to be treated as "no action" by a truthiness test and
    # a negative delay crashed in time.sleep(); reject both explicitly.
    if args.loop_sync is not None and args.loop_sync <= 0:
        parser.error("--loop_sync requires a positive number of seconds")

    rd_item = RamDiskItem(args.target, args.persist)

    # The action flags are mutually exclusive, so a single dispatch suffices.
    if args.diagnose:
        state = rd_item.check_state()
        print(f"State of {args.target}: {state}")
    elif args.init:
        rd_item.init()
    elif args.sync:
        rd_item.sync()
    elif args.loop_sync is not None:
        rd_item.loop_sync(args.loop_sync)
    elif args.cleanup:
        rd_item.cleanup()
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()