Source code for s3_tools.objects.copy

"""Copy S3 objects."""
from concurrent import futures
from pathlib import Path
from typing import (
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)

import boto3

from s3_tools.objects.list import list_objects


def copy_object(
    source_bucket: str,
    source_key: Union[str, Path],
    destination_bucket: str,
    destination_key: Union[str, Path],
    aws_auth: Dict[str, str] = {}
) -> None:
    """Copy S3 object from source bucket and key to destination.

    Parameters
    ----------
    source_bucket : str
        S3 bucket where the object is stored.

    source_key : Union[str, Path]
        S3 key where the object is referenced.

    destination_bucket : str
        S3 destination bucket.

    destination_key : Union[str, Path]
        S3 destination key.

    aws_auth: Dict[str, str]
        Contains AWS credentials, by default is empty.

    Examples
    --------
    >>> copy_object(
    ...     source_bucket='bucket',
    ...     source_key='myFiles/song.mp3',
    ...     destination_bucket='bucket',
    ...     destination_key='myMusic/song.mp3',
    ... )

    """
    session = boto3.session.Session(**aws_auth)
    s3 = session.resource("s3")

    s3.meta.client.copy(
        {'Bucket': source_bucket, 'Key': Path(source_key).as_posix()},
        destination_bucket,
        Path(destination_key).as_posix()
    )
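
# Illustrative sketch (not part of the module): ``aws_auth`` is forwarded
# verbatim to ``boto3.session.Session``, so it accepts the standard session
# keyword arguments. The bucket names and credential values below are
# placeholders.
#
# >>> copy_object(
# ...     source_bucket='bucket',
# ...     source_key='myFiles/song.mp3',
# ...     destination_bucket='backup-bucket',
# ...     destination_key='myFiles/song.mp3',
# ...     aws_auth={
# ...         'region_name': 'us-east-1',
# ...         'aws_access_key_id': '<ACCESS_KEY_ID>',
# ...         'aws_secret_access_key': '<SECRET_ACCESS_KEY>',
# ...     },
# ... )
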
def copy_keys(
    source_bucket: str,
    source_keys: List[Union[str, Path]],
    destination_bucket: str,
    destination_keys: List[Union[str, Path]],
    threads: int = 5,
    aws_auth: Dict[str, str] = {}
) -> None:
    """Copy a list of S3 objects from source bucket to destination.

    Parameters
    ----------
    source_bucket : str
        S3 bucket where the objects are stored.

    source_keys : List[Union[str, Path]]
        S3 keys where the objects are referenced.

    destination_bucket : str
        S3 destination bucket.

    destination_keys : List[Union[str, Path]]
        S3 destination keys.

    threads : int, optional
        Number of parallel copies, by default 5.

    aws_auth: Dict[str, str]
        Contains AWS credentials, by default is empty.

    Raises
    ------
    IndexError
        When source_keys and destination_keys have different lengths.

    ValueError
        When the keys list is empty.

    Examples
    --------
    >>> copy_keys(
    ...     source_bucket='bucket',
    ...     source_keys=[
    ...         'myFiles/song.mp3',
    ...         Path('myFiles/photo.jpg'),
    ...     ],
    ...     destination_bucket='bucket',
    ...     destination_keys=[
    ...         Path('myMusic/song.mp3'),
    ...         'myPhotos/photo.jpg',
    ...     ]
    ... )

    """
    if len(source_keys) != len(destination_keys):
        raise IndexError("Key lists must have the same length")

    if len(source_keys) == 0:
        raise ValueError("Key list length must be greater than zero")

    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        executors = (
            executor.submit(copy_object, source_bucket, source, destination_bucket, destination, aws_auth)
            for source, destination in zip(source_keys, destination_keys)
        )

        for ex in executors:
            ex.result()
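
# Illustrative sketch: mismatched key lists fail fast with IndexError before
# any copy is attempted (bucket and key names are hypothetical).
#
# >>> copy_keys(
# ...     source_bucket='bucket',
# ...     source_keys=['a.txt', 'b.txt'],
# ...     destination_bucket='bucket',
# ...     destination_keys=['a.txt'],
# ... )
# Traceback (most recent call last):
#     ...
# IndexError: Key lists must have the same length
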
def copy_prefix(
    source_bucket: str,
    source_prefix: Union[str, Path],
    destination_bucket: str,
    change_prefix: Optional[Tuple[Union[str, Path], Union[str, Path]]] = None,
    filter_keys: Optional[str] = None,
    threads: int = 5,
    aws_auth: Dict[str, str] = {}
) -> None:
    """Copy S3 objects from source bucket to destination based on prefix filter.

    Parameters
    ----------
    source_bucket : str
        S3 bucket where the objects are stored.

    source_prefix : Union[str, Path]
        S3 prefix where the objects are referenced.

    destination_bucket : str
        S3 destination bucket.

    change_prefix : Tuple[Union[str, Path], Union[str, Path]], optional
        Text to be replaced in the keys' prefixes, by default None.
        The first element is the text to be replaced, the second is the
        replacement text.

    filter_keys : str, optional
        Basic search string to filter keys in the result (uses Unix
        shell-style wildcards), by default None.
        For more about the search, check the "fnmatch" package.

    threads : int, optional
        Number of parallel copies, by default 5.

    aws_auth: Dict[str, str]
        Contains AWS credentials, by default is empty.

    Examples
    --------
    >>> copy_prefix(
    ...     source_bucket='MyBucket',
    ...     source_prefix='myFiles',
    ...     destination_bucket='OtherBucket',
    ...     filter_keys='*images*',
    ...     change_prefix=('myFiles', 'backup')
    ... )

    """
    source_keys = list_objects(
        bucket=source_bucket,
        prefix=source_prefix,
        search_str=filter_keys,
        aws_auth=aws_auth
    )

    destination_keys = source_keys if change_prefix is None else [
        Path(key).as_posix().replace(
            Path(change_prefix[0]).as_posix(),
            Path(change_prefix[1]).as_posix()
        )
        for key in source_keys
    ]

    copy_keys(
        source_bucket=source_bucket,
        source_keys=source_keys,
        destination_bucket=destination_bucket,
        destination_keys=destination_keys,
        threads=threads,
        aws_auth=aws_auth
    )
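
# Illustrative, standalone sketch of the key rewrite performed when
# ``change_prefix`` is given: a plain substring replacement on the POSIX
# form of each matched key (the key list below is hypothetical).
#
# >>> from pathlib import Path
# >>> keys = ['myFiles/images/photo.jpg', 'myFiles/images/logo.png']
# >>> [Path(k).as_posix().replace('myFiles', 'backup') for k in keys]
# ['backup/images/photo.jpg', 'backup/images/logo.png']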