"""Ceph Utility functions.
This module holds all utility functions for Ceph.
To read more about Ceph please see: https://docs.ceph.com/en/latest/
"""
import base64
import configparser
import json
import secrets
import struct
import subprocess
import time
from typing import Dict, List, TypedDict, Union
# Filesystem locations used by Ceph.
# Values ending in "ceph-" (or "ceph-radosgw.") are path prefixes; presumably
# callers append a daemon/host id to them -- TODO confirm against callers.
ETC_CEPH = "/etc/ceph"
VAR_RUN = "/var/run/ceph/ceph-"
VAR_MON = "/var/lib/ceph/mon/ceph-"
VAR_MGR = "/var/lib/ceph/mgr/ceph-"
VAR_BOOTSTRAP_OSD = "/var/lib/ceph/bootstrap-osd"
VAR_OSD = "/var/lib/ceph/osd"
VAR_OSD_ID = "/var/lib/ceph/osd/ceph-"
VAR_RGW: str = "/var/lib/ceph/radosgw/ceph-radosgw."
VAR_MDS = "/var/lib/ceph/mds/ceph-"
# Keyring file names. They follow the "ceph.<name>.keyring" pattern that
# create_keyring() writes for the names "mon.", "client.bootstrap-osd" and
# "client.admin" (the double dot in MON_KEYRING comes from the name "mon.").
MON_KEYRING = "ceph.mon..keyring"
OSD_KEYRING = "ceph.client.bootstrap-osd.keyring"
ADMIN_KEYRING = "ceph.client.admin.keyring"
# Name of the cluster configuration file.
CONFIG_FILE = "ceph.conf"
# NOTE(review): presumably the authentication mode written into the cluster
# configuration -- not used in the visible part of the file, confirm.
AUTH = "cephx"
def generate_auth_key() -> str:
    """Generate a Ceph auth key.

    The layout is informed by ceph-deploy:
    https://github.com/ceph/ceph-deploy/blob/master/ceph_deploy/new.py
    however ``secrets`` is used instead of ``os.urandom`` due to PEP 506
    https://peps.python.org/pep-0506/

    The key is a base64 encoded blob made of a 12 byte little-endian header
    (key type, creation seconds, creation nanoseconds, secret length)
    followed by 16 random bytes. The header fields are packed as *unsigned*
    integers: the output is byte-identical to a signed packing for every
    value that fits both, but the creation timestamp keeps packing once
    ``time.time()`` exceeds the signed 32-bit range (January 2038), where a
    signed format would make ``struct.pack`` raise.

    Returns:
        A string of length 40 containing a valid Ceph Key

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> keyring = ceph.generate_auth_key()
        >>> len(keyring)
        40
    """
    key = secrets.token_bytes(16)
    header = struct.pack(
        "<HIIH",
        1,  # le16 type: CEPH_CRYPTO_AES
        int(time.time()),  # le32 created: seconds
        0,  # le32 created: nanoseconds
        len(key),  # le16: len(key)
    )
    return base64.b64encode(header + key).decode("utf-8")
def create_keyring(folder: str, name: str, permissions: Dict[str, str]) -> None:
    r"""Write a Ceph keyring file containing a fresh key and the given caps.

    The file is called ``ceph.<name>.keyring`` and is written inside
    ``folder``. It holds a single ``[<name>]`` section with a newly generated
    ``key`` plus every entry of ``permissions``.

    Args:
        folder: folder to place keyring
        name: Name of the keyring; used for both the section header and the
            file name
        permissions: arguments for the keyring i.e {"caps mon": "allow \*"}

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.create_keyring("ceph", "mon", {"caps mon": "allow *"})
    """
    keyring = configparser.ConfigParser()
    keyring[name] = {"key": generate_auth_key()}

    # Add the capabilities one by one to the section that was just created.
    section = keyring[name]
    for option, value in permissions.items():
        section[option] = value

    target = f"{folder}/ceph.{name}.keyring"
    with open(target, "w") as handle:
        keyring.write(handle)
def create_admin_key(folder: str) -> None:
    """Create the ``client.admin`` keyring inside ``folder``.

    Delegates to ``create_keyring`` and grants ``allow *`` on the mon, osd,
    mds and mgr capabilities. Nothing is returned.

    Args:
        folder: folder to place keyring
    """
    admin_caps = {
        "caps mon": "allow *",
        "caps osd": "allow *",
        "caps mds": "allow *",
        "caps mgr": "allow *",
    }
    create_keyring(folder, "client.admin", admin_caps)
def create_mon_key(folder: str) -> None:
    """Create the ``mon.`` keyring inside ``folder``.

    Delegates to ``create_keyring`` with the single capability
    ``caps mon = allow *``.

    Args:
        folder: folder to place keyring
    """
    mon_caps = {"caps mon": "allow *"}
    create_keyring(folder, "mon.", mon_caps)
def create_osd_key(folder: str) -> None:
    """Create the ``client.bootstrap-osd`` keyring inside ``folder``.

    Delegates to ``create_keyring`` with the ``bootstrap-osd`` monitor
    profile and read access to the manager.

    Args:
        folder: folder to place keyring
    """
    bootstrap_caps = {"caps mon": "profile bootstrap-osd", "caps mgr": "allow r"}
    create_keyring(folder, "client.bootstrap-osd", bootstrap_caps)
def osd_status() -> Dict[str, int]:
    """Get the status of the OSDs.

    Runs ``ceph osd stat --format=json`` and returns the decoded JSON object.

    Returns:
        Where the `int` is a number

        >>> {'epoch': int,
        ...  'num_osds': int,
        ...  'num_up_osds': int,
        ...  'osd_up_since': int,
        ...  'num_in_osds': int,
        ...  'osd_in_since': int,
        ...  'num_remapped_pgs': int}

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.osd_status()
        {'epoch': 1, 'num_osds': 5, 'num_up_osds': 5, 'osd_up_since': 1,
        'num_in_osds': 5, 'osd_in_since': 1, 'num_remapped_pgs': 0}
    """
    command = ["ceph", "osd", "stat", "--format=json"]
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    return dict(json.loads(completed.stdout))
def lspools(timeout: str = "5") -> List[Dict[str, str]]:
    """Get the current pools.

    Runs ``ceph osd lspools`` with JSON output. When the command exits with a
    non-zero status an empty list is returned.

    Args:
        timeout: The length of time to try and connect to the cluster.

    Returns:
        A list of dictionaries containing the keys 'poolnum' and 'poolname'

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.lspools()
        [{'poolnum': 1, 'poolname': '.mgr'},
         {'poolnum': 2, 'poolname': 'test'}]
    """
    command = [
        "ceph",
        "osd",
        "lspools",
        "--format",
        "json",
        "--connect-timeout",
        timeout,
    ]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return list(json.loads(result.stdout)) if result.returncode == 0 else []
class PoolStatus(TypedDict):
    """Typed dictionary describing the placement-group map of the cluster.

    This is the shape returned by ``pool_status``, which copies the
    same-named keys out of the ``pgmap`` section of the ``ceph --status``
    JSON output.
    """

    # One entry per PG state, e.g. {"state_name": "active+clean", "count": 65}
    pgs_by_state: List[Dict[str, Union[int, str]]]
    # Counters
    num_pgs: int
    num_pools: int
    num_objects: int
    # Sizes, in bytes
    data_bytes: int
    bytes_used: int
    bytes_avail: int
    bytes_total: int
def pool_status(
    timeout: str = "5",
) -> PoolStatus:
    """Get the status of the Pools.

    `ceph -s` is queried rather than `ceph pg stat`: `ceph pg stat` records
    status changes faster and therefore does not mirror `ceph -s`. As
    `ceph -s` is the default way to check the status of the ceph cluster, it
    is used to maintain cohesion with system outputs.

    When the command produces no output, every numeric field is set to -1
    and ``pgs_by_state`` holds a single ``error`` entry.

    Args:
        timeout: The length of time to try and connect to the cluster.

    Returns:
        Where 'int' is a number and 'str' is a string

        >>> {'pgs_by_state': [{'state_name': str, 'count': int}],
        ...  'num_pgs': int,
        ...  'num_pools': int,
        ...  'num_objects': int,
        ...  'data_bytes': int,
        ...  'bytes_used': int,
        ...  'bytes_avail': int,
        ...  'bytes_total': int
        ... }

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.pool_status()
        {'pgs_by_state': [{'state_name': 'active+clean', 'count': 65}],
        'num_pgs': 65,
        'num_pools': 3,
        'num_objects': 2,
        'data_bytes': 590368,
        'bytes_used': 13357056,
        'bytes_avail': 1056190464,
        'bytes_total': 1069547520}
    """
    command = ["ceph", "--status", "--format", "json", "--connect-timeout", timeout]
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # No output at all: report the error sentinel.
    if result.stdout.decode("utf-8") == "":
        return PoolStatus(
            pgs_by_state=[{"state_name": "error", "count": -1}],
            num_pgs=-1,
            num_pools=-1,
            num_objects=-1,
            data_bytes=-1,
            bytes_used=-1,
            bytes_avail=-1,
            bytes_total=-1,
        )

    cluster_pgmap = dict(json.loads(result.stdout))["pgmap"]
    return PoolStatus(
        pgs_by_state=cluster_pgmap["pgs_by_state"],
        num_pgs=cluster_pgmap["num_pgs"],
        num_pools=cluster_pgmap["num_pools"],
        num_objects=cluster_pgmap["num_objects"],
        data_bytes=cluster_pgmap["data_bytes"],
        bytes_used=cluster_pgmap["bytes_used"],
        bytes_avail=cluster_pgmap["bytes_avail"],
        bytes_total=cluster_pgmap["bytes_total"],
    )
def get_current_pg() -> int:
    """Get the current pg count of the cluster.

    When there are no pools, or the cluster reports zero PGs, 1 is returned
    to account for the '.mgr' pool that will be created. Any other value
    reported by ``pool_status`` (including its -1 error sentinel) is returned
    unchanged.

    Returns:
        The current PGs in the cluster

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.get_current_pg()
        1
    """
    if not lspools():
        # Account for the '.mgr' pool that will be created
        return 1

    # Query the cluster once: every pool_status() call spawns a `ceph`
    # process, and reading the value a second time could return a different
    # number than the one that was just compared against zero.
    num_pgs = pool_status()["num_pgs"]
    if num_pgs == 0:
        # Account for the '.mgr' pool that will be created
        return 1
    return num_pgs
def rgw_status(
    timeout: str = "5",
) -> bool:
    """Get the status of the RGW.

    Runs ``ceph --status`` with JSON output and checks whether an ``rgw``
    entry is listed under ``servicemap.services``.

    If the command produces no output (for example because the cluster could
    not be reached within ``timeout``), or the service map is missing from
    the output, False is returned instead of raising.

    Args:
        timeout: The length of time to try and connect to the cluster.

    Returns:
        True when it is up and running False otherwise

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.rgw_status()
        True
    """
    status = subprocess.run(
        ["ceph", "--status", "--format", "json", "--connect-timeout", timeout],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Nothing to parse: json.loads would raise on empty output, so treat
    # this as "not running".
    if not status.stdout.strip():
        return False

    state = dict(json.loads(status.stdout))
    services = state.get("servicemap", {}).get("services", {})
    return "rgw" in services
def mds_status(
    timeout: str = "5",
) -> bool:
    """Get the status of the MDS.

    Runs ``ceph --status`` with JSON output and inspects its ``fsmap``
    section. True is returned only when ``fsmap`` reports exactly one MDS
    ``up`` and one ``in`` and at least one entry of ``by_rank`` has the
    status ``up:active``.

    If the command produces no output (for example because the cluster could
    not be reached within ``timeout``), or ``fsmap`` lacks the ``up``, ``in``
    or ``by_rank`` keys, False is returned instead of raising.

    Args:
        timeout: The length of time to try and connect to the cluster.

    Returns:
        True when it is up and running False otherwise

    Examples:
        >>> import distrax.utils.ceph as ceph
        >>> ceph.mds_status()
        True
    """
    status = subprocess.run(
        ["ceph", "--status", "--format", "json", "--connect-timeout", timeout],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Nothing to parse: json.loads would raise on empty output, so treat
    # this as "not running".
    if not status.stdout.strip():
        return False

    fsmap = dict(json.loads(status.stdout)).get("fsmap", {})
    # NOTE(review): the counters appear to be absent when no filesystem
    # exists -- confirm against the Ceph version in use. Hence .get().
    if fsmap.get("up") != 1 or fsmap.get("in") != 1:
        return False
    return any(fs.get("status") == "up:active" for fs in fsmap.get("by_rank", []))