Source code for distrax.mgrs

import logging
from importlib import import_module
from typing import NamedTuple, Optional, Union

from . import abstract_mgr

log = logging.getLogger(__name__)
AVAILABLE = ["ceph"]
"""MGRs that are supported and can be used."""


class MGR(NamedTuple):
    """Structure for mgr access."""

    name: str
    MGR: type[abstract_mgr.AbstractMGR]


[docs]def set_mgr(mgr: str) -> None: """Sets the MGR to use. Args: mgr: the mgr to get, i.e. ceph Examples: >>> distrax.mgrs as mgr >>> mgr.set_mgr() """ mgr = mgr.lower() global _current if mgr in AVAILABLE: module_ = import_module(f"distrax.mgrs.{mgr}_mgr") if hasattr(module_, "_mgr"): log.debug("Switching manger to `%s`", module_._mgr.name) _current = module_._mgr else: raise Exception(f"Module `{mgr}` is not configured correctly.") else: raise Exception(f"Manager `{mgr}` is not available! Choose from: {AVAILABLE}")
[docs]def get_mgr(name: str = "") -> Optional[MGR]: """Gets the mgr as specified by the name. Args: name: Name of a supported mgr, i.e. ceph Returns: A name tuple with the name of the storage and the class Examples: >>> distrax.mgrs as mgrs >>> mgr = mgrs.get_mgr("ceph") >>> mgr.name ceph >>> mgr.MGR CephMGR """ if name != "": set_mgr(name) if _current is None: set_mgr(AVAILABLE[0]) return _current
_current: Union[MGR, None] = None