import boto3
import base64
import math
import json
from datetime import datetime


class receiveFromSQS(object):
    """Receive, delete and export messages from a single SQS queue.

    Messages are pulled in batches, deleted from the queue as soon as each
    batch has been received, and their bodies collected on the instance.
    Most methods return ``self`` so calls can be chained.

    Attributes set over the lifetime of an instance:
        session: the boto3 session the client was created from.
        sqs: the SQS client used for every request.
        queueURL: URL of the queue being read.
        messages: message bodies collected so far (base64-decoded when
            requested).
        output: serialised form of ``messages``; ``None`` until
            ``generateOutput`` has been called.
    """

    # Upper bound applied to a single receive request; the original code
    # hard-coded the same value inline.
    _SQS_MAX_BATCH = 10

    def __init__(self, session, queueURL):
        super(receiveFromSQS, self).__init__()
        self.session = session
        self.sqs = session.client('sqs')
        self.queueURL = queueURL
        self.messages = []
        # Initialised here so savetoDisk() can test it before
        # generateOutput() has ever been called.
        self.output = None

    @classmethod
    def createSession(cls, profileName, queueURL):
        """Build an instance from a named AWS credentials profile."""
        session = boto3.Session(profile_name=profileName)
        return cls(session, queueURL)

    def getQueueLength(self):
        """Return the queue's ``ApproximateNumberOfMessages`` as an int.

        The raw attribute response is kept on ``self.queueAttributes`` and
        the parsed value on ``self.queueLength``.
        """
        attributeNames = ['ApproximateNumberOfMessages']
        self.queueAttributes = self.sqs.get_queue_attributes(
            QueueUrl=self.queueURL, AttributeNames=attributeNames
        )
        self.queueLength = int(
            self.queueAttributes['Attributes']['ApproximateNumberOfMessages']
        )
        return self.queueLength

    def _receiveSQSMessage(
        self, totalNumberOfMessages, maxNumberOfMessages=10
    ):
        """Receive up to ``totalNumberOfMessages`` messages in batches.

        Each raw response is appended to ``self.resp`` and the messages it
        contains are deleted from the queue immediately.

        Args:
            totalNumberOfMessages: how many messages to request overall.
            maxNumberOfMessages: largest batch to request per call; values
                above ``_SQS_MAX_BATCH`` are clamped to it.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: if ``maxNumberOfMessages`` is less than 1.
            RuntimeError: if there is nothing to request, or if a batch
                delete does not succeed for every entry.
        """
        if maxNumberOfMessages < 1:
            raise ValueError('maxNumberOfMessages must be at least 1')
        batchLimit = min(maxNumberOfMessages, self._SQS_MAX_BATCH)

        self.resp = []
        self.loops = max(0, math.ceil(totalNumberOfMessages / batchLimit))
        if self.loops == 0:
            raise RuntimeError('No messages in the queue')

        for i in range(self.loops):
            # Every request asks for a full batch except the last, which
            # asks only for what is still outstanding. Deriving this from
            # the loop index keeps it correct even when an earlier request
            # came back empty.
            batchSize = min(
                batchLimit, totalNumberOfMessages - i * batchLimit
            )
            response = self.sqs.receive_message(
                QueueUrl=self.queueURL,
                MaxNumberOfMessages=batchSize,
            )
            self.resp.append(response)

            received = response.get('Messages', [])
            if not received:
                print("No messages in the queue")
                continue

            entries = [
                {
                    'Id': msg['MessageId'],
                    'ReceiptHandle': msg['ReceiptHandle'],
                }
                for msg in received
            ]
            # NOTE: messages are removed from the queue before they have
            # been decoded or saved; a later failure loses them.
            self._deleteSQSMessages(entries)
        return self

    def _extractMessageFromSQS(self, totalNumberOfMessages):
        """Collect the ``Body`` of every received message.

        Reads the responses stored on ``self.resp`` and fills
        ``self.extractedMessages``. ``totalNumberOfMessages`` is accepted
        for interface compatibility and is not used.

        Returns:
            ``self``, for chaining.
        """
        self.extractedMessages = []
        self.receiptHandles = []
        for response in self.resp:
            batch = response.get('Messages')
            if not batch:
                # An empty batch must not stop later batches from being
                # extracted.
                print('No messages in the queue')
                continue
            self.extractedMessages.extend(msg['Body'] for msg in batch)
        return self

    def _deleteSQSMessages(self, entries):
        """Delete a batch of messages and verify every delete succeeded.

        Args:
            entries: list of ``{'Id': ..., 'ReceiptHandle': ...}`` dicts.

        Raises:
            RuntimeError: if the number of successful deletions differs
                from the number of entries submitted.
        """
        self.respDelete = self.sqs.delete_message_batch(
            QueueUrl=self.queueURL, Entries=entries
        )
        # Treat a missing 'Successful' key as zero successes so the
        # failure is reported as a failed delete, not as a KeyError.
        succeeded = self.respDelete.get('Successful', [])
        if len(succeeded) != len(entries):
            raise RuntimeError(
                f'Failed to delete messages: entries={entries!r}'
                f' resp={self.respDelete!r}'
            )

    def _decodeMessages(self):
        """Base64-decode the extracted bodies and append to ``messages``.

        Returns:
            ``self``, for chaining.
        """
        if not self.extractedMessages:
            print('No messages to process')
            return self
        self.messages.extend(
            base64.b64decode(message).decode()
            for message in self.extractedMessages
        )
        return self

    def receiveAllMessages(self, b64=True, _totalNumberOfMessages=None):
        """Receive messages and store their bodies on ``self.messages``.

        Args:
            b64: when true, bodies are base64-decoded and appended to
                ``self.messages``; otherwise ``self.messages`` is replaced
                by the raw bodies.
            _totalNumberOfMessages: number of messages to request; when
                ``None`` the current queue length is used.

        Returns:
            ``self``, for chaining.
        """
        if _totalNumberOfMessages is None:
            totalNumberOfMessages = self.getQueueLength()
        else:
            totalNumberOfMessages = _totalNumberOfMessages
        self._receiveSQSMessage(totalNumberOfMessages)
        self._extractMessageFromSQS(
            totalNumberOfMessages=totalNumberOfMessages
        )
        if b64:
            self._decodeMessages()
        else:
            self.messages = self.extractedMessages
        return self

    def receiveNMessages(self, numberOfMessages, b64=True):
        """Receive ``numberOfMessages`` messages; see receiveAllMessages."""
        self.receiveAllMessages(
            b64=b64, _totalNumberOfMessages=numberOfMessages
        )
        return self

    def generateOutput(self, type='json'):
        """Serialise ``self.messages`` and return the result.

        The result is also stored on ``self.output``.

        Args:
            type: output format; only ``'json'`` is supported. (The name
                shadows the builtin but is kept because callers pass it
                by keyword.)

        Raises:
            ValueError: if ``type`` is not a supported format.
        """
        if type != 'json':
            raise ValueError(f'Unsupported output type: {type!r}')
        self.output = json.dumps(self.messages)
        return self.output

    def savetoDisk(self, path):
        """Write the serialised messages to ``<path>/<timestamp>.json``.

        The output is generated first if it has not been already. Nothing
        is written when there are no messages.
        """
        self.timeNow = datetime.now().strftime('%d-%m-%Y_%H:%M:%S')
        if self.output is None:
            self.generateOutput()
        if not self.messages:
            print('No messages to save')
            return
        with open(f'{path}/{self.timeNow}.json', 'w') as outputFile:
            outputFile.write(self.output)


# Guarded so that importing this module does not contact AWS, delete
# messages from the queue, or write to disk.
if __name__ == '__main__':
    inst = receiveFromSQS.createSession(
        profileName='plex-aws',
        queueURL='https://sqs.eu-west-1.amazonaws'
        '.com/745437999005/slack-bot.fifo',
    )

    output = inst.receiveNMessages(numberOfMessages=4).generateOutput(
        type='json'
    )
    inst.savetoDisk(
        '/Users/dtomlinson/OneDrive - William Hill'
        ' Organisation Limited/Mac/git_repos/python-VM/slack-bot/traffic-scraper'
        '/prd'
    )

    # inst.receiveAllMessages()
    for item in inst.messages:
        print(item)