Source code for catcher_modules.service.s3

from os.path import join
from typing import List

from catcher.steps.external_step import ExternalStep
from catcher.steps.step import Step, update_variables
from catcher.utils.logger import debug
from catcher.utils.misc import fill_template_str


[docs]class S3(ExternalStep): """ Allows you to get/put/list/delete files in Amazon `S3 <https://aws.amazon.com/s3/>`_ Useful hint: for local testing you can use `Minio <https://min.io/>`_ run in docker as it is S3 API compatible. :Input: :config: s3 config object, used in other s3 commands. - key_id: access key id - secret_key: secret for the access key - region: region. *Optional*. - url: endpoint_url url. Can be used to run against Minio. *Optional* :put: put file to s3 - config: s3 config object - path: path including the filename. First dir treats like a bucket. F.e. /my_bucket/subdir/file or my_bucket/subfir/file - content: file's content. *Optional* - content_resource: path to a file. *Optional*. Either `content` or `content_resource` must be set. :get: Get file from s3 - config: s3 config object - path: path including the filename :list: List S3 directory - config: s3 config object - path: path to the directory being listed :delete: Delete file or directory from S3 - config: s3 config object - path: path to the deleted - recursive: if path is directory and recursive is true - will delete directory with all content. *Optional*, default is false. :Examples: Put data into s3 :: s3: put: config: '{{ s3_config }}' path: /foo/bar/file.csv content: '{{ my_data }}' Get data from s3 :: s3: get: config: '{{ s3_config }}' path: /foo/bar/file.csv register: {csv: '{{ OUTPUT }}'} List files :: s3: list: config: '{{ s3_config }}' path: /foo/bar/ register: {files: '{{ OUTPUT }}'} Delete file :: s3: delete: config: '{{ s3_config }}' path: '/remove/me' recursive: true """ @update_variables def action(self, includes: dict, variables: dict) -> any: body = self.simple_input(variables) method = Step.filter_predefined_keys(body) # get/put/list oper = body[method] conf = oper['config'] import boto3 s3_client = boto3.client('s3', endpoint_url=conf.get('url'), aws_access_key_id=conf['key_id'], aws_secret_access_key=conf['secret_key'], region_name=conf.get('region') ) path = oper['path'] if method == 'get': return variables, self._get_file(s3_client, path) elif method == 'put': content = oper.get('content') if not content: if 'content_resource' not in oper: raise ValueError('No content for s3 put') with open(join(variables['RESOURCES_DIR'], oper['content_resource']), 'r') as f: content = f.read() content = fill_template_str(content, variables) return variables, self._put_file(s3_client, path, content) elif method == 'list': return variables, self._list_dir(conf, path) elif method == 'delete': return variables, self._delete(conf, path) else: raise AttributeError('unknown method: ' + method) def _get_file(self, s3_client, path): bucket, filename = self._parse_path(path) debug('Get {}/{}'.format(bucket, filename)) response = s3_client.get_object(Bucket=bucket, Key=filename) # TODO check response return response['Body'].read().decode() def _put_file(self, s3_client, path, content, retry=True): from botocore.exceptions import ClientError bucket, filename = self._parse_path(path) debug('Put {}/{}'.format(bucket, filename)) try: res = s3_client.put_object(Bucket=bucket, Key=filename, Body=content) return self._check_response(res) except ClientError as e: if retry and hasattr(e, 'response') and 'Error' in e.response and 'Code' in e.response['Error']: if e.response['Error']['Code'] == 'NoSuchBucket': res = s3_client.create_bucket(Bucket=bucket) self._check_response(res) return self._put_file(s3_client, path, content, False) raise e def _list_dir(self, conf: dict, path: str) -> List[str]: import boto3 res = boto3.resource('s3', endpoint_url=conf.get('url'), aws_access_key_id=conf['key_id'], aws_secret_access_key=conf['secret_key'], region_name=conf.get('region') ) bucket, rest = self._parse_path(path) bucket = res.Bucket(bucket) data = [] for obj in bucket.objects.all(): if obj.key.startswith(rest): data += [obj.key] return data def _delete(self, conf: dict, path: str): bucket, filename = self._parse_path(path) try: files = self._list_dir(conf, path) except: files = [] if len(files) > 1 or (len(files) == 1 and not path.endswith(files[0])): [self._delete(conf, join(bucket, file)) for file in files] # delete files in directory debug('Delete {}/{}'.format(bucket, filename)) import boto3 res = boto3.resource('s3', endpoint_url=conf.get('url'), aws_access_key_id=conf['key_id'], aws_secret_access_key=conf['secret_key'], region_name=conf.get('region') ) if not filename: res.Bucket(bucket).delete() else: obj = res.Object(bucket, filename) obj.delete() @staticmethod def _check_response(res): if 'ResponseMetadata' in res and 'HTTPStatusCode' in res['ResponseMetadata'] \ and res['ResponseMetadata']['HTTPStatusCode'] == 200: return True raise Exception("Operation failed") @staticmethod def _parse_path(path: str): splitted = [s for s in path.split('/') if s != ''] return splitted[0], '/'.join(splitted[1:])