Source code for simpleml.save_patterns.locations.libcloud

"""
Module for cloud save pattern definitions
Uses Apache Libcloud as a universal engine
"""

[docs]__author__ = "Elisha Yadgaran"
from os import makedirs, walk from os.path import dirname, isdir, isfile, join from typing import Any, Dict, List from simpleml.imports import Provider, get_driver from simpleml.registries import FILEPATH_REGISTRY from simpleml.save_patterns.base import BaseSerializer from simpleml.utils.configuration import CONFIG, LIBCLOUD_CONFIG_SECTION
class LibcloudMethods(object):
    """
    Mixin class to save/load objects via Apache Libcloud

    Generic api for all cloud providers so naming convention is extremely
    important to follow in the config. Please reference libcloud documentation
    for supported input parameters

    ```
    [cloud]
    section = `name of the config section to use, ex: s3`

    [s3]
    param = value --> normal key:value syntax. match these to however they are
        referenced later, examples:
    key = abc123
    secret = superSecure
    region = us-east-1
    something_specific_to_s3 = s3_parameter
    --- Below are internally referenced SimpleML params ---
    driver = S3 --> this must be the Apache Libcloud provider
        (https://github.com/apache/libcloud/blob/trunk/libcloud/storage/types.py)
    connection_params = key,secret,region,something_specific_to_s3 --> this
        determines the key: value params passed to the constructor (it can be
        different for each provider)
    path = simpleml/specific/root --> similar to disk based home directory,
        cloud home directory will start relative to here
    container = simpleml --> the cloud bucket or container name
    ```

    How this gets used:
    ```
    from libcloud.storage.types import Provider
    from libcloud.storage.providers import get_driver

    cloud_section = CONFIG.get(CLOUD_SECTION, 'section')
    connection_params = CONFIG.getlist(cloud_section, 'connection_params')
    root_path = CONFIG.get(cloud_section, 'path', fallback='')

    driver_cls = get_driver(getattr(Provider, CONFIG.get(cloud_section, 'driver')))
    driver = driver_cls(**{param: CONFIG.get(cloud_section, param) for param in connection_params})
    container = driver.get_container(container_name=CONFIG.get(cloud_section, 'container'))
    extra = {'content_type': 'application/octet-stream'}
    obj = driver.upload_object(LOCAL_FILE_PATH,
                               container=container,
                               object_name=root_path + simpleml_folder_path + filename,
                               extra=extra)
    obj = driver.download_object(CLOUD_OBJECT,
                                 destination_path=LOCAL_FILE_PATH,
                                 overwrite_existing=True,
                                 delete_on_failure=True)
    ```
    """

    @staticmethod
    def get_driver_config(config_section: str = None, **kwargs) -> Dict[str, str]:
        """
        Build the keyword arguments passed to the libcloud driver constructor

        :param config_section: name of the config section to read from.
            Defaults to `LIBCLOUD_CONFIG_SECTION` when None
        :param kwargs: ignored; accepted so callers can forward a shared
            kwargs dict
        :return: mapping of every name listed under `connection_params` in the
            section to the value stored under that name in the same section
        """
        if config_section is None:
            # use default
            config_section = LIBCLOUD_CONFIG_SECTION

        connection_params = CONFIG.getlist(config_section, "connection_params")
        return {param: CONFIG.get(config_section, param) for param in connection_params}

    @classmethod
    def get_driver(cls, provider: str = None, **kwargs) -> Any:
        """
        Instantiate the libcloud storage driver

        :param provider: attribute name on the libcloud `Provider` enum. When
            None it is read from the `driver` key of the config section
        :param kwargs: forwarded to `get_driver_config`; a `config_section`
            entry selects which config section is used
        :return: the constructed driver instance
        """
        if provider is None:
            # Resolve the provider from the same section that supplies the
            # connection params, so a `config_section` override cannot pair
            # one section's credentials with another section's driver.
            config_section = kwargs.get("config_section")
            if config_section is None:
                config_section = LIBCLOUD_CONFIG_SECTION
            provider = CONFIG.get(config_section, "driver")

        # `get_driver` here is the module-level import, not this classmethod:
        # class attributes are not in scope inside method bodies.
        driver_cls = get_driver(getattr(Provider, provider))
        return driver_cls(**cls.get_driver_config(**kwargs))

    @staticmethod
    def get_container_name(config_section: str = None, **kwargs) -> str:
        """
        Look up the container (bucket) name from the config

        :param config_section: name of the config section to read from.
            Defaults to `LIBCLOUD_CONFIG_SECTION` when None
        :param kwargs: ignored; accepted so callers can forward a shared
            kwargs dict
        :return: value of the `container` key in the section
        """
        if config_section is None:
            # use default
            config_section = LIBCLOUD_CONFIG_SECTION

        return CONFIG.get(config_section, "container")

    @staticmethod
    def upload(
        driver, source_filepath: str, destination_filepath: str, container_name: str
    ) -> None:
        """
        Upload any file from disk to cloud

        :param driver: libcloud storage driver (as returned by `get_driver`)
        :param source_filepath: local path of the file to upload
        :param destination_filepath: object name to store the file under
        :param container_name: name of the container to upload into
        """
        container = driver.get_container(container_name=container_name)
        extra = {"content_type": "application/octet-stream"}
        driver.upload_object(
            source_filepath,
            container=container,
            object_name=destination_filepath,
            extra=extra,
        )

    @staticmethod
    def download(
        driver, source_filepath: str, destination_filepath: str, container_name: str
    ) -> None:
        """
        Download any file from cloud to disk

        :param driver: libcloud storage driver (as returned by `get_driver`)
        :param source_filepath: object name to fetch from the container
        :param destination_filepath: local path to write the object to. An
            existing file at this path is overwritten
        :param container_name: name of the container to download from
        """
        obj = driver.get_object(
            container_name=container_name, object_name=source_filepath
        )
        driver.download_object(
            obj,
            destination_path=destination_filepath,
            overwrite_existing=True,
            delete_on_failure=True,
        )
class LibcloudCopyFileLocation(BaseSerializer):
    """
    Libcloud transport for a single file
    """

    @staticmethod
    def serialize(
        filepath: str,
        source_directory: str = "system_temp",
        destination_directory: str = "libcloud_root_path",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Upload one local file to the cloud container

        :param filepath: path of the file relative to both directories
        :param source_directory: `FILEPATH_REGISTRY` key of the local root
        :param destination_directory: `FILEPATH_REGISTRY` key of the cloud root
        :param kwargs: forwarded to `LibcloudMethods.get_driver` and
            `LibcloudMethods.get_container_name`
        :return: the relative filepath and the registry key it now lives under
        :raises ValueError: if the source path is a directory
        """
        source_filepath = join(FILEPATH_REGISTRY.get(source_directory), filepath)
        if isdir(source_filepath):
            raise ValueError(
                "Cannot use file persistence pattern for folders. Use `LibcloudCopyFolderLocation` instead"
            )

        LibcloudMethods.upload(
            LibcloudMethods.get_driver(**kwargs),
            source_filepath,
            join(FILEPATH_REGISTRY.get(destination_directory), filepath),
            LibcloudMethods.get_container_name(**kwargs),
        )
        return {"filepath": filepath, "source_directory": destination_directory}

    @staticmethod
    def deserialize(
        filepath: str,
        source_directory: str = "libcloud_root_path",
        destination_directory: str = "system_temp",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Download one file from the cloud container to local disk

        :param filepath: path of the file relative to both directories
        :param source_directory: `FILEPATH_REGISTRY` key of the cloud root
        :param destination_directory: `FILEPATH_REGISTRY` key of the local root
        :param kwargs: forwarded to `LibcloudMethods.get_driver` and
            `LibcloudMethods.get_container_name`
        :return: the relative filepath and the registry key it now lives under
        """
        driver = LibcloudMethods.get_driver(**kwargs)
        source_filepath = join(FILEPATH_REGISTRY.get(source_directory), filepath)
        destination_filepath = join(
            FILEPATH_REGISTRY.get(destination_directory), filepath
        )
        container = LibcloudMethods.get_container_name(**kwargs)

        # safety check for the destination path (same as the folder and
        # multi-file serializers): `filepath` may contain subdirectories that
        # do not exist locally yet. Skip when there is no parent component
        # because makedirs("") raises.
        parent_directory = dirname(destination_filepath)
        if parent_directory:
            makedirs(parent_directory, exist_ok=True)

        LibcloudMethods.download(
            driver, source_filepath, destination_filepath, container
        )
        return {"filepath": filepath, "source_directory": destination_directory}
class LibcloudCopyFolderLocation(BaseSerializer):
    """
    Libcloud doesnt have a notion of folder objects so iterate through
    filepaths individually
    """

    @staticmethod
    def serialize(
        filepath: str,
        source_directory: str = "system_temp",
        destination_directory: str = "libcloud_root_path",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Upload every file under a local folder to the cloud container

        :param filepath: path of the folder relative to both directories
        :param source_directory: `FILEPATH_REGISTRY` key of the local root
        :param destination_directory: `FILEPATH_REGISTRY` key of the cloud root
        :param kwargs: forwarded to `LibcloudMethods.get_driver` and
            `LibcloudMethods.get_container_name`
        :return: the uploaded file paths (relative to the root) and the
            registry key they now live under
        :raises ValueError: if the source path is not a directory
        """
        source_folder = FILEPATH_REGISTRY.get(source_directory)
        destination_folder = FILEPATH_REGISTRY.get(destination_directory)
        driver = LibcloudMethods.get_driver(**kwargs)
        container = LibcloudMethods.get_container_name(**kwargs)

        if not isdir(join(source_folder, filepath)):
            raise ValueError(
                "Cannot use folder persistence pattern for files. Use `LibcloudCopyFileLocation` instead"
            )

        # walkthrough all subpaths
        filepaths = []
        for dirpath, _dirnames, filenames in walk(join(source_folder, filepath)):
            for filename in filenames:
                # strip out root path to keep relative to directory.
                # maxsplit=1 so only the leading root is removed: without it
                # a root string that recurs deeper in the path would truncate
                # the relative path to the segment between the occurrences.
                relative_path = join(dirpath, filename).split(source_folder, 1)[1]
                # strip the preceding /
                if relative_path.startswith("/"):
                    relative_path = relative_path[1:]
                filepaths.append(relative_path)

        for relative_path in filepaths:
            LibcloudMethods.upload(
                driver,
                join(source_folder, relative_path),
                join(destination_folder, relative_path),
                container,
            )

        return {"filepaths": filepaths, "source_directory": destination_directory}

    @staticmethod
    def common_path(paths: List[str]) -> str:
        """
        Helper utility to return the common parent path for a bunch of
        filepaths

        Paths are compared component by component after splitting on "/".

        :param paths: "/"-separated paths
        :return: the leading components shared by every path, joined back
            together. Empty string when `paths` is empty or nothing is shared
        """
        if not paths:
            return ""

        common_splits = []
        # zip stops at the shortest path, so no explicit length bound needed
        for components in zip(*(path.split("/") for path in paths)):
            if len(set(components)) > 1:
                break
            common_splits.append(components[0])

        if not common_splits:
            # join() with no arguments raises TypeError
            return ""
        return join(*common_splits)

    @classmethod
    def deserialize(
        cls,
        filepaths: List[str],
        source_directory: str = "libcloud_root_path",
        destination_directory: str = "system_temp",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Download a folder's files from the cloud container to local disk

        :param filepaths: file paths relative to both directories
        :param source_directory: `FILEPATH_REGISTRY` key of the cloud root
        :param destination_directory: `FILEPATH_REGISTRY` key of the local root
        :param kwargs: forwarded to `LibcloudMethods.get_driver` and
            `LibcloudMethods.get_container_name`
        :return: the common parent folder of the downloaded files and the
            registry key it now lives under
        """
        driver = LibcloudMethods.get_driver(**kwargs)
        container = LibcloudMethods.get_container_name(**kwargs)
        source_folder = FILEPATH_REGISTRY.get(source_directory)
        destination_folder = FILEPATH_REGISTRY.get(destination_directory)

        for relative_path in filepaths:
            source_filepath = join(source_folder, relative_path)
            destination_filepath = join(destination_folder, relative_path)

            # safety check for the destination path; skip when there is no
            # parent component because makedirs("") raises
            parent_directory = dirname(destination_filepath)
            if parent_directory:
                makedirs(parent_directory, exist_ok=True)

            LibcloudMethods.download(
                driver, source_filepath, destination_filepath, container
            )

        # Compare parent directories rather than the file paths themselves:
        # for a folder holding a single file the common prefix of the file
        # paths is the file, not its folder.
        folder_filepath = cls.common_path(
            [dirname(relative_path) for relative_path in filepaths]
        )
        return {"filepath": folder_filepath, "source_directory": destination_directory}
class LibcloudCopyFilesLocation(BaseSerializer):
    """
    Libcloud transport for many individual files
    """

    @staticmethod
    def serialize(
        filepaths: List[str],
        source_directory: str = "system_temp",
        destination_directory: str = "libcloud_root_path",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Upload a list of local files to the cloud container

        :param filepaths: file paths relative to both directories
        :param source_directory: `FILEPATH_REGISTRY` key of the local root
        :param destination_directory: `FILEPATH_REGISTRY` key of the cloud root
        :param kwargs: forwarded to `LibcloudMethods.get_driver` and
            `LibcloudMethods.get_container_name`
        :return: the relative filepaths and the registry key they now live
            under
        :raises ValueError: if any source path is a directory; nothing is
            uploaded in that case
        """
        source_folder = FILEPATH_REGISTRY.get(source_directory)
        destination_folder = FILEPATH_REGISTRY.get(destination_directory)
        driver = LibcloudMethods.get_driver(**kwargs)
        container = LibcloudMethods.get_container_name(**kwargs)

        # Validate every path before the first upload so a directory late in
        # the list cannot leave a partially uploaded set behind.
        for relative_path in filepaths:
            if isdir(join(source_folder, relative_path)):
                raise ValueError(
                    "Cannot use file persistence pattern for folder. Use `LibcloudCopyFolderLocation` instead"
                )

        for relative_path in filepaths:
            LibcloudMethods.upload(
                driver,
                join(source_folder, relative_path),
                join(destination_folder, relative_path),
                container,
            )

        return {"filepaths": filepaths, "source_directory": destination_directory}

    @classmethod
    def deserialize(
        cls,
        filepaths: List[str],
        source_directory: str = "libcloud_root_path",
        destination_directory: str = "system_temp",
        **kwargs,
    ) -> Dict[str, str]:
        """
        Download a list of files from the cloud container to local disk

        :param filepaths: file paths relative to both directories
        :param source_directory: `FILEPATH_REGISTRY` key of the cloud root
        :param destination_directory: `FILEPATH_REGISTRY` key of the local root
        :param kwargs: forwarded to `LibcloudMethods.get_driver` and
            `LibcloudMethods.get_container_name`
        :return: the relative filepaths and the registry key they now live
            under
        """
        driver = LibcloudMethods.get_driver(**kwargs)
        container = LibcloudMethods.get_container_name(**kwargs)
        source_folder = FILEPATH_REGISTRY.get(source_directory)
        destination_folder = FILEPATH_REGISTRY.get(destination_directory)

        for relative_path in filepaths:
            source_filepath = join(source_folder, relative_path)
            destination_filepath = join(destination_folder, relative_path)

            # safety check for the destination path; skip when there is no
            # parent component because makedirs("") raises
            parent_directory = dirname(destination_filepath)
            if parent_directory:
                makedirs(parent_directory, exist_ok=True)

            LibcloudMethods.download(
                driver, source_filepath, destination_filepath, container
            )

        return {"filepaths": filepaths, "source_directory": destination_directory}