"""COS utilities."""
import logging
import os
import tempfile
from io import BufferedReader
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import boto3
from boto3_type_annotations.s3 import Bucket
from botocore.client import Config

logger = logging.getLogger(__name__)


def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Connect to the bucket referenced by an S3 URI.

    The URI is expected to embed credentials and host, i.e.
    ``s3://access:secret@host/bucket/optional/key/parts``.

    Args:
        s3_uri (str): URI with credentials, host, bucket name and
            an optional object key.

    Returns:
        Tuple[Bucket, List[str]]: the bucket resource and the object key
        split on "/" (an empty list when no key is present).
    """
    parsed_uri = urlparse(s3_uri)
    # parse bucket and path, where path can be an empty list
    _, bucket_name, *split_key = parsed_uri.path.split("/")
    # netloc packs credentials and host as "access:secret@host"
    credentials, host = parsed_uri.netloc.split("@")
    # getting keys
    access, secret = credentials.split(":")
    # establish connection
    connection = boto3.resource(
        "s3",
        endpoint_url="http://{}".format(host),
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return connection.Bucket(bucket_name), split_key


def ensure_filepath_from_uri(file_uri: str) -> str:
    """
    Get a file on the local storage.

    In case the file_uri provided is a S3 URI, downloads the file
    and returns the local path.

    Args:
        file_uri (str): a uri, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.

    Raises:
        RuntimeError: when the S3 download fails or the local file
            does not exist.
    """
    if file_uri.startswith("s3://"):
        a_file = None
        try:
            bucket, split_key = connect_bucket(file_uri)
            path = os.path.join(*split_key)
            # create a file handle for storing the file locally; close it
            # right away so the download can (re)open it by name
            a_file = tempfile.NamedTemporaryFile(delete=False)
            a_file.close()
            # download the file
            bucket.download_file(path, a_file.name)
            return a_file.name
        except Exception as exception:
            # remove the temporary file on failure so it is not leaked
            if a_file is not None:
                try:
                    os.unlink(a_file.name)
                except OSError:
                    pass
            message = "Getting file from COS failed for the provided URI: {}".format(
                file_uri
            )
            logger.exception(message)
            # chain the original exception for a complete traceback
            raise RuntimeError(message) from exception
    else:
        logger.debug(f"Searching for {file_uri}")
        if os.path.exists(file_uri):
            return file_uri
        else:
            message = "File not found on local filesystem."
            logger.error(message)
            raise RuntimeError(message)


# COS configuration
COS_BUCKET_URI = os.environ.get(
    "COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts")
)
COS_UPLOAD_POLICY = os.environ.get("COS_UPLOAD_POLICY", "public-read-write")
# results prefix
RESULTS_PREFIX = "results"

# NOTE(review): the helpers below rely on a module-level BUCKET that is not
# defined in this chunk — presumably connect_bucket(COS_BUCKET_URI)[0];
# confirm it is defined elsewhere in the module.


def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, object name is taken as relative local path.

    Args:
        key (str): S3 key.
        file_path (str, optional): Path of downloaded file. Defaults to None.
    """
    file_path = key if file_path is None else file_path
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    BUCKET.download_file(key, file_path)


def upload_to_key(file_path: str, key: str) -> None:
    """Upload local file to COS.

    Args:
        file_path (str): Local filepath.
        key (str): S3 key.
    """
    BUCKET.upload_file(file_path, key)


def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Upload readable, binary file from handle to COS.

    Args:
        readable_binary (BufferedReader): filehandle, e.g. opened in 'rb' mode.
        key (str): S3 key.
    """
    BUCKET.upload_fileobj(readable_binary, key)


def delete_from_key(key_or_prefix: str) -> None:
    """Delete all files matching given prefix from COS.

    Args:
        key_or_prefix (str): S3 uri including object name prefix.
    """
    BUCKET.objects.filter(Prefix=key_or_prefix).delete()


def string_to_key(string: str, key: str) -> None:
    """Upload string as object to COS.

    Args:
        string (str): object to be stored.
        key (str): S3 key.
    """
    BUCKET.put_object(Key=key, Body=string.encode())


def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Upload bytes as object to COS.

    Args:
        some_bytes (bytes): object to be stored.
        key (str): S3 key.
    """
    BUCKET.put_object(Key=key, Body=some_bytes)


def string_from_key(key: str) -> str:
    """Get object from COS as string.

    Args:
        key (str): S3 key.

    Returns:
        str: object.
    """
    return BUCKET.Object(key).get()["Body"].read().decode("utf-8")


def bytes_from_key(key: str) -> bytes:
    """Get object from COS as bytes.

    Args:
        key (str): S3 key.

    Returns:
        bytes: object.
    """
    return BUCKET.Object(key).get()["Body"].read()