# paccmann / cos.py
"""COS utitities."""
import logging
import os
import tempfile
from io import BufferedReader
from typing import List, Optional, Tuple
from urllib.parse import urlparse
import boto3
from boto3_type_annotations.s3 import Bucket
from botocore.client import Config
logger = logging.getLogger(__name__)
def connect_bucket(s3_uri: str) -> Tuple[Bucket, List[str]]:
    """Connect to a bucket parsed from an S3 URI.

    The URI is expected in the form:
        s3://access_key:secret_key@host/bucket_name[/optional/key/path]

    Args:
        s3_uri (str): URI carrying credentials, host, bucket name and an
            optional key path.

    Returns:
        Tuple[Bucket, List[str]]: the bucket resource and the key path split
            on "/" (an empty list when no key path is present).
    """
    parsed_uri = urlparse(s3_uri)
    # parse bucket and path, where path can be empty list
    _, bucket_name, *split_key = parsed_uri.path.split("/")
    # split on the LAST "@" so a secret key containing "@" does not break parsing
    credentials, host = parsed_uri.netloc.rsplit("@", 1)
    # split on the FIRST ":" only, so a secret key containing ":" stays intact
    access, secret = credentials.split(":", 1)
    # establish connection (endpoint is assumed to be plain http)
    connection = boto3.resource(
        "s3",
        endpoint_url="http://{}".format(host),
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    return connection.Bucket(bucket_name), split_key
def ensure_filepath_from_uri(file_uri: str) -> str:
    """
    Get a file on the local storage.

    In case the file_uri provided is a S3 URI, downloads the
    file and returns the local path.

    Args:
        file_uri (str): a uri, either filesystem or S3.

    Returns:
        str: the path to the file on the local filesystem.

    Raises:
        RuntimeError: when the S3 download fails or the local file
            does not exist.
    """
    if file_uri.startswith("s3://"):
        try:
            bucket, split_key = connect_bucket(file_uri)
            path = os.path.join(*split_key)
            # create a file handle for storing the file locally
            a_file = tempfile.NamedTemporaryFile(delete=False)
            # make sure we close the file
            a_file.close()
            # download the file
            bucket.download_file(path, a_file.name)
            return a_file.name
        except Exception as exc:
            message = "Getting file from COS failed " "for the provided URI: {}".format(
                file_uri
            )
            logger.exception(message)
            # chain the original exception so the root cause is preserved
            raise RuntimeError(message) from exc
    else:
        logger.debug(f"Searching for {file_uri}")
        if os.path.exists(file_uri):
            return file_uri
        else:
            message = "File not found on local filesystem."
            logger.error(message)
            raise RuntimeError(message)
# COS configuration
# Bucket URI read from the environment; falls back to a local "artifacts"
# directory under the current working directory when the variable is unset.
COS_BUCKET_URI = os.environ.get(
    "COS_BUCKET_URI", os.path.join(os.getcwd(), "artifacts")
)
# Canned ACL policy applied to uploads; defaults to public read/write.
COS_UPLOAD_POLICY = os.environ.get("COS_UPLOAD_POLICY", "public-read-write")
# results prefix
RESULTS_PREFIX = "results"
def download_from_key(key: str, file_path: Optional[str] = None) -> None:
    """Download a single file from COS.

    If no file_path is given, object name is taken as relative local path.

    Args:
        key (str): S3 key.
        file_path (str, optional): Path of downloaded file. Defaults to None.
    """
    file_path = key if file_path is None else file_path
    directory = os.path.dirname(file_path)
    # os.makedirs("") raises FileNotFoundError, so guard against a bare
    # filename that has no directory component
    if directory:
        os.makedirs(directory, exist_ok=True)
    BUCKET.download_file(key, file_path)
def upload_to_key(file_path: str, key: str) -> None:
    """Store a local file in COS under the given key.

    Args:
        file_path (str): path of the file on the local filesystem.
        key (str): destination S3 key.
    """
    # managed multipart upload handled by boto3
    BUCKET.upload_file(Filename=file_path, Key=key)
def fileobject_to_key(readable_binary: BufferedReader, key: str) -> None:
    """Stream a readable binary file handle into COS.

    Args:
        readable_binary (BufferedReader): handle opened in binary read
            mode (e.g. 'rb').
        key (str): destination S3 key.
    """
    BUCKET.upload_fileobj(Fileobj=readable_binary, Key=key)
def delete_from_key(key_or_prefix: str) -> None:
    """Delete every object whose key starts with the given prefix.

    Args:
        key_or_prefix (str): S3 key prefix (or an exact key).
    """
    matching_objects = BUCKET.objects.filter(Prefix=key_or_prefix)
    matching_objects.delete()
def string_to_key(string: str, key: str) -> None:
    """Store a string as a COS object.

    Args:
        string (str): text to be stored.
        key (str): destination S3 key.
    """
    # encode to bytes (default utf-8) before writing the object
    payload = string.encode()
    BUCKET.put_object(Key=key, Body=payload)
def bytes_to_key(some_bytes: bytes, key: str) -> None:
    """Store raw bytes as a COS object.

    Args:
        some_bytes (bytes): payload to be stored.
        key (str): destination S3 key.
    """
    BUCKET.put_object(Body=some_bytes, Key=key)
def string_from_key(key: str) -> str:
    """Fetch a COS object and decode it as UTF-8 text.

    Args:
        key (str): S3 key.

    Returns:
        str: the decoded object content.
    """
    raw = BUCKET.Object(key).get()["Body"].read()
    return raw.decode("utf-8")
def bytes_from_key(key: str) -> bytes:
    """Fetch a COS object as raw bytes.

    Args:
        key (str): S3 key.

    Returns:
        bytes: the object content.
    """
    body_stream = BUCKET.Object(key).get()["Body"]
    return body_stream.read()