Files
python-VM/boto3/pull.py


import boto3
import botocore
import os
from datetime import date
import sys
import logging
import argparse
import glob
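# This script pulls any new files for the current day from an S3 bucket and
# records each download in a hidden .history.txt file so that reruns skip
# files it has already fetched.
# Example invocation (the path shown is illustrative):
#   python pull.py --path /data/blocking_suggestions --debug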
# Set Global Variables
log_location = 'pull.log'
remote_folder = ['bot_predictions']
remote_file_prefix = ['blocking_suggestions_']
append_date = [True]
date_format = ['%Y-%m-%d']
bucket = ['td-ingest-storage-williamhill']
# Credentials are read from the environment rather than being hardcoded;
# if unset, boto3 falls back to its normal credential chain.
access_key = [os.environ.get('AWS_ACCESS_KEY_ID')]
secret_key = [os.environ.get('AWS_SECRET_ACCESS_KEY')]
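# The configuration above is a set of parallel lists: index i of bucket,
# access_key and secret_key describes one S3 source, and the __main__ block
# below builds a matching boto3 client and resource for each index. Only the
# first entry (index 0) is actually used by main().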
class downloadFiles(object):
    """Download new files for the current day from an S3 bucket, using a
    local history file to avoid fetching the same file twice."""

    today = date.today()

    def __init__(self,
                 client,
                 resource,
                 bucket,
                 remote_folder,
                 remote_file_prefix,
                 local_path,
                 append_date=False,
                 date_format=''):
        super(downloadFiles, self).__init__()
        self.client = client
        self.resource = resource
        self.bucket = bucket
        self.append_date = append_date
        self.date_format = date_format
        self.remote_folder = self._folder_fixer(remote_folder)
        self.dest = f'{self.remote_folder!s}{remote_file_prefix!s}'
        self.local_path = local_path
        self.remote_list, self.local_list, self.local_file_list = \
            (list() for _ in range(3))
    @staticmethod
    def generate_date(date_format):
        date = downloadFiles.today.strftime(date_format)
        return date

    @staticmethod
    def _folder_fixer(folder):
        try:
            if folder[-1] != '/':
                folder = f'{folder}/'
        except IndexError:
            folder = ''
        return folder
    def get_path(self):
        if self.local_path:
            self.local_path = self._folder_fixer(self.local_path)
            logger.info(f'path entered is {self.local_path}')
            return self
        else:
            self.local_path = os.getcwd()
            self.local_path = self._folder_fixer(self.local_path)
            self.local_path = f'{self.local_path}blocking_suggestions/'
            logger.info(f'no path entered, using current directory '
                        f'{self.local_path}')
            return self
    def get_files(self):
        if self.append_date:
            date = f'{self.generate_date(self.date_format)!s}'
        else:
            date = ''
        self.dest = f'{self.dest!s}{date!s}'
        # List every key under the prefix; search() yields None when the
        # bucket has no matching contents, which surfaces as a TypeError
        # when the key is sliced below.
        paginator = self.client.get_paginator('list_objects')
        iterator = paginator.paginate(Bucket=self.bucket, Prefix=self.dest)
        self.filtered = iterator.search('Contents[*].Key')
        for i in self.filtered:
            try:
                self.remote_list.append(i)
                self.local_list.append(
                    f'{self.local_path}{i[len(self.remote_folder):]}'
                )
                self.local_file_list.append(
                    f'{i[len(self.remote_folder):]}'
                )
            except TypeError:
                logger.info('no files available to download -- exiting')
                raise SystemExit
        logger.debug(f'remote files are {self.remote_list}')
        logger.debug(f'saving files locally to {self.local_list}')
        return self
    def get_history(self):
        self.history_file = f'{self.local_path}.history.txt'
        try:
            logger.info('opening history file')
            # Create the history file if it does not exist yet.
            open(self.history_file, 'a').close()
        except FileNotFoundError:
            logger.critical('history file cannot be found or created'
                            ' - check permissions of the folder.')
            raise
        with open(self.history_file) as hist:
            self.history_list = [line.rstrip('\n') for line in hist]
        return self
    def remove_files(self):
        logger.info('attempting to clear current files')
        # glob does not match the hidden .history.txt file, so only the
        # previously downloaded data files are removed here.
        current_files = glob.glob(f'{self.local_path}*')
        if current_files:
            for i in current_files:
                try:
                    os.remove(i)
                    logger.info(f'removed {i}')
                except OSError:
                    logger.exception('Error:')
        else:
            logger.info('no files to remove')
        return self
    def download_files(self):
        for remote_file, local_file_with_path, local_file in zip(
                self.remote_list, self.local_list, self.local_file_list):
            if local_file not in self.history_list:
                with open(self.history_file, 'a') as hist:
                    try:
                        self.resource.Bucket(self.bucket).download_file(
                            remote_file, local_file_with_path)
                        hist.write(f'\n{local_file}')
                        logger.info(f'downloaded {local_file}')
                    except botocore.exceptions.ClientError as e:
                        if e.response['Error']['Code'] == '404':
                            print(f'The object {remote_file} does not exist.')
                        else:
                            raise
            else:
                logger.debug(f'{local_file} already downloaded - skipping')
        return self
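# Typical call order, as used by main() below: resolve the local path, list
# the matching remote keys, load the download history, clear the previous
# files, then download anything not yet recorded in .history.txt:
#   instance.get_path().get_files().get_history().remove_files().download_files()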
def _call():
    global args, debug
    parser = argparse.ArgumentParser(description="""
        Downloads any new files for the current day from an S3 bucket.
        Uses a local history file to track what has been previously
        downloaded in the download path.
        """)
    parser.add_argument('--path', type=str,
                        help='path to download to. If left blank, a '
                             'blocking_suggestions/ folder under the current '
                             'working directory is used.',
                        default='')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Use this to log DEBUG information.')
    args = parser.parse_args()
    debug = vars(args)['debug']
    if debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    main(_clients=_clients,
         _resources=_resources,
         _buckets=_buckets,
         remote_folder=remote_folder,
         remote_file_prefix=remote_file_prefix,
         append_date=append_date,
         date_format=date_format,
         **vars(args))
def main(*args,
         _clients={'client0': ''},
         _resources={'resource0': ''},
         _buckets={'bucket0': ''},
         remote_folder=[''],
         remote_file_prefix=[''],
         append_date=[True],
         date_format=[''],
         path='',
         **kwargs):
    logger.info('========= SCRIPT STARTED =========')
    instance = downloadFiles(client=_clients['client0'],
                             resource=_resources['resource0'],
                             bucket=_buckets['bucket0'],
                             remote_folder=remote_folder[0],
                             remote_file_prefix=remote_file_prefix[0],
                             local_path=path,
                             append_date=append_date[0],
                             date_format=date_format[0])
    instance.get_path().get_files().get_history().remove_files()\
        .download_files()
    logger.info('========= SCRIPT FINISHED =========')
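# When run as a script, the block below wires up logging to stdout and to
# pull.log, creates one boto3 client/resource pair per configured bucket,
# normalises the remote_folder setting, and then hands off to _call() for
# argument parsing.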
if __name__ == '__main__':
    args, debug = '', ''
    # define logging
    logger = logging.getLogger(__name__)
    c_handler = logging.StreamHandler(sys.stdout)
    f_handler = logging.FileHandler(log_location)
    c_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    f_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    c_handler.setFormatter(c_format)
    f_handler.setFormatter(f_format)
    logger.addHandler(c_handler)
    logger.addHandler(f_handler)
    # Build one boto3 client, resource and bucket entry per configured bucket.
    _clients = {}
    _resources = {}
    _buckets = {}
    for i in range(len(bucket)):
        _clients[f'client{i}'] = \
            boto3.client('s3',
                         aws_access_key_id=access_key[i],
                         aws_secret_access_key=secret_key[i])
        _resources[f'resource{i}'] = \
            boto3.resource('s3',
                           aws_access_key_id=access_key[i],
                           aws_secret_access_key=secret_key[i])
        _buckets[f'bucket{i}'] = bucket[i]
    # Treat a missing, empty or 'root' remote_folder as the bucket root.
    try:
        _length = len(remote_folder)
        if _length == 0:
            remote_folder = ['']
        elif remote_folder[0] == 'root':
            remote_folder = ['']
        else:
            pass
    except NameError:
        remote_folder = ['']
    _call()