import base64
import math

import boto3


class receiveFromSQS(object):
    """Receive, delete and optionally base64-decode messages from an SQS queue.

    Messages are fetched in batches, deleted from the queue as soon as each
    batch has been received, and their bodies are exposed through
    ``self.messages``.

    Attributes set by the methods of this class:
        session: the session object passed to the constructor.
        sqs: the ``'sqs'`` client created from ``session``.
        queueURL: URL of the queue every request is sent to.
        messages: list of message bodies produced by the last receive call
            (decoded bodies accumulate across calls, see ``_decodeMessages``).
        resp: raw ``receive_message`` responses of the last receive call.
        loops: number of ``receive_message`` requests of the last receive call.
        extractedMessages: raw (undecoded) bodies of the last receive call.
    """

    # Largest batch a single ReceiveMessage request may ask for.
    _SQS_MAX_BATCH = 10

    def __init__(self, session, queueURL):
        """Store the session and queue URL and create the SQS client.

        Args:
            session: object exposing ``client('sqs')`` (e.g. a
                ``boto3.Session``).
            queueURL: URL of the SQS queue to read from.
        """
        super().__init__()
        self.session = session
        self.sqs = session.client('sqs')
        self.queueURL = queueURL
        self.messages = []
        # Initialised here so the extract/decode helpers can be called safely
        # even before anything has been received.
        self.resp = []
        self.loops = 0
        self.extractedMessages = []
        self.receiptHandles = []

    @classmethod
    def createSession(cls, profileName, queueURL):
        """Alternate constructor: build the session from a named profile.

        Args:
            profileName: name passed as ``profile_name`` to ``boto3.Session``.
            queueURL: URL of the SQS queue to read from.

        Returns:
            A new instance of this class bound to the created session.
        """
        session = boto3.Session(profile_name=profileName)
        return cls(session, queueURL)

    def getQueueLength(self):
        """Return the queue's ``ApproximateNumberOfMessages`` as an int.

        The raw response is kept in ``self.queueAttributes`` and the parsed
        value in ``self.queueLength``.
        """
        attributeNames = ['ApproximateNumberOfMessages']
        self.queueAttributes = self.sqs.get_queue_attributes(
            QueueUrl=self.queueURL,
            AttributeNames=attributeNames
        )
        self.queueLength = int(
            self.queueAttributes['Attributes']['ApproximateNumberOfMessages']
        )
        return self.queueLength

    def _receiveSQSMessage(
        self, totalNumberOfMessages, maxNumberOfMessages=10
    ):
        """Request up to ``totalNumberOfMessages`` messages and delete them.

        The total is split into ``ceil(total / batch)`` requests; every
        request asks for a full batch except the last one, which asks only
        for the remainder, so the sum of requested messages equals
        ``totalNumberOfMessages``. Each received batch is deleted from the
        queue immediately after it is received.

        Args:
            totalNumberOfMessages: how many messages to request overall.
            maxNumberOfMessages: batch size per request; values above 10 are
                clamped to 10.

        Returns:
            ``self``. Raw responses are stored in ``self.resp`` and the
            number of requests in ``self.loops``.

        Raises:
            ValueError: if ``maxNumberOfMessages`` is smaller than 1.
            RuntimeError: if ``totalNumberOfMessages`` is not positive, or if
                a batch could not be fully deleted.
        """
        if maxNumberOfMessages < 1:
            raise ValueError('maxNumberOfMessages must be at least 1')

        self.resp = []
        if totalNumberOfMessages <= 0:
            self.loops = 0
            raise RuntimeError('No messages in the queue')

        batchSize = min(maxNumberOfMessages, self._SQS_MAX_BATCH)
        self.loops = int(math.ceil(totalNumberOfMessages / batchSize))

        remaining = totalNumberOfMessages
        for _ in range(self.loops):
            # The final request only asks for what is left of the total.
            response = self.sqs.receive_message(
                QueueUrl=self.queueURL,
                MaxNumberOfMessages=min(batchSize, remaining),
            )
            self.resp.append(response)
            remaining -= batchSize

            # A poll that returned nothing carries no 'Messages' key.
            entries = [
                {'Id': msg['MessageId'], 'ReceiptHandle': msg['ReceiptHandle']}
                for msg in response.get('Messages', [])
            ]
            # An empty batch must not be sent to delete_message_batch.
            if entries:
                self._deleteSQSMessages(entries)
        return self

    def _extractMessageFromSQS(self, totalNumberOfMessages):
        """Collect the ``Body`` of every received message.

        Responses without a ``'Messages'`` key are skipped, so an empty poll
        does not discard the bodies of the other batches.

        Args:
            totalNumberOfMessages: unused; kept for interface compatibility.

        Returns:
            ``self``. Bodies are stored in ``self.extractedMessages``.
        """
        self.extractedMessages = []
        self.receiptHandles = []
        for response in self.resp:
            self.extractedMessages.extend(
                msg['Body'] for msg in response.get('Messages', [])
            )
        if not self.extractedMessages:
            print('No messages in the queue')
        return self

    def _deleteSQSMessages(self, entries):
        """Delete one batch of messages from the queue.

        Args:
            entries: list of ``{'Id': ..., 'ReceiptHandle': ...}`` dicts.

        Raises:
            RuntimeError: if fewer entries were reported successful than
                were submitted.
        """
        self.respDelete = self.sqs.delete_message_batch(
            QueueUrl=self.queueURL, Entries=entries
        )
        # Treat a missing 'Successful' key as zero successes so the caller
        # gets the descriptive RuntimeError rather than a KeyError.
        if len(self.respDelete.get('Successful', [])) != len(entries):
            raise RuntimeError(
                f'Failed to delete messages: entries={entries!r}'
                f' resp={self.respDelete!r}'
            )

    def _decodeMessages(self):
        """Base64-decode the extracted bodies and append them to ``messages``.

        Decoded bodies are appended, so ``self.messages`` grows across
        successive decoding calls on the same instance.

        Returns:
            ``self``.
        """
        if not self.extractedMessages:
            print('No messages to process')
        else:
            for message in self.extractedMessages:
                decoded = base64.b64decode(message).decode()
                self.messages.append(decoded)
        return self

    def receiveAllMessages(self, b64=True, _totalNumberOfMessages=None):
        """Receive messages, delete them from the queue and store the bodies.

        Args:
            b64: when true, bodies are base64-decoded and appended to
                ``self.messages``; otherwise ``self.messages`` is replaced
                by the raw bodies.
            _totalNumberOfMessages: number of messages to request; when
                ``None`` the current queue length is used.

        Returns:
            ``self``.

        Raises:
            RuntimeError: if there is nothing to request or a delete failed.
        """
        if _totalNumberOfMessages is None:
            totalNumberOfMessages = self.getQueueLength()
        else:
            totalNumberOfMessages = _totalNumberOfMessages
        self._receiveSQSMessage(totalNumberOfMessages)
        self._extractMessageFromSQS(
            totalNumberOfMessages=totalNumberOfMessages
        )
        if b64:
            self._decodeMessages()
        else:
            # Copy so later changes to one list never leak into the other.
            self.messages = list(self.extractedMessages)
        return self

    def receiveNMessages(self, numberOfMessages, b64=True):
        """Receive ``numberOfMessages`` messages; see ``receiveAllMessages``.

        Returns:
            ``self``, like the other receive methods.
        """
        return self.receiveAllMessages(
            b64=b64, _totalNumberOfMessages=numberOfMessages
        )


# Only talk to AWS when executed as a script, not when merely imported.
if __name__ == '__main__':
    inst = receiveFromSQS.createSession(
        profileName='plex-aws',
        queueURL='https://sqs.eu-west-1.amazonaws'
        '.com/745437999005/slack-bot.fifo',
    )
    inst.receiveNMessages(numberOfMessages=12)
    for item in inst.messages:
        print(item)