updating receiveFromSQS.py
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import boto3
|
||||
import base64
|
||||
import math
|
||||
|
||||
|
||||
class receiveFromSQS(object):
|
||||
@@ -19,59 +20,80 @@ class receiveFromSQS(object):
|
||||
def getQueueLength(self):
|
||||
attributeNames = ['ApproximateNumberOfMessages']
|
||||
self.queueAttributes = self.sqs.get_queue_attributes(
|
||||
QueueUrl=self.queueURL,
|
||||
AttributeNames=attributeNames,
|
||||
QueueUrl=self.queueURL, AttributeNames=attributeNames
|
||||
)
|
||||
self.queueLength = int(
|
||||
self.queueAttributes['Attributes'][
|
||||
'ApproximateNumberOfMessages'
|
||||
]
|
||||
self.queueAttributes['Attributes']['ApproximateNumberOfMessages']
|
||||
)
|
||||
return self.queueLength
|
||||
|
||||
def _receiveSQSMessage(self, maxNumberOfMessages=10):
|
||||
response = self.sqs.receive_message(
|
||||
QueueUrl=self.queueURL,
|
||||
MaxNumberOfMessages=maxNumberOfMessages,
|
||||
)
|
||||
return response
|
||||
|
||||
def _extractMessageFromSQS(self):
|
||||
pass
|
||||
|
||||
def receiveMessage(
|
||||
def _receiveSQSMessage(
|
||||
self,
|
||||
b64=True,
|
||||
MaxNumberOfMessages=10,
|
||||
delete=False,
|
||||
endString='END',
|
||||
totalNumberOfMessages,
|
||||
maxNumberOfMessages=10,
|
||||
deleteOnReceipt=False,
|
||||
):
|
||||
while True:
|
||||
response = self.sqs.receive_message(
|
||||
QueueUrl=self.queueURL,
|
||||
MaxNumberOfMessages=MaxNumberOfMessages,
|
||||
self.response = []
|
||||
loops = int(math.ceil(totalNumberOfMessages / maxNumberOfMessages))
|
||||
for i in range(0, loops):
|
||||
self.response.append(
|
||||
self.sqs.receive_message(
|
||||
QueueUrl=self.queueURL,
|
||||
MaxNumberOfMessages=maxNumberOfMessages,
|
||||
)
|
||||
)
|
||||
if 'Messages' in response:
|
||||
for message in response['Messages']:
|
||||
if b64:
|
||||
decoded = base64.b64decode(
|
||||
message['Body']
|
||||
)
|
||||
msg = decoded.decode()
|
||||
print(msg)
|
||||
if msg == endString:
|
||||
return self
|
||||
else:
|
||||
msg = print(message['Body'])
|
||||
if msg == endString:
|
||||
return self
|
||||
else:
|
||||
print('Queue is empty')
|
||||
break
|
||||
return self
|
||||
|
||||
def receiveAllMessages(self, b64=True, delete=False):
|
||||
while True:
|
||||
self.response = self._receiveSQSMessage()
|
||||
def _extractMessageFromSQS(self, totalNumberOfMessages):
|
||||
self.messages = []
|
||||
try:
|
||||
loops = len(self.response)
|
||||
for i in range(0, loops):
|
||||
if 'Messages' in self.response[i]:
|
||||
_message = self.response[i]['Messages']
|
||||
for rawMessage in _message:
|
||||
self.messages.append(rawMessage['Body'])
|
||||
else:
|
||||
print("No messages in the queue")
|
||||
except KeyError:
|
||||
print("No messages in the queue")
|
||||
return self
|
||||
|
||||
# def receiveMessage(
|
||||
# self, b64=True, MaxNumberOfMessages=10, delete=False, endString='END'
|
||||
# ):
|
||||
# while True:
|
||||
# response = self.sqs.receive_message(
|
||||
# QueueUrl=self.queueURL,
|
||||
# MaxNumberOfMessages=MaxNumberOfMessages
|
||||
# )
|
||||
# if 'Messages' in response:
|
||||
# for message in response['Messages']:
|
||||
# if b64:
|
||||
# decoded = base64.b64decode(message['Body'])
|
||||
# msg = decoded.decode()
|
||||
# if msg == endString:
|
||||
# return self
|
||||
# else:
|
||||
# msg = print(message['Body'])
|
||||
# if msg == endString:
|
||||
# return self
|
||||
# else:
|
||||
# print('Queue is empty')
|
||||
# break
|
||||
|
||||
def receiveAllMessages(self, b64=True, deleteOnReceipt=False):
|
||||
totalNumberOfMessages = self.getQueueLength()
|
||||
self._receiveSQSMessage(
|
||||
totalNumberOfMessages,
|
||||
deleteOnReceipt=deleteOnReceipt
|
||||
)
|
||||
self._extractMessageFromSQS(
|
||||
totalNumberOfMessages=totalNumberOfMessages
|
||||
)
|
||||
if b64:
|
||||
pass # WRITE THIS IN!
|
||||
return self
|
||||
|
||||
|
||||
inst = receiveFromSQS.createSession(
|
||||
@@ -79,4 +101,16 @@ inst = receiveFromSQS.createSession(
|
||||
queueURL='https://sqs.eu-west-1.amazonaws'
|
||||
'.com/745437999005/slack-bot.fifo',
|
||||
)
|
||||
inst.getQueueLength()
|
||||
|
||||
inst.receiveAllMessages()
|
||||
for item in inst.messages:
|
||||
print(item)
|
||||
|
||||
# queueLength = inst.getQueueLength()
|
||||
|
||||
# response = inst._receiveSQSMessage(queueLength)._extractMessageFromSQS(
|
||||
# queueLength
|
||||
# )
|
||||
# # messages = inst._receiveSQSMessage(12)._extractMessageFromSQS(12)
|
||||
# for item in response.message:
|
||||
# print(item)
|
||||
|
||||
Reference in New Issue
Block a user