import logging
from importlib import import_module
from typing import NamedTuple, Optional, Union

from . import abstract_mgr

log = logging.getLogger(__name__)

AVAILABLE = ["ceph"]
"""MGRs that are supported and can be used."""


class MGR(NamedTuple):
    """Structure for mgr access."""

    # Short identifier of the manager backend.
    name: str
    # The manager class itself (not an instance).
    MGR: type[abstract_mgr.AbstractMGR]


# Currently selected manager. It is rebound by ``set_mgr`` and read by
# ``get_mgr``; it must exist at module scope before the first ``get_mgr()``
# call, otherwise the ``_current is None`` check there raises ``NameError``.
_current: Optional[MGR] = None
def set_mgr(mgr: str) -> None:
    """Select the manager implementation used by this package.

    The name is lower-cased and checked against ``AVAILABLE``. The module
    ``distrax.mgrs.<name>_mgr`` is then imported and its module-level
    ``_mgr`` attribute becomes the current manager.

    Args:
        mgr: Name of the manager to activate, e.g. ``"ceph"``
            (matched case-insensitively).

    Raises:
        ValueError: If ``mgr`` is not listed in ``AVAILABLE``.
        RuntimeError: If the imported module does not expose ``_mgr``.

    Examples:
        >>> import distrax.mgrs as mgrs
        >>> mgrs.set_mgr("ceph")
    """
    global _current

    mgr = mgr.lower()

    # Reject unknown names before attempting any import.
    if mgr not in AVAILABLE:
        raise ValueError(
            f"Manager `{mgr}` is not available! Choose from: {AVAILABLE}"
        )

    module_ = import_module(f"distrax.mgrs.{mgr}_mgr")

    # Each backend module is expected to publish its entry as ``_mgr``.
    if not hasattr(module_, "_mgr"):
        raise RuntimeError(f"Module `{mgr}` is not configured correctly.")

    log.debug("Switching manager to `%s`", module_._mgr.name)
    _current = module_._mgr
def get_mgr(name: str = "") -> Optional[MGR]:
    """Return the active manager, optionally switching to ``name`` first.

    When ``name`` is a non-empty string, ``set_mgr(name)`` is called before
    anything else. If no manager is selected afterwards, the first entry of
    ``AVAILABLE`` is activated as the default.

    Args:
        name: Name of a supported mgr, e.g. ``"ceph"``. The empty string
            (default) keeps the current selection.

    Returns:
        The current manager entry: a named tuple holding the manager's name
        and its class.

    Examples:
        >>> import distrax.mgrs as mgrs
        >>> mgr = mgrs.get_mgr("ceph")
        >>> mgr.name
        ceph
        >>> mgr.MGR
        CephMGR
    """
    switch_requested = name != ""
    if switch_requested:
        set_mgr(name)

    # NOTE(review): this reads the module-level ``_current``; confirm it is
    # initialised at module scope before the first call.
    if _current is None:
        default_name = AVAILABLE[0]
        set_mgr(default_name)

    return _current