Source code for distrax.pools.ceph_pool

import logging
import subprocess
import time
from math import floor, log
from typing import Union

import distrax.utils.ceph as ceph
from distrax.pools import POOL

logger = logging.getLogger(__name__)


[docs]class CephPool: """CephPool class this allows for the creation and removal of the Ceph Pools. To read more about the Ceph Pools please see: https://docs.ceph.com/en/latest/rados/operations/pools/ Examples: >>> pool = CephPool() """
[docs] @staticmethod def create_pool(name: str = "distrax", percentage: float = 1.0) -> None: """Create the Pool to store objects. Args: name: The name of the pool percentage: The percentage of the cluster to allocate to the pool. Examples: >>> pool.create_pool(name="distrax", percentage=1.0) pool 'distrax' created """ logger.info(f"Creating Pool {name}") current_cluster_pgs = ceph.get_current_pg() TARGET_PGS = 100 # Calculate pgs based off https://old.ceph.com/pgcalc/ pool_pgs = int( 2 ** floor( log((TARGET_PGS * ceph.osd_status()["num_osds"] * percentage) / log(2)) + 0.5 ) ) subprocess.run( ["ceph", "osd", "pool", "create", name, str(pool_pgs), str(pool_pgs)] ) new_cluster_pg = current_cluster_pgs + pool_pgs clean_pgs: Union[int, str] = -1 while new_cluster_pg != clean_pgs: time.sleep(0.1) status = ceph.pool_status()["pgs_by_state"] for stat in status: if stat["state_name"] == "active+clean": clean_pgs = stat["count"] break
[docs] @staticmethod def remove_pools() -> None: """Remove and purge pools. Examples: >>> ceph.remove_pools() Warning: using slow linear search Removed 2 objects successfully purged pool .mgr pool '.mgr' removed """ pools = ceph.lspools() for pool in pools: subprocess.run( [ "rados", "purge", pool["poolname"], "--yes-i-really-really-mean-it", "--connect-timeout", "5", ] ) subprocess.run( [ "ceph", "osd", "pool", "delete", pool["poolname"], pool["poolname"], "--yes-i-really-really-mean-it", "--connect-timeout", "5", ] )
_pool = POOL("ceph", CephPool)