"""Download the current day's files from an S3 bucket.

A local ``.history.txt`` file in the download folder records which files
have already been fetched so that each remote object is downloaded once.
"""
import argparse
import glob
import logging
import os
import sys
from datetime import date

import boto3
import botocore

# Bound at module level so the class can be imported and used without
# running the script entry point; handlers are attached in the
# ``__main__`` guard below.
logger = logging.getLogger(__name__)

# Set Global Variables
log_location = 'pull.log'
remote_folder = ['bot_predictions']
remote_file_prefix = ['blocking_suggestions_']
append_date = [True]
date_format = ['%Y-%m-%d']
bucket = ['td-ingest-storage-williamhill']
# SECURITY: credentials must never be committed to source control. They are
# read from the environment instead; when a variable is unset the value is
# None and boto3 falls back to its default credential chain.
# NOTE(review): the key pair that used to be hard-coded here should be
# treated as compromised and rotated.
access_key = [os.environ.get('AWS_ACCESS_KEY_ID')]
secret_key = [os.environ.get('AWS_SECRET_ACCESS_KEY')]


class downloadFiles(object):
    """List, track and download S3 objects that share a key prefix.

    The public methods return ``self`` so they can be chained:
    ``get_path().get_files().get_history().remove_files().download_files()``.
    """

    # Evaluated once, when the class body is executed.
    today = date.today()
    # Name of the bookkeeping file kept inside the download folder.
    history_file_name = '.history.txt'

    def __init__(self, client, resource, bucket, remote_folder,
                 remote_file_prefix, local_path, append_date=False,
                 date_format=''):
        """Store the S3 handles and build the remote key prefix.

        Args:
            client: object providing ``get_paginator`` (a boto3 S3 client).
            resource: object providing ``Bucket`` (a boto3 S3 resource).
            bucket: name of the bucket to read from.
            remote_folder: key "folder"; a trailing ``/`` is added if absent.
            remote_file_prefix: file-name prefix inside ``remote_folder``.
            local_path: download folder; empty selects the default chosen
                by ``get_path``.
            append_date: when truthy, today's date is appended to the
                prefix by ``get_files``.
            date_format: ``strftime`` format used for that date.
        """
        super().__init__()
        self.client = client
        self.resource = resource
        self.bucket = bucket
        self.append_date = append_date
        self.date_format = date_format
        self.remote_folder = self._folder_fixer(remote_folder)
        self.dest = f'{self.remote_folder!s}{remote_file_prefix!s}'
        self.local_path = local_path
        self.remote_list = []
        self.local_list = []
        self.local_file_list = []

    @staticmethod
    def generate_date(date_format):
        """Return the class-level ``today`` formatted with ``date_format``."""
        return downloadFiles.today.strftime(date_format)

    @staticmethod
    def _folder_fixer(folder):
        """Return ``folder`` with a trailing ``/``; empty input gives ``''``."""
        if not folder:
            return ''
        return folder if folder.endswith('/') else f'{folder}/'

    def get_path(self):
        """Normalise ``local_path``, defaulting to a folder under the cwd.

        Returns:
            self, to allow chaining.
        """
        if self.local_path:
            self.local_path = self._folder_fixer(self.local_path)
            logger.info(f'path entered is {self.local_path}')
        else:
            cwd = self._folder_fixer(os.getcwd())
            self.local_path = f'{cwd}blocking_suggestions/'
            logger.info(f'no path entered, using current directory '
                        f'{self.local_path}')
        return self

    def get_files(self):
        """List remote keys under the prefix and derive their local names.

        Fills ``remote_list`` (full keys), ``local_list`` (local path plus
        the key with the remote folder stripped) and ``local_file_list``
        (the key with the remote folder stripped).

        Returns:
            self, to allow chaining.

        Raises:
            SystemExit: when the listing yields ``None`` instead of a key,
                which is treated as "nothing to download".
        """
        date_suffix = (self.generate_date(self.date_format)
                       if self.append_date else '')
        self.dest = f'{self.dest!s}{date_suffix!s}'

        paginator = self.client.get_paginator('list_objects')
        pages = paginator.paginate(Bucket=self.bucket, Prefix=self.dest)
        self.filtered = pages.search('Contents[*].Key')

        folder_len = len(self.remote_folder)
        for key in self.filtered:
            # Check explicitly rather than relying on a TypeError raised
            # by slicing None further down.
            if key is None:
                logger.info('no files available to download -- exiting')
                raise SystemExit
            relative_name = key[folder_len:]
            self.remote_list.append(key)
            self.local_list.append(f'{self.local_path}{relative_name}')
            self.local_file_list.append(f'{relative_name}')

        logger.debug(f'remote files are {self.remote_list}')
        logger.debug(f'saving files locally to {self.local_list}')
        return self

    def get_history(self):
        """Create the history file if needed and load it into memory.

        Returns:
            self, to allow chaining.

        Raises:
            FileNotFoundError: when the history file cannot be opened,
                e.g. because the download folder does not exist.
        """
        self.history_file = f'{self.local_path}{self.history_file_name}'
        try:
            logger.info('opening history file')
            # Append mode creates the file when missing without
            # truncating an existing one.
            with open(self.history_file, 'a'):
                pass
        except FileNotFoundError:
            logger.critical('history file cannot be found or created'
                            ' - check permissions of the folder.')
            raise
        # Read through a context manager so the handle is always closed.
        with open(self.history_file) as hist:
            self.history_list = [line.rstrip('\n') for line in hist]
        return self

    def remove_files(self):
        """Delete everything in the download folder except the history file.

        Removal errors are logged and do not stop the run.

        Returns:
            self, to allow chaining.
        """
        logger.info('attempting to clear current files')
        # A "[!history.txt]*" pattern is a negated character class (it
        # skips any name starting with one of those letters), so the
        # history file is excluded by name instead.
        current_files = [
            path for path in glob.glob(f'{self.local_path}*')
            if os.path.basename(path) != self.history_file_name
        ]
        if not current_files:
            logger.info('no files to remove')
            return self
        for path in current_files:
            try:
                os.remove(path)
                logger.info(f'removed {path}')
            except OSError:
                logger.exception('Error:')
        return self

    def download_files(self):
        """Download every listed file that is not in the history.

        Each successful download is appended to the history file. A 404
        from S3 is logged and skipped; any other client error propagates.

        Returns:
            self, to allow chaining.

        Raises:
            botocore.exceptions.ClientError: for non-404 S3 errors.
        """
        already_downloaded = set(self.history_list)
        for remote_file, local_file_with_path, local_file in zip(
                self.remote_list, self.local_list, self.local_file_list):
            if local_file in already_downloaded:
                logger.debug(f'{local_file} already downloaded - skipping')
                continue
            try:
                # The destination is not opened beforehand: download_file
                # writes it itself, and pre-creating it would leave an
                # empty file behind when the download fails.
                self.resource.Bucket(self.bucket).download_file(
                    remote_file, local_file_with_path)
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == '404':
                    logger.error(
                        f'The object {remote_file} does not exist.')
                    continue
                raise
            with open(self.history_file, 'a') as hist:
                hist.write(f'\n{local_file}')
            logger.info(f'downloaded {local_file}')
        return self


def _call():
    """Parse the command line, set the log level and run ``main``.

    Publishes the parsed namespace and the debug flag as the module
    globals ``args`` and ``debug``.
    """
    global args, debug
    parser = argparse.ArgumentParser(
        description='downloads any new files for the current day from an '
                    'S3 bucket. uses a local history file to track what '
                    'has been previously downloaded in the download path.')
    parser.add_argument('--path', type=str, default='',
                        help='enter pull path to download to. if left '
                             'blank will use a blocking_suggestions folder '
                             'in the current working directory.')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Use this to log DEBUG information.')
    args = parser.parse_args()
    debug = vars(args)['debug']
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    main(_clients=_clients, _resources=_resources, _buckets=_buckets,
         remote_folder=remote_folder,
         remote_file_prefix=remote_file_prefix,
         append_date=append_date, date_format=date_format, **vars(args))


# NOTE: the container defaults below are kept for interface compatibility;
# they are only read, never mutated.
def main(*args, _clients={'client0': ''}, _resources={'resource0': ''},
         _buckets={'bucket0': ''}, remote_folder=[''],
         remote_file_prefix=[''], append_date=['True'], date_format=[''],
         path='', **kwargs):
    """Run the full download pipeline for the first configured bucket.

    Only index 0 / the ``*0`` key of each configuration container is used.

    Args:
        _clients: mapping holding the S3 client under ``'client0'``.
        _resources: mapping holding the S3 resource under ``'resource0'``.
        _buckets: mapping holding the bucket name under ``'bucket0'``.
        remote_folder: list whose first item is the remote folder.
        remote_file_prefix: list whose first item is the file prefix.
        append_date: list whose first item toggles the date suffix.
        date_format: list whose first item is the ``strftime`` format.
        path: local download folder; empty selects the default.
        *args, **kwargs: accepted and ignored (e.g. the ``debug`` flag).
    """
    logger.info('========= SCRIPT STARTED =========')
    instance = downloadFiles(client=_clients['client0'],
                             resource=_resources['resource0'],
                             bucket=_buckets['bucket0'],
                             remote_folder=remote_folder[0],
                             remote_file_prefix=remote_file_prefix[0],
                             local_path=path,
                             append_date=append_date[0],
                             date_format=date_format[0])
    instance.get_path().get_files().get_history().remove_files()\
        .download_files()
    logger.info('========= SCRIPT FINISHED =========')


if __name__ == '__main__':
    args, debug = '', ''

    # define logging: same format on stdout and in the log file
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    c_handler = logging.StreamHandler(sys.stdout)
    f_handler = logging.FileHandler(log_location)
    for handler in (c_handler, f_handler):
        handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(handler)

    _clients = {}
    _resources = {}
    _buckets = {}
    for i, (bucket_name, key_id, secret) in enumerate(
            zip(bucket, access_key, secret_key)):
        # Pass the credentials through unchanged: formatting None into a
        # string would defeat boto3's default credential lookup.
        _clients[f'client{i}'] = boto3.client(
            's3', aws_access_key_id=key_id, aws_secret_access_key=secret)
        _resources[f'resource{i}'] = boto3.resource(
            's3', aws_access_key_id=key_id, aws_secret_access_key=secret)
        _buckets[f'bucket{i}'] = f'{bucket_name}'

    # An empty list, a missing variable or the keyword 'root' all mean
    # "the top level of the bucket".
    try:
        if len(remote_folder) == 0 or remote_folder[0] == 'root':
            remote_folder = ['']
    except NameError:
        remote_folder = ['']

    _call()