From 0773fa28c5bc1871483252af85d39191d5ed13f7 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Thu, 10 Oct 2019 01:19:06 +0100 Subject: [PATCH] updating receive script --- .../pullTrafficInfo.cpython-37.pyc | Bin 5044 -> 5044 bytes .../traffic-scraper/prd/receiveFromSQS.py | 111 ++++++++++-------- slack-bot/traffic-scraper/prd/sendToSQS.py | 5 +- 3 files changed, 65 insertions(+), 51 deletions(-) diff --git a/slack-bot/traffic-scraper/prd/__pycache__/pullTrafficInfo.cpython-37.pyc b/slack-bot/traffic-scraper/prd/__pycache__/pullTrafficInfo.cpython-37.pyc index d0a080d16baab822b5258edbb41452c1bc817eb9..a78f1a7769435790789439f93d3f0a845b43ca17 100644 GIT binary patch delta 20 acmdm@zD1qeiI&E3eoQWyX<=mkFj delta 20 acmdm@zD1qeiI diff --git a/slack-bot/traffic-scraper/prd/receiveFromSQS.py b/slack-bot/traffic-scraper/prd/receiveFromSQS.py index ccc47b1..dc68e01 100644 --- a/slack-bot/traffic-scraper/prd/receiveFromSQS.py +++ b/slack-bot/traffic-scraper/prd/receiveFromSQS.py @@ -29,44 +29,68 @@ class receiveFromSQS(object): return self.queueLength def _receiveSQSMessage( - self, - totalNumberOfMessages, - maxNumberOfMessages=10, - deleteOnReceipt=False, + self, totalNumberOfMessages, maxNumberOfMessages=10 ): - self.response = [] - loops = int(math.ceil(totalNumberOfMessages / maxNumberOfMessages)) - for i in range(0, loops * 2): - self.response.append( + self.resp = [] + self.loops = int( + math.ceil(totalNumberOfMessages / maxNumberOfMessages) + ) + if totalNumberOfMessages <= 10: + maxNumberOfMessages = totalNumberOfMessages + else: + # Find how many times total-loop*max, on last loop + # replace max with this + maxNumberOfMessagesFinal = self.loops * maxNumberOfMessages - abs(10 - ( + totalNumberOfMessages - (self.loops * maxNumberOfMessages) + )) + print(maxNumberOfMessagesFinal) + if self.loops == 0: + raise RuntimeError('No messages in the queue') + for i in range(0, self.loops): + loopTrack = 0 + if loopTrack == self.loops: + maxNumberOfMessages = maxNumberOfMessagesFinal + self.resp.append( self.sqs.receive_message( QueueUrl=self.queueURL, MaxNumberOfMessages=maxNumberOfMessages, ) ) + entries = [ # Needs Keyerror try + {'Id': msg['MessageId'], 'ReceiptHandle': msg['ReceiptHandle']} + for msg in self.resp[i]['Messages'] + ] + self._deleteSQSMessages(entries) + loopTrack += 1 return self - def _extractMessageFromSQS(self, totalNumberOfMessages, deleteOnReceipt): + def _extractMessageFromSQS(self, totalNumberOfMessages): self.extractedMessages = [] self.receiptHandles = [] try: - loops = len(self.response) - for i in range(0, loops): - if 'Messages' in self.response[i]: - _message = self.response[i]['Messages'] - for rawMessage in _message: - self.extractedMessages.append(rawMessage['Body']) - # self.receiptHandles.append(rawMessage['ReceiptHandle']) - if deleteOnReceipt: - self.sqs.delete_message( - QueueUrl=self.queueURL, - ReceiptHandle=rawMessage['ReceiptHandle'], - ) - else: - print('No messages in the queue') + for i in range(0, self.loops): + _loops = len(self.resp[i]['Messages']) + for j in range(0, _loops): + if 'Messages' in self.resp[i]: + self.extractedMessages.append( + self.resp[i]['Messages'][j]['Body'] + ) + else: + print('No messages in the queue') except KeyError: - print('No messages in the queue') + print('No messages in the queue key') return self + def _deleteSQSMessages(self, entries): + self.respDelete = self.sqs.delete_message_batch( + QueueUrl=self.queueURL, Entries=entries + ) + if len(self.respDelete['Successful']) != len(entries): + raise RuntimeError( + f'Failed to delete messages: entries={entries!r}' + f' resp={self.respDelete!r}' + ) + def _decodeMessages(self): if len(self.extractedMessages) == 0: print('No messages to process') @@ -76,29 +100,27 @@ class receiveFromSQS(object): self.messages.append(decoded) return self - def _deleteMessage(self, ReceiptHandle): - self.sqs.delete_message( - QueueUrl=self.queueURL, ReceiptHandle=ReceiptHandle - ) - - def receiveAllMessages(self, b64=True, deleteOnReceipt=False): - totalNumberOfMessages = self.getQueueLength() - self._receiveSQSMessage( - totalNumberOfMessages, deleteOnReceipt=deleteOnReceipt - ) + def receiveAllMessages(self, b64=True, _totalNumberOfMessages=None): + if _totalNumberOfMessages is None: + totalNumberOfMessages = self.getQueueLength() + else: + totalNumberOfMessages = _totalNumberOfMessages + self._receiveSQSMessage(totalNumberOfMessages) self._extractMessageFromSQS( - totalNumberOfMessages=totalNumberOfMessages, - deleteOnReceipt=deleteOnReceipt + totalNumberOfMessages=totalNumberOfMessages ) if b64: self._decodeMessages() else: self.messages = self.extractedMessages - # if deleteOnReceipt: - # for receipt in self.receiptHandles: - # self._deleteMessage(receipt) return self + def receiveNMessages(self, numberOfMessages, b64=True): + self.receiveAllMessages( + b64=b64, _totalNumberOfMessages=numberOfMessages + ) + pass + inst = receiveFromSQS.createSession( profileName='plex-aws', @@ -106,15 +128,6 @@ inst = receiveFromSQS.createSession( '.com/745437999005/slack-bot.fifo', ) -inst.receiveAllMessages(deleteOnReceipt=False) +inst.receiveNMessages(numberOfMessages=12) for item in inst.messages: print(item) - -# queueLength = inst.getQueueLength() - -# response = inst._receiveSQSMessage(queueLength)._extractMessageFromSQS( -# queueLength -# ) -# # messages = inst._receiveSQSMessage(12)._extractMessageFromSQS(12) -# for item in response.message: -# print(item) diff --git a/slack-bot/traffic-scraper/prd/sendToSQS.py b/slack-bot/traffic-scraper/prd/sendToSQS.py index 6b7b81f..7bf1997 100644 --- a/slack-bot/traffic-scraper/prd/sendToSQS.py +++ b/slack-bot/traffic-scraper/prd/sendToSQS.py @@ -58,5 +58,6 @@ instM = ( .generateOutput() ) -for item in instM.output: - inst.sendMessage(message=item, messageGroupId="slack-bot-M62") +for _ in range(0, 5): + for item in instM.output: + inst.sendMessage(message=item, messageGroupId="slack-bot-M62")