# Source code for distrax.filesystems

import logging
from importlib import import_module
from typing import List, NamedTuple, Optional, Union

from . import abstract_filesystem

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Names of the filesystems that can be used. ``set_filesystem`` imports
# ``distrax.filesystems.<name>_filesystem`` for a name listed here, and
# ``get_filesystem`` falls back to the first entry when nothing is selected.
AVAILABLE: List[str] = ["ceph"]
"""Filesystems that are supported and can be used."""


class FILESYSTEM(NamedTuple):
    """Structure for filesystem access.

    Pairs the name of a filesystem with the class that implements it.
    """

    # Name of the filesystem.
    name: str
    # The filesystem class itself (not an instance); annotated as
    # ``abstract_filesystem.AbstractFilesystem`` or a subclass of it.
    FILESYSTEM: type[abstract_filesystem.AbstractFilesystem]


def set_filesystem(filesystem: str) -> None:
    """Set the filesystem to use.

    The name is lower-cased and checked against ``AVAILABLE``. The module
    ``distrax.filesystems.<name>_filesystem`` is then imported and its
    ``_filesystem`` attribute is stored in the module-level ``_current``.

    Args:
        filesystem: The filesystem to activate, e.g. ``"ceph"``. Matching is
            case-insensitive.

    Raises:
        ValueError: If ``filesystem`` is not listed in ``AVAILABLE``.
        RuntimeError: If the imported module has no ``_filesystem`` attribute.

        Both are subclasses of ``Exception``, so handlers written for the
        generic ``Exception`` keep working.

    Examples:
        >>> import distrax.filesystems as filesystems
        >>> filesystems.set_filesystem("ceph")
    """
    global _current

    filesystem = filesystem.lower()

    # Reject unknown names before attempting any import.
    if filesystem not in AVAILABLE:
        raise ValueError(
            f"Filesystem `{filesystem}` is not available! Choose from: {AVAILABLE}"
        )

    module_ = import_module(f"distrax.filesystems.{filesystem}_filesystem")

    # Every backend module is expected to expose a `_filesystem` attribute.
    if not hasattr(module_, "_filesystem"):
        raise RuntimeError(f"Module `{filesystem}` is not configured correctly.")

    log.debug("Switching filesystem to `%s`", module_._filesystem.name)
    _current = module_._filesystem
def get_filesystem(name: str = "") -> Optional[FILESYSTEM]:
    """Get the filesystem as specified by the name.

    When ``name`` is given, that filesystem is activated first via
    ``set_filesystem``. When no filesystem is selected afterwards, the first
    entry of ``AVAILABLE`` is activated as the default.

    Args:
        name: Name of a supported filesystem, e.g. ``"ceph"``. An empty value
            (the default) keeps the current selection.

    Returns:
        A named tuple with the name of the storage and the class.

    Raises:
        Exception: Whatever ``set_filesystem`` raises for an unsupported name
            or an incorrectly configured filesystem module.

    Examples:
        >>> import distrax.filesystems as filesystems
        >>> filesystem = filesystems.get_filesystem("ceph")
        >>> filesystem.name
        ceph
        >>> filesystem.FILESYSTEM
        CephFilesystem
    """
    # Truthiness check: an empty (or otherwise falsy) name means "no explicit
    # choice" instead of being forwarded to set_filesystem.
    if name:
        set_filesystem(name)

    # Nothing selected so far: fall back to the first supported filesystem.
    if _current is None:
        set_filesystem(AVAILABLE[0])

    return _current
# Currently selected filesystem; remains ``None`` until ``set_filesystem``
# assigns a value.
_current: Optional[FILESYSTEM] = None