258 lines
9.0 KiB
Python
258 lines
9.0 KiB
Python
import boto3
|
|
import botocore
|
|
import os
|
|
from datetime import date, timedelta
|
|
import sys
|
|
import logging
|
|
import argparse
|
|
import glob
|
|
|
|
# Set Global Variables
#
# Each setting is a one-element list; the script entry point builds one
# client/resource/bucket entry per element of ``bucket`` and ``main`` reads
# index 0 of every list.
log_location = 'pull.log'

remote_folder = ['bot_predictions/']
remote_file_prefix = ['blocking_suggestions_']

# A real boolean: the flag is only ever tested for truthiness, so a string
# such as 'False' would silently behave like True.
append_date = [True]
date_format = ['%Y-%m-%d']

bucket = ['td-ingest-storage-williamhill']

# SECURITY: AWS credentials must never be committed to source control.
# They are read from the standard AWS environment variables instead.  The
# key pair that used to be embedded here has been exposed and must be
# rotated.  An unset variable yields '' (not None) so the values stay strings.
access_key = [os.environ.get('AWS_ACCESS_KEY_ID', '')]
secret_key = [os.environ.get('AWS_SECRET_ACCESS_KEY', '')]
|
|
|
|
|
|
# Module-level logger so the class also works when this file is imported
# rather than run as a script.  ``logging.getLogger`` returns the same object
# for the same name, so the script entry point attaches its handlers to this
# very logger.
logger = logging.getLogger(__name__)


class downloadFiles(object):
    """Download not-yet-seen bucket objects under a key prefix to a folder.

    The methods return ``self`` so they can be chained in the order
    ``get_path().get_files().get_history().remove_files().download_files()``.

    Args:
        client: object exposing ``get_paginator('list_objects')`` (a boto3
            S3 client in this script).
        resource: object exposing ``Bucket(name).download_file(key, path)``
            (a boto3 S3 resource in this script).
        bucket: bucket name.
        remote_folder: key "folder"; a trailing ``/`` is added if missing.
        remote_file_prefix: file-name prefix inside ``remote_folder``.
        local_path: download folder; empty means
            ``<current working directory>/blocking_suggestions/``.
        append_date: when truthy, today's and yesterday's dates (formatted
            with ``date_format``) are appended to the listing prefix.
        date_format: ``strftime`` format used when ``append_date`` is set.
    """

    # Evaluated once, when the class body runs; a long-lived process keeps
    # using the dates from start-up.
    today = date.today()
    yesterday = today - timedelta(1)

    def __init__(self,
                 client,
                 resource,
                 bucket,
                 remote_folder,
                 remote_file_prefix,
                 local_path,
                 append_date=False,
                 date_format=''):
        super().__init__()
        self.client = client
        self.resource = resource
        self.bucket = bucket
        self.append_date = append_date
        self.date_format = date_format
        self.remote_folder = self._folder_fixer(remote_folder)
        # Full key prefix (folder + file prefix) used when listing.
        self.dest = f'{self.remote_folder!s}{remote_file_prefix!s}'
        self.local_path = local_path
        # Parallel lists: remote key, local path, local file name.
        self.remote_list = []
        self.local_list = []
        self.local_file_list = []

    @staticmethod
    def generate_date(date_format, relative_day='today'):
        """Return today's or yesterday's date formatted with ``date_format``.

        Args:
            date_format: ``strftime`` format string.
            relative_day: ``'today'`` or ``'yesterday'``.

        Raises:
            ValueError: if ``relative_day`` is anything else.
        """
        if relative_day == 'today':
            day = downloadFiles.today
        elif relative_day == 'yesterday':
            day = downloadFiles.yesterday
        else:
            raise ValueError(
                f"relative_day must be 'today' or 'yesterday', "
                f"got {relative_day!r}")
        return day.strftime(date_format)

    @staticmethod
    def _folder_fixer(folder):
        """Return ``folder`` with a trailing ``/``; empty input gives ``''``."""
        if not folder:
            return ''
        return folder if folder.endswith('/') else f'{folder}/'

    def get_path(self):
        """Normalise ``self.local_path``, defaulting to a cwd sub-folder."""
        if self.local_path:
            self.local_path = self._folder_fixer(self.local_path)
            logger.info(f'path entered is {self.local_path}')
        else:
            base = self._folder_fixer(os.getcwd())
            self.local_path = f'{base}blocking_suggestions/'
            logger.info(f'no path entered, using current directory '
                        f'{self.local_path}')
        return self

    def get_files(self):
        """List matching remote keys and compute their local targets.

        Fills ``remote_list``, ``local_list`` and ``local_file_list``.

        Raises:
            SystemExit: if no key is found under any of the prefixes.
        """
        if self.append_date:
            suffixes = [self.generate_date(self.date_format),
                        self.generate_date(self.date_format, 'yesterday')]
        else:
            suffixes = ['', '']
        # dict.fromkeys drops duplicates but keeps order: without a date
        # suffix both prefixes are identical and every key would otherwise
        # be listed (and downloaded) twice.
        self.dest_list = list(dict.fromkeys(
            f'{self.dest!s}{suffix!s}' for suffix in suffixes))

        folder_len = len(self.remote_folder)
        paginator = self.client.get_paginator('list_objects')
        seen = set(self.remote_list)
        for dest in self.dest_list:
            iterator = paginator.paginate(Bucket=self.bucket, Prefix=dest)
            self.filtered = iterator.search('Contents[*].Key')
            for key in self.filtered:
                # The search yields None for a page without 'Contents'.
                # One empty prefix must not abort the run while the other
                # prefix still has files.
                if key is None or key in seen:
                    continue
                seen.add(key)
                relative = key[folder_len:]
                self.remote_list.append(key)
                self.local_list.append(f'{self.local_path}{relative}')
                self.local_file_list.append(relative)

        if not self.remote_list:
            logger.info('no files available to download -- exiting')
            raise SystemExit
        logger.debug(f'remote files are {self.remote_list}')
        logger.debug(f'saving files locally to {self.local_list}')
        return self

    def get_history(self):
        """Create the history file if needed and load it.

        Sets ``history_file`` and ``history_list`` (one entry per line).

        Raises:
            OSError: if the folder or the history file cannot be created.
        """
        self.history_file = f'{self.local_path}.history.txt'
        try:
            logger.info('opening history file')
            if self.local_path:
                # A first run has no download folder yet.
                os.makedirs(self.local_path, exist_ok=True)
            with open(self.history_file, 'a'):
                pass
        except OSError:
            logger.critical('history file cannot be found or created'
                            ' - check permissions of the folder.')
            raise
        with open(self.history_file) as history:
            self.history_list = [line.rstrip('\n') for line in history]
        return self

    def remove_files(self):
        """Delete previously downloaded files from ``local_path``."""
        logger.info('attempting to clear current files')
        # NOTE: '[!history.txt]' is a glob character class.  It skips every
        # name whose FIRST character is one of "history.tx", not the file
        # '.history.txt' by name.  The history file survives because glob
        # does not match dot-files unless the pattern itself starts with a
        # dot.  The pattern is kept unchanged so that no additional files
        # get deleted.
        # glob.escape keeps metacharacters in the folder name literal.
        current_files = glob.glob(
            f'{glob.escape(self.local_path)}[!history.txt]*')
        if not current_files:
            logger.info('no files to remove')
            return self
        for path in current_files:
            try:
                os.remove(path)
                logger.info(f'removed {path}')
            except OSError:
                logger.exception('Error:')
        return self

    def download_files(self):
        """Download every listed file that is not in the history yet.

        Each successful download is appended to the history file and to
        ``history_list``.  A 404 from the service is logged and skipped; any
        other client error is re-raised.
        """
        downloaded = set(self.history_list)
        for remote_file, local_file_with_path, local_file in zip(
                self.remote_list, self.local_list, self.local_file_list):
            if local_file in downloaded:
                logger.debug(f'{local_file} already downloaded - skipping')
                continue
            try:
                # download_file creates the target itself; pre-opening it
                # would only leave an empty file behind on failure.
                self.resource.Bucket(self.bucket).download_file(
                    remote_file, local_file_with_path)
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == '404':
                    logger.warning(
                        f'The object {remote_file} does not exist.')
                    continue
                raise
            # Leading newline kept so existing history files stay readable.
            with open(self.history_file, 'a') as hist:
                hist.write(f'\n{local_file}')
            self.history_list.append(local_file)
            downloaded.add(local_file)
            logger.info(f'downloaded {local_file}')
        return self
|
|
|
|
|
|
def _call():
    """Parse the command line, set the log level and run ``main``.

    Reads ``sys.argv`` through ``argparse`` (``--path`` and ``--debug``),
    sets the module logger to DEBUG or INFO, then calls ``main`` with the
    module-level ``_clients``, ``_resources``, ``_buckets``,
    ``remote_folder``, ``remote_file_prefix``, ``append_date`` and
    ``date_format`` plus the parsed arguments as keyword arguments.
    """
    # Rebinds the module-level names, as the original did.
    global args, debug

    # Implicit string concatenation instead of backslash continuations, so
    # source indentation never leaks into the text.
    parser = argparse.ArgumentParser(
        description='downloads any new files for the current day from an '
                    'S3 bucket. uses a local history file to track what '
                    'has been previously downloaded in the download path.')
    parser.add_argument(
        '--path', type=str, default='',
        # The fallback is <cwd>/blocking_suggestions/, not the script's
        # own folder as the help text used to claim.
        help='enter pull path to download to. if left blank will use a '
             '"blocking_suggestions" folder inside the current working '
             'directory.')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Use this to log DEBUG information.')

    args = parser.parse_args()
    debug = args.debug

    logger.setLevel(logging.DEBUG if debug else logging.INFO)

    main(_clients=_clients,
         _resources=_resources,
         _buckets=_buckets,
         remote_folder=remote_folder,
         remote_file_prefix=remote_file_prefix,
         append_date=append_date,
         date_format=date_format,
         **vars(args))
|
|
|
|
|
|
def _as_flag(value):
    """Interpret a config flag that may be a bool or a string like 'False'."""
    if isinstance(value, str):
        # Only the explicit "off" spellings are false; any other non-empty
        # string stays true, as plain truthiness would give.
        return value.strip().lower() not in ('', 'false', '0', 'no')
    return bool(value)


def main(*args,
         _clients=None,
         _resources=None,
         _buckets=None,
         remote_folder=None,
         remote_file_prefix=None,
         append_date=None,
         date_format=None,
         path='',
         **kwargs):
    """Run one download pass for the first configured bucket.

    Only index 0 / key ``...0`` of each setting is used.  Omitted settings
    fall back to the values the former mutable defaults had.

    Args:
        *args: ignored.
        _clients: mapping with key ``'client0'``.
        _resources: mapping with key ``'resource0'``.
        _buckets: mapping with key ``'bucket0'``.
        remote_folder: one-element sequence with the remote key folder.
        remote_file_prefix: one-element sequence with the file-name prefix.
        append_date: one-element sequence; bool, or a string such as
            ``'True'`` / ``'False'``.
        date_format: one-element sequence with a ``strftime`` format.
        path: local download folder (empty selects the default folder).
        **kwargs: ignored (absorbs extra command-line options like
            ``debug``).
    """
    # None sentinels replace the mutable dict/list defaults.
    _clients = {'client0': ''} if _clients is None else _clients
    _resources = {'resource0': ''} if _resources is None else _resources
    _buckets = {'bucket0': ''} if _buckets is None else _buckets
    remote_folder = [''] if remote_folder is None else remote_folder
    remote_file_prefix = \
        [''] if remote_file_prefix is None else remote_file_prefix
    append_date = ['True'] if append_date is None else append_date
    date_format = [''] if date_format is None else date_format

    logger.info('========= SCRIPT STARTED =========')
    instance = downloadFiles(client=_clients['client0'],
                             resource=_resources['resource0'],
                             bucket=_buckets['bucket0'],
                             remote_folder=remote_folder[0],
                             remote_file_prefix=remote_file_prefix[0],
                             local_path=path,
                             append_date=_as_flag(append_date[0]),
                             date_format=date_format[0])
    # Order matters: the history file lives in the resolved local path, and
    # old files are cleared before new ones are fetched.
    (instance.get_path()
             .get_files()
             .get_history()
             .remove_files()
             .download_files())
    logger.info('========= SCRIPT FINISHED =========')
|
|
|
|
|
|
if __name__ == '__main__':
    # Placeholders for the module globals that _call() rebinds.
    args, debug = '', ''

    # define logging: same format on stdout and in the log file
    logger = logging.getLogger(__name__)
    log_format = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')
    for handler in (logging.StreamHandler(sys.stdout),
                    logging.FileHandler(log_location)):
        handler.setFormatter(log_format)
        logger.addHandler(handler)

    # One client/resource/bucket entry per configured bucket, keyed
    # 'client0', 'resource0', 'bucket0', ...
    _clients = {}
    _resources = {}
    _buckets = {}

    for i, bucket_name in enumerate(bucket):
        # Pass explicit keys only when both are configured.  Otherwise let
        # boto3 resolve credentials through its default chain, instead of
        # sending empty strings or the literal text 'None'.
        credentials = {}
        if access_key[i] and secret_key[i]:
            credentials = {'aws_access_key_id': str(access_key[i]),
                           'aws_secret_access_key': str(secret_key[i])}
        _clients[f'client{i}'] = boto3.client('s3', **credentials)
        _resources[f'resource{i}'] = boto3.resource('s3', **credentials)
        _buckets[f'bucket{i}'] = str(bucket_name)

    # An empty list or the keyword 'root' both mean "bucket root".
    try:
        if not remote_folder or remote_folder[0] == 'root':
            remote_folder = ['']
    except NameError:
        # Setting removed from the configuration section altogether.
        remote_folder = ['']

    _call()
|