Source code for distrax.filesystems.ceph_filesystem
import configparser
import getpass
import json
import os
import subprocess
import time
import distrax.utils.fileio as fileio
from distrax.exceptions.exceptions import MountingFilesystemError
from distrax.filesystems import FILESYSTEM
[docs]class CephFilesystem:
"""Ceph Filesystem Class.
This class contains all the methods required to mount and unmount a Ceph Filesystem
To read more about the Ceph Gateway please see:
https://docs.ceph.com/en/latest/glossary/#term-CephFS
Examples:
>>> filesystem = CephFilesystem()
>>> filesystem = CephFilesystem(folder="ceph")
"""
def __init__(self, folder: str = "ceph", timeout: int = 5):
"""Initialise the CephFilesystem object.
Examples:
>>> filesystem = CephFilesystem()
"""
self.mount_point: str = "/mnt/distrax"
self.folder = folder
self.timeout = timeout
[docs] def mount_filesystem(self) -> None:
"""Mount the Ceph Filesystem.
This mounts the filesystem for the user in the /mnt/distrax/ location
Examples:
>>> filesystem.mount_filesystem()
"""
# Get user
user = getpass.getuser()
# Get admin key
admin_key = subprocess.run(
["ceph", "auth", "print-key", "client.admin"], stdout=subprocess.PIPE
).stdout.decode()
# Create Filesystem directory
fileio.create_dir(self.mount_point, 755, admin=True)
# Mount filesystem
mounted = False
start = time.time()
while mounted is False:
subprocess.run(
[
"sudo",
"mount",
"-t",
"ceph",
":/",
self.mount_point,
"-o",
f"name=admin,secret={admin_key}",
]
)
mounted = os.path.ismount(self.mount_point)
time.sleep(0.1)
if time.time() - start > self.timeout:
raise MountingFilesystemError(
"Mounting Ceph Filesystem timed out and failed. "
"Ensure all the correct packages are "
"installed for operating the ceph filesystem, i,e, ceph-mds."
)
# Change the ownership of the folder to ceph
fileio.recursive_change_ownership(self.mount_point, user, user, admin=True)
[docs] def unmount_filesystem(self) -> None:
"""Unmount the Ceph filesystem.
Examples:
>>> filesystem.unmount_filesystem()
"""
port = "6789"
config = configparser.ConfigParser()
config.read(f"{self.folder}/ceph.conf")
mon_ip = config["global"]["mon host"]
mon_mount = mon_ip + f":{port}:/"
mounts = subprocess.run(
["findmnt", "-t", "ceph", "--json"], stdout=subprocess.PIPE
)
if mounts.stdout.decode("utf-8") != "":
mount_dict = json.loads(mounts.stdout)
if "filesystems" in mount_dict:
for filesystems in mount_dict["filesystems"]:
if filesystems["source"] == mon_mount:
subprocess.run(["sudo", "umount", filesystems["target"]])
fileio.remove_dir(self.mount_point, admin=True)
_filesystem = FILESYSTEM("ceph", CephFilesystem)