Source code for s3_tools.objects.upload

"""Upload files to S3 bucket."""
from concurrent import futures
from pathlib import Path
from typing import (
    Any,
    Dict,
    List,
    Tuple,
    Union,
)

import boto3

from s3_tools.utils import (
    _create_progress_bar,
    _get_future_output,
)


def upload_file_to_key(
    bucket: str,
    key: Union[str, Path],
    local_filename: Union[str, Path],
    progress=None,  # type: ignore # No import if extra not installed
    task_id: int = -1,
    aws_auth: Dict[str, str] = {},
    extra_args: Dict[str, str] = {},
) -> str:
    """Upload one file from local disk and store into AWS S3 bucket.

    Parameters
    ----------
    bucket: str
        AWS S3 bucket where the object will be stored.

    key: Union[str, Path]
        Key where the object will be stored.

    local_filename: Union[str, Path]
        Local file from where the data will be uploaded.

    progress: rich.Progress
        Instance of a rich Progress bar, by default None.

    task_id: int
        Task ID on the progress bar to be updated, by default -1.

    aws_auth: Dict[str, str]
        Contains AWS credentials, by default is empty.

    extra_args: Dict[str, str]
        Extra arguments to be passed to the boto3 upload_file method, by default is empty.
        Allowed upload arguments:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS

    Returns
    -------
    str
        The S3 full URL to the file.

    Examples
    --------
    >>> upload_file_to_key(
    ...     bucket="myBucket",
    ...     key="myFiles/music.mp3",
    ...     local_filename="files/music.mp3",
    ... )
    http://s3.amazonaws.com/myBucket/myFiles/music.mp3

    """
    session = boto3.session.Session(**aws_auth)
    s3 = session.client("s3")

    s3.upload_file(
        Bucket=bucket,
        Key=Path(key).as_posix(),
        Filename=Path(local_filename).as_posix(),
        ExtraArgs=extra_args,
    )

    if progress:
        progress.update(task_id, advance=1)

    return "{}/{}/{}".format(s3.meta.endpoint_url, bucket, key)
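
# A minimal, hedged usage sketch (illustrative only, not part of the original module):
# calling upload_file_to_key with explicit credentials and an extra upload argument.
# The bucket name, file paths, and credential values are hypothetical; the aws_auth
# keys are forwarded to boto3.session.Session, and "ServerSideEncryption" is one of
# boto3's allowed upload arguments.
#
# >>> upload_file_to_key(
# ...     bucket="myBucket",
# ...     key="myFiles/report.pdf",
# ...     local_filename="files/report.pdf",
# ...     aws_auth={
# ...         "aws_access_key_id": "myAccessKeyId",          # hypothetical credentials
# ...         "aws_secret_access_key": "mySecretAccessKey",
# ...         "region_name": "us-east-1",
# ...     },
# ...     extra_args={"ServerSideEncryption": "AES256"},
# ... )
# The call returns the full S3 URL of the uploaded object.
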

def upload_files_to_keys(
    bucket: str,
    paths_keys: List[Tuple[Union[str, Path], Union[str, Path]]],
    threads: int = 5,
    show_progress: bool = False,
    aws_auth: Dict[str, str] = {},
    as_paths: bool = False,
    default_extra_args: Dict[str, str] = {},
    extra_args_per_key: List[Dict[str, str]] = [],
) -> List[Tuple[Union[str, Path], Union[str, Path], Any]]:
    """Upload a list of files to specific S3 keys.

    Parameters
    ----------
    bucket : str
        AWS S3 bucket where the objects will be stored.

    paths_keys : List[Tuple[Union[str, Path], Union[str, Path]]]
        List of tuples with the local path to be uploaded and the S3 key destination.
        e.g. [("Local_Path", "S3_Key"), ("Local_Path", "S3_Key")]

    threads : int, optional
        Number of parallel uploads, by default 5.

    show_progress : bool
        Show a progress bar on the console, by default False.
        (Requires the extra [progress] to be installed.)

    aws_auth : Dict[str, str]
        Contains AWS credentials, by default is empty.

    as_paths : bool
        If True, the paths and keys are returned as Path objects, otherwise as strings, by default False.

    default_extra_args : Dict[str, str]
        Extra arguments to be passed to the boto3 upload_file method, by default is empty.
        The extra arguments will be applied to all S3 keys.

    extra_args_per_key : List[Dict[str, str]]
        Extra arguments to be passed for each S3 key to the boto3 upload_file method, by default is empty.
        The default extra arguments will be merged with the extra arguments passed for each key.

    Returns
    -------
    List[Tuple[Union[str, Path], Union[str, Path], Any]]
        A list of tuples formed by the "Local_Path", the "S3_Key", and the result of the upload.
        If the upload was successful the result is True, otherwise it contains the error message.
        Attention: the output list may not follow the same order as the input.

    Raises
    ------
    ValueError
        If used, extra_args_per_key must have the same length as paths_keys.

    Examples
    --------
    >>> upload_files_to_keys(
    ...     bucket="myBucket",
    ...     paths_keys=[
    ...         ("MyFiles/myFile.data", "myData/myFile.data"),
    ...         ("MyFiles/myMusic/awesome.mp3", "myData/myMusic/awesome.mp3"),
    ...         ("MyFiles/myDocs/paper.doc", "myData/myDocs/paper.doc"),
    ...     ],
    ... )
    [
        ("MyFiles/myMusic/awesome.mp3", "myData/myMusic/awesome.mp3", True),
        ("MyFiles/myDocs/paper.doc", "myData/myDocs/paper.doc", True),
        ("MyFiles/myFile.data", "myData/myFile.data", True),
    ]

    """
    if len(extra_args_per_key) != 0 and len(extra_args_per_key) != len(paths_keys):
        raise ValueError("The length of extra_args_per_key must be the same as paths_keys.")

    extra_arguments = [{}] * len(paths_keys) if len(extra_args_per_key) == 0 else extra_args_per_key

    if show_progress:
        progress, task_id = _create_progress_bar("Uploading", len(paths_keys))
        progress.start()
        progress.start_task(task_id)
    else:
        progress, task_id = None, -1

    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        # Map each future execution to its (S3 key, local filename) pair.
        # dict = {future: values}
        executions = {
            executor.submit(
                upload_file_to_key,
                bucket,
                s3_key,
                filename,
                progress,
                task_id,
                aws_auth,
                {**default_extra_args, **extra_args},
            ): {"s3": s3_key, "fn": filename}
            for (filename, s3_key), extra_args in zip(paths_keys, extra_arguments)
        }

        output = [
            (executions[future]["fn"], executions[future]["s3"], _get_future_output(future))
            for future in futures.as_completed(executions)
        ]

    if show_progress:
        progress.stop()

    if as_paths:
        output = [(Path(fn), Path(key), result) for fn, key, result in output]

    return output
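
# A hedged usage sketch (illustrative only): uploading two files where a default ACL
# is applied to every key and a per-key ContentType is merged on top of it via
# extra_args_per_key. The bucket, paths, and keys are hypothetical; "ACL" and
# "ContentType" are standard boto3 allowed upload arguments.
#
# >>> upload_files_to_keys(
# ...     bucket="myBucket",
# ...     paths_keys=[
# ...         ("MyFiles/data.csv", "myData/data.csv"),
# ...         ("MyFiles/report.pdf", "myData/report.pdf"),
# ...     ],
# ...     default_extra_args={"ACL": "bucket-owner-full-control"},
# ...     extra_args_per_key=[
# ...         {"ContentType": "text/csv"},
# ...         {"ContentType": "application/pdf"},
# ...     ],
# ... )
# Each upload receives the merged arguments, e.g. the first key is uploaded with
# {"ACL": "bucket-owner-full-control", "ContentType": "text/csv"}.
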

def upload_folder_to_prefix(
    bucket: str,
    prefix: Union[str, Path],
    folder: Union[str, Path],
    search_str: str = "*",
    threads: int = 5,
    show_progress: bool = False,
    aws_auth: Dict[str, str] = {},
    as_paths: bool = False,
    default_extra_args: Dict[str, str] = {},
) -> List[Tuple[Union[str, Path], Union[str, Path], Any]]:
    """Upload a local folder to an S3 prefix.

    Upload all files from a given folder (recursively) and store them in an S3 bucket
    under a prefix. The local folder structure is replicated into S3.

    Parameters
    ----------
    bucket : str
        AWS S3 bucket where the objects will be stored.

    prefix : Union[str, Path]
        Prefix under which the objects will be stored.

    folder : Union[str, Path]
        Local folder path where the files are stored. Prefer to use the full path for the folder.

    search_str : str
        A match string to select the files to upload, by default "*".
        The string follows the rglob pattern from the pathlib package.

    threads : int, optional
        Number of parallel uploads, by default 5.

    show_progress : bool
        Show a progress bar on the console, by default False.
        (Requires the extra [progress] to be installed.)

    aws_auth : Dict[str, str]
        Contains AWS credentials, by default is empty.

    as_paths : bool
        If True, the paths and keys are returned as Path objects, otherwise as strings, by default False.

    default_extra_args : Dict[str, str]
        Extra arguments to be passed to the boto3 upload_file method, by default is empty.
        The extra arguments will be applied to all S3 keys.

    Returns
    -------
    List[Tuple[Union[str, Path], Union[str, Path], Any]]
        A list of tuples formed by the "Local_Path", the "S3_Key", and the result of the upload.
        If the upload was successful the result is True, otherwise it contains the error message.

    Examples
    --------
    >>> upload_folder_to_prefix(
    ...     bucket="myBucket",
    ...     prefix="myFiles",
    ...     folder="/usr/files",
    ... )
    [
        ("/usr/files/music.mp3", "myFiles/music.mp3", True),
        ("/usr/files/awesome.wav", "myFiles/awesome.wav", True),
        ("/usr/files/data/metadata.json", "myFiles/data/metadata.json", True),
    ]

    """
    paths = [p for p in Path(folder).rglob(search_str) if p.is_file()]

    paths_keys: List[Tuple[Union[str, Path], Union[str, Path]]] = [
        (
            p.as_posix(),  # local filename
            Path(prefix).joinpath(p.relative_to(Path(folder))).as_posix(),  # S3 key
        )
        for p in paths
    ]

    return upload_files_to_keys(bucket, paths_keys, threads, show_progress, aws_auth, as_paths, default_extra_args)
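
# A hedged usage sketch (illustrative only): uploading only the JSON files of a local
# folder by passing an rglob-style pattern as search_str, with the progress bar enabled.
# The bucket, prefix, and folder are hypothetical; show_progress requires the optional
# [progress] extra to be installed.
#
# >>> upload_folder_to_prefix(
# ...     bucket="myBucket",
# ...     prefix="myFiles",
# ...     folder="/usr/files",
# ...     search_str="*.json",
# ...     show_progress=True,
# ... )
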