Source code for distrax.mgrs.ceph_mgr
import subprocess
import distrax.utils.ceph as ceph
import distrax.utils.fileio as fileio
import distrax.utils.network as network
import distrax.utils.system as system
from distrax.exceptions.exceptions import DaemonNotStartedError
from distrax.mgrs import MGR
[docs]class CephMGR:
"""Ceph Manager Class.
This class contains all the methods required to create and remove a Ceph Manager
To read more about the Ceph Manager please see:
https://docs.ceph.com/en/latest/glossary/#term-Ceph-Manager
Examples:
>>> mgr = CephMGR()
>>> mgr = CephMGR(folder="distrax")
"""
def __init__(self, folder: str = "ceph"):
"""Initialise the CephMGR object.
Args:
folder: the location to store the keys of the ceph system
Examples:
>>> mgr = CephMGR()
>>> mgr = CephMGR(folder="distrax")
"""
self.hostname = network.hostname()
self.folder = folder
[docs] def create_mgr(self) -> None:
"""Create the Ceph Manager Daemon.
This Daemon operates with the monitor to provide additional
monitoring and interfacing to external tools.
Examples:
>>> mgr.create_mgr()
"""
# Create key
mgr_keyring = self._add_mgr()
# Create MGR directory
fileio.create_dir(f"{ceph.VAR_MGR}{self.hostname}", 755, admin=True)
# Copy the key to the folder
fileio.copy_file(
f"{self.folder}/{mgr_keyring}",
f"{ceph.VAR_MGR}{self.hostname}/keyring",
admin=True,
)
# Change the ownership of the folder to ceph
fileio.recursive_change_ownership(
f"{ceph.VAR_MGR}{self.hostname}", "ceph", "ceph", admin=True
)
# Start the Daemon
system.start_service(f"ceph-mgr@{self.hostname}")
status = system.is_systemd_service_active(f"ceph-mgr@{self.hostname}")
if status is False:
message = "Ceph Manager Failed to Start, please investigate"
raise DaemonNotStartedError(message)
def _add_mgr(self) -> str:
"""Adds the manager keys to the ceph system.
The settings allow the mgr to access
the Object Storage Devices (osd) and Metadata Sever (MDS) and Monitor.
Returns: the name of the keyring ceph.mgr.keyring
Examples:
>>>mgr._add_mgr()
ceph.mgr.keyring
"""
subprocess.run(
[
"ceph",
"auth",
"get-or-create",
f"mgr.{self.hostname}",
"mon",
"allow profile mgr",
"osd",
"allow *",
"mds",
"allow *",
"-o" f"{self.folder}/ceph.mgr.keyring",
]
)
return "ceph.mgr.keyring"
[docs] def remove_mgr(self) -> None:
"""Remove the Ceph Manager Daemon.
Examples:
>>> mgr.remove_mgr()
"""
system.stop_service("ceph-mgr.target")
system.disable_service("ceph-mgr.target")
system.stop_service("system-ceph\\x2dmgr.slice")
fileio.remove_dir(f"{ceph.VAR_MGR}{self.hostname}", admin=True)
fileio.remove_file(f"{ceph.VAR_RUN}mgr.{self.hostname}.asok", admin=True)
_mgr = MGR(name="ceph", MGR=CephMGR)