121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
import boto3
|
|
import base64
|
|
import math
|
|
|
|
|
|
class receiveFromSQS(object):
    """Read messages from a single SQS queue.

    The instance keeps the raw ``receive_message`` responses, the message
    bodies extracted from them, and the final (optionally base64-decoded)
    messages as attributes:

    * ``response``          -- list of raw ``receive_message`` responses
    * ``extractedMessages`` -- list of message ``Body`` strings
    * ``messages``          -- list of bodies, decoded when requested
    * ``receiptHandles``    -- reset on every extraction but not populated;
      deletion, when requested, happens inline during extraction
    * ``queueAttributes`` / ``queueLength`` -- result of the last
      ``getQueueLength`` call (``None`` until it has been called)

    Most methods return ``self`` so that calls can be chained.
    """

    def __init__(self, session, queueURL):
        """Create a reader bound to *queueURL*.

        Args:
            session: object exposing ``client('sqs')`` (for example a
                ``boto3.Session``); the returned client is used for every
                queue operation.
            queueURL: URL of the queue to read from.
        """
        super(receiveFromSQS, self).__init__()
        self.session = session
        self.sqs = session.client('sqs')
        self.queueURL = queueURL
        self.messages = []
        # Declare every attribute the helpers read up front, so the helpers
        # can be called in any order without raising AttributeError.
        self.queueAttributes = None
        self.queueLength = None
        self.response = []
        self.extractedMessages = []
        self.receiptHandles = []

    @classmethod
    def createSession(cls, profileName, queueURL):
        """Alternate constructor: build the session from a named profile.

        Args:
            profileName: profile name passed to ``boto3.Session``.
            queueURL: URL of the queue to read from.

        Returns:
            A new instance of this class.
        """
        session = boto3.Session(profile_name=profileName)
        return cls(session, queueURL)

    def getQueueLength(self):
        """Return the queue's ``ApproximateNumberOfMessages`` as an int.

        The full attribute response is kept in ``self.queueAttributes`` and
        the parsed count in ``self.queueLength``.
        """
        attributeNames = ['ApproximateNumberOfMessages']
        self.queueAttributes = self.sqs.get_queue_attributes(
            QueueUrl=self.queueURL, AttributeNames=attributeNames
        )
        self.queueLength = int(
            self.queueAttributes['Attributes']['ApproximateNumberOfMessages']
        )
        return self.queueLength

    def _receiveSQSMessage(
        self,
        totalNumberOfMessages,
        maxNumberOfMessages=10,
        deleteOnReceipt=False,
    ):
        """Poll the queue and store the raw responses in ``self.response``.

        Args:
            totalNumberOfMessages: number of messages expected in the queue.
            maxNumberOfMessages: batch size requested per poll.
            deleteOnReceipt: accepted for signature compatibility; it is not
                used here (deletion happens in ``_extractMessageFromSQS``).

        Returns:
            ``self``.
        """
        self.response = []
        # Force float division so the ceiling is also correct under
        # Python 2, where int / int would truncate before math.ceil runs.
        loops = int(
            math.ceil(totalNumberOfMessages / float(maxNumberOfMessages))
        )
        # Poll twice as often as the batch arithmetic requires.
        # NOTE(review): presumably because a single poll may return fewer
        # messages than requested -- confirm against the SQS documentation.
        for _ in range(loops * 2):
            self.response.append(
                self.sqs.receive_message(
                    QueueUrl=self.queueURL,
                    MaxNumberOfMessages=maxNumberOfMessages,
                )
            )
        return self

    def _extractMessageFromSQS(self, totalNumberOfMessages, deleteOnReceipt):
        """Collect message bodies from ``self.response``.

        Args:
            totalNumberOfMessages: accepted for signature compatibility; it
                is not used here.
            deleteOnReceipt: when true, each message is deleted from the
                queue right after its body has been collected.
                NOTE(review): deletion happens before any base64 decoding,
                so a body that later fails to decode is already gone from
                the queue.

        Returns:
            ``self``; the bodies are available in ``self.extractedMessages``.
        """
        self.extractedMessages = []
        self.receiptHandles = []
        try:
            for response in self.response:
                # An empty poll simply has no 'Messages' key; that is normal
                # because the receiver deliberately over-polls.
                for rawMessage in response.get('Messages', []):
                    self.extractedMessages.append(rawMessage['Body'])
                    if deleteOnReceipt:
                        self.sqs.delete_message(
                            QueueUrl=self.queueURL,
                            ReceiptHandle=rawMessage['ReceiptHandle'],
                        )
        except KeyError as missingKey:
            # Stay best-effort (keep what was collected so far) but say what
            # actually went wrong instead of claiming the queue was empty.
            print(
                'Malformed SQS message, missing key: {0}'.format(missingKey)
            )
        else:
            # Report an empty queue once, and only when nothing at all was
            # received, rather than once per empty poll.
            if not self.extractedMessages:
                print('No messages in the queue')
        return self

    def _decodeMessages(self):
        """Base64-decode ``self.extractedMessages`` into ``self.messages``.

        Decoded strings are appended to ``self.messages``. Prints a notice
        when there is nothing to decode.

        Returns:
            ``self``.
        """
        if not self.extractedMessages:
            print('No messages to process')
        else:
            for message in self.extractedMessages:
                self.messages.append(base64.b64decode(message).decode())
        return self

    def _deleteMessage(self, ReceiptHandle):
        """Delete the message identified by *ReceiptHandle* from the queue."""
        self.sqs.delete_message(
            QueueUrl=self.queueURL, ReceiptHandle=ReceiptHandle
        )

    def receiveAllMessages(self, b64=True, deleteOnReceipt=False):
        """Fetch the messages currently reported by the queue.

        Args:
            b64: when true, bodies are base64-decoded; otherwise the raw
                bodies are returned as received.
            deleteOnReceipt: when true, messages are deleted from the queue
                as they are extracted.

        Returns:
            ``self``; the result of this call is in ``self.messages``.
        """
        totalNumberOfMessages = self.getQueueLength()
        self._receiveSQSMessage(
            totalNumberOfMessages, deleteOnReceipt=deleteOnReceipt
        )
        self._extractMessageFromSQS(
            totalNumberOfMessages=totalNumberOfMessages,
            deleteOnReceipt=deleteOnReceipt,
        )
        # Start from an empty list so that repeated calls report only the
        # messages of the current call in both modes (previously b64=True
        # kept appending to the results of earlier calls).
        self.messages = []
        if b64:
            self._decodeMessages()
        else:
            # Copy so later extractions cannot mutate the returned list.
            self.messages = list(self.extractedMessages)
        return self
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, so that importing
# the module (for example to reuse receiveFromSQS) does not contact AWS.
if __name__ == '__main__':
    # Build a reader for the slack-bot FIFO queue using the named profile.
    inst = receiveFromSQS.createSession(
        profileName='plex-aws',
        queueURL='https://sqs.eu-west-1.amazonaws'
        '.com/745437999005/slack-bot.fifo',
    )

    # Fetch and base64-decode the queued messages without deleting them.
    inst.receiveAllMessages(deleteOnReceipt=False)
    for item in inst.messages:
        print(item)
|
|
|
|
# queueLength = inst.getQueueLength()
|
|
|
|
# response = inst._receiveSQSMessage(queueLength)._extractMessageFromSQS(
|
|
# queueLength
|
|
# )
|
|
# # messages = inst._receiveSQSMessage(12)._extractMessageFromSQS(12)
|
|
# for item in response.message:
|
|
# print(item)
|