"""Download S3 objects to files."""
from concurrent import futures
from pathlib import Path
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
import boto3
from s3_tools.objects.list import list_objects
from s3_tools.utils import (
_create_progress_bar,
_get_future_output,
)
def download_key_to_file(
    bucket: str,
    key: Union[str, Path],
    local_filename: Union[str, Path],
    progress=None,  # type: ignore # No import if extra not installed
    task_id: int = -1,
    aws_auth: Dict[str, str] = {},
    extra_args: Dict[str, str] = {},
) -> bool:
    """Download a single object from an AWS S3 bucket into a local file.

    The parent directory of ``local_filename`` is created when missing.

    Parameters
    ----------
    bucket: str
        AWS S3 bucket holding the object.

    key: Union[str, Path]
        Key of the object inside the bucket.

    local_filename: Union[str, Path]
        Local file the object is written to.

    progress: rich.Progress
        Progress bar instance to advance once the download finishes, by default None.

    task_id: int
        Task ID on the progress bar to be advanced, by default -1.

    aws_auth: Dict[str, str]
        Keyword arguments forwarded to ``boto3.session.Session`` (AWS credentials),
        by default is empty.

    extra_args: Dict[str, str]
        Extra arguments forwarded to the boto3 download_file method, by default is empty.
        Allowed download arguments:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS

    Returns
    -------
    bool
        True if the local file exists after the download.

    Examples
    --------
    >>> download_key_to_file(
    ...     bucket="myBucket",
    ...     key="myData/myFile.data",
    ...     local_filename="theFile.data",
    ... )
    True

    """
    client = boto3.session.Session(**aws_auth).client("s3")

    destination = Path(local_filename)
    # The download cannot create missing folders, so make sure they exist first.
    destination.parent.mkdir(parents=True, exist_ok=True)

    client.download_file(
        Bucket=bucket,
        Key=Path(key).as_posix(),
        Filename=destination.as_posix(),
        ExtraArgs=extra_args,
    )

    # Advance the shared progress bar (if any) by one finished download.
    if progress:
        progress.update(task_id, advance=1)

    return destination.exists()
def download_keys_to_files(
    bucket: str,
    keys_paths: List[Tuple[Union[str, Path], Union[str, Path]]],
    threads: int = 5,
    show_progress: bool = False,
    aws_auth: Dict[str, str] = {},
    as_paths: bool = False,
    default_extra_args: Dict[str, str] = {},
    extra_args_per_key: List[Dict[str, str]] = [],
) -> List[Tuple[Union[str, Path], Union[str, Path], Any]]:
    """Download list of objects to specific paths.

    Each download is submitted to a thread pool; results are collected as the
    downloads complete.

    Parameters
    ----------
    bucket: str
        AWS S3 bucket where the objects are stored.

    keys_paths: List[Tuple[Union[str, Path], Union[str, Path]]]
        List with a tuple of S3 key to be downloaded and local path to be stored.
        e.g. [
            ("S3_Key", "Local_Path"),
            (Path("S3_Key"), "Local_Path"),
            ("S3_Key", Path("Local_Path")),
            (Path("S3_Key"), Path("Local_Path")),
        ]

    threads: int
        Number of parallel downloads, by default 5.

    show_progress: bool
        Show progress bar on console, by default False.
        (Need to install extra [progress] to be used)

    aws_auth: Dict[str, str]
        Contains AWS credentials, by default is empty.

    as_paths: bool
        If True, the keys are returned as Path objects, otherwise as strings, by default is False.

    default_extra_args: Dict[str, str]
        Extra arguments to be passed to the boto3 download_file method, by default is empty.
        The extra arguments will be applied to all S3 keys.

    extra_args_per_key: List[Dict[str, str]]
        Extra arguments to be passed for each S3 key to the boto3 download_file method, by default is empty.
        The default extra arguments will be merged with the extra arguments passed for each key;
        on conflicting names the per-key value wins.

    Returns
    -------
    List[Tuple]
        A list with tuples formed by the "S3_Key", "Local_Path", and the result of the download.
        If successful will have True, if not will contain the error message.
        Attention, the output list may not follow the same input order.

    Raises
    ------
    ValueError
        If extra_args_per_key is not empty and its length differs from keys_paths.

    Examples
    --------
    >>> download_keys_to_files(
    ...     bucket="myBucket",
    ...     keys_paths=[
    ...         ("myData/myFile.data", "MyFiles/myFile.data"),
    ...         ("myData/myMusic/awesome.mp3", "MyFiles/myMusic/awesome.mp3"),
    ...         ("myData/myDocs/paper.doc", "MyFiles/myDocs/paper.doc"),
    ...     ]
    ... )
    [
        ("myData/myMusic/awesome.mp3", "MyFiles/myMusic/awesome.mp3", True),
        ("myData/myDocs/paper.doc", "MyFiles/myDocs/paper.doc", True),
        ("myData/myFile.data", "MyFiles/myFile.data", True),
    ]

    """
    if extra_args_per_key and len(extra_args_per_key) != len(keys_paths):
        raise ValueError("The length of extra_args_per_key must be the same as keys_paths.")

    # One (possibly empty) dict of extra arguments per key, aligned with keys_paths.
    per_key_args: List[Dict[str, str]] = (
        extra_args_per_key if extra_args_per_key else [{} for _ in keys_paths]
    )

    if show_progress:
        progress, task_id = _create_progress_bar("Downloading", len(keys_paths))
        progress.start()
        progress.start_task(task_id)
    else:
        progress, task_id = None, -1

    output: List[Tuple[Union[str, Path], Union[str, Path], Any]] = []
    try:
        with futures.ThreadPoolExecutor(max_workers=threads) as executor:
            # Map every submitted future back to its (S3 key, local filename) pair.
            executions = {
                executor.submit(
                    download_key_to_file,
                    bucket,
                    s3_key,
                    filename,
                    progress,
                    task_id,
                    aws_auth,
                    # A fresh dict per call, so the caller's dicts are never mutated.
                    {**default_extra_args, **extra_args},
                ): (s3_key, filename)
                for (s3_key, filename), extra_args in zip(keys_paths, per_key_args)
            }

            for future in futures.as_completed(executions):
                s3_key, filename = executions[future]
                output.append((s3_key, filename, _get_future_output(future)))
    finally:
        # Always stop the progress bar, even when the executor or a submission
        # raises (e.g. invalid `threads` or a malformed keys_paths entry).
        if show_progress:
            progress.stop()

    if as_paths:
        return [(Path(key), Path(fn), result) for key, fn, result in output]

    return [(Path(key).as_posix(), Path(fn).as_posix(), result) for key, fn, result in output]
def _local_path_for_key(
    key: Union[str, Path],
    prefix: Union[str, Path],
    folder: Union[str, Path],
    remove_prefix: bool,
) -> str:
    """Build the local destination path (as a POSIX-style string) for one S3 key."""
    folder_posix = Path(folder).as_posix()
    if not remove_prefix:
        return "{}/{}".format(folder_posix, key)

    key_posix = Path(key).as_posix()
    prefix_posix = Path(prefix).as_posix()

    relative = key_posix
    # Path("") normalises to ".", which means "no prefix" and must not be stripped.
    # Only a *leading* prefix is removed; later occurrences inside the key are kept.
    if prefix_posix != "." and key_posix.startswith(prefix_posix):
        relative = key_posix[len(prefix_posix):]

    # Drop the separator left behind after removing the prefix (if any).
    return "{}/{}".format(folder_posix, relative.lstrip("/"))


def download_prefix_to_folder(
    bucket: str,
    prefix: Union[str, Path],
    folder: Union[str, Path],
    search_str: Optional[str] = None,
    remove_prefix: bool = True,
    threads: int = 5,
    show_progress: bool = False,
    aws_auth: Dict[str, str] = {},
    as_paths: bool = False,
    default_extra_args: Dict[str, str] = {},
) -> List[Tuple[Union[str, Path], Union[str, Path], Any]]:
    """Download objects to local folder.

    Function to retrieve all files under a prefix on S3 and store them into local folder.

    Parameters
    ----------
    bucket: str
        AWS S3 bucket where the objects are stored.

    prefix: Union[str, Path]
        Prefix where the objects are under.

    folder: Union[str, Path]
        Local folder path where files will be stored.

    search_str: str
        Basic search string to filter out keys on result (uses Unix shell-style wildcards), by default is None.
        For more about the search check "fnmatch" package.

    remove_prefix: bool
        If True will remove the prefix when writing to local folder.
        Only the leading occurrence of the prefix is removed.
        The remaining "folders" on the key will be created on the local folder.

    threads: int
        Number of parallel downloads, by default 5.

    show_progress: bool
        Show progress bar on console, by default False.
        (Need to install extra [progress] to be used)

    aws_auth: Dict[str, str]
        Contains AWS credentials, by default is empty.

    as_paths: bool
        If True, the keys are returned as Path objects, otherwise as strings, by default is False.

    default_extra_args: Dict[str, str]
        Extra arguments to be passed to the boto3 download_file method, by default is empty.
        The extra arguments will be applied to all S3 keys.

    Returns
    -------
    List[Tuple]
        A list with tuples formed by the "S3_Key", "Local_Path", and the result of the download.
        If successful will have True, if not will contain the error message.

    Examples
    --------
    >>> download_prefix_to_folder(
    ...     bucket="myBucket",
    ...     prefix="myData",
    ...     folder="myFiles",
    ... )
    [
        ("myData/myFile.data", "MyFiles/myFile.data", True),
        ("myData/myMusic/awesome.mp3", "MyFiles/myMusic/awesome.mp3", True),
        ("myData/myDocs/paper.doc", "MyFiles/myDocs/paper.doc", True),
    ]

    """
    s3_keys = list_objects(
        bucket=bucket,
        prefix=prefix,
        search_str=search_str,
        aws_auth=aws_auth,
        as_paths=as_paths,
    )

    # Pair every listed key with the local path it will be written to.
    keys_paths: List[Tuple[Union[str, Path], Union[str, Path]]] = [
        (key, _local_path_for_key(key, prefix, folder, remove_prefix))
        for key in s3_keys
    ]

    return download_keys_to_files(
        bucket=bucket,
        keys_paths=keys_paths,
        threads=threads,
        show_progress=show_progress,
        aws_auth=aws_auth,
        as_paths=as_paths,
        default_extra_args=default_extra_args,
    )