adding initial boilerplates
This commit is contained in:
141
python/aws/sqs/receiveFromSQS.py
Normal file
141
python/aws/sqs/receiveFromSQS.py
Normal file
@@ -0,0 +1,141 @@
|
||||
import boto3
|
||||
import base64
|
||||
import math
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class receiveFromSQS(object):
|
||||
"""docstring for receiveFromSQS"""
|
||||
|
||||
def __init__(self, session, queueURL):
|
||||
super(receiveFromSQS, self).__init__()
|
||||
self.session = session
|
||||
self.sqs = session.client('sqs')
|
||||
self.queueURL = queueURL
|
||||
self.messages = []
|
||||
|
||||
@classmethod
|
||||
def createSession(cls, profileName, queueURL):
|
||||
session = boto3.Session(profile_name=profileName)
|
||||
return cls(session, queueURL)
|
||||
|
||||
def getQueueLength(self):
|
||||
attributeNames = ['ApproximateNumberOfMessages']
|
||||
self.queueAttributes = self.sqs.get_queue_attributes(
|
||||
QueueUrl=self.queueURL, AttributeNames=attributeNames
|
||||
)
|
||||
self.queueLength = int(
|
||||
self.queueAttributes['Attributes']['ApproximateNumberOfMessages']
|
||||
)
|
||||
return self.queueLength
|
||||
|
||||
def _receiveSQSMessage(
|
||||
self, totalNumberOfMessages, maxNumberOfMessages=10
|
||||
):
|
||||
self.resp = []
|
||||
self.loops = int(
|
||||
math.ceil(totalNumberOfMessages / maxNumberOfMessages)
|
||||
)
|
||||
loopTrack = 0
|
||||
if totalNumberOfMessages <= 10:
|
||||
maxNumberOfMessages = totalNumberOfMessages
|
||||
else:
|
||||
maxNumberOfMessagesFinal = 10 - (
|
||||
(self.loops * maxNumberOfMessages) - totalNumberOfMessages
|
||||
)
|
||||
print(maxNumberOfMessagesFinal)
|
||||
if self.loops == 0:
|
||||
raise RuntimeError('No messages in the queue')
|
||||
for i in range(0, self.loops):
|
||||
if loopTrack == self.loops - 1 and totalNumberOfMessages > 10:
|
||||
maxNumberOfMessages = maxNumberOfMessagesFinal
|
||||
self.resp.append(
|
||||
self.sqs.receive_message(
|
||||
QueueUrl=self.queueURL,
|
||||
MaxNumberOfMessages=maxNumberOfMessages,
|
||||
)
|
||||
)
|
||||
try:
|
||||
entries = [
|
||||
{
|
||||
'Id': msg['MessageId'],
|
||||
'ReceiptHandle': msg['ReceiptHandle'],
|
||||
}
|
||||
for msg in self.resp[i]['Messages']
|
||||
]
|
||||
self._deleteSQSMessages(entries)
|
||||
loopTrack += 1
|
||||
except KeyError:
|
||||
print("No messages in the queue")
|
||||
return self
|
||||
|
||||
def _extractMessageFromSQS(self, totalNumberOfMessages):
|
||||
self.extractedMessages = []
|
||||
self.receiptHandles = []
|
||||
try:
|
||||
for i in range(0, self.loops):
|
||||
_loops = len(self.resp[i]['Messages'])
|
||||
for j in range(0, _loops):
|
||||
if 'Messages' in self.resp[i]:
|
||||
self.extractedMessages.append(
|
||||
self.resp[i]['Messages'][j]['Body']
|
||||
)
|
||||
else:
|
||||
print('No messages in the queue')
|
||||
except KeyError:
|
||||
print('No messages in the queue key')
|
||||
return self
|
||||
|
||||
def _deleteSQSMessages(self, entries):
|
||||
self.respDelete = self.sqs.delete_message_batch(
|
||||
QueueUrl=self.queueURL, Entries=entries
|
||||
)
|
||||
if len(self.respDelete['Successful']) != len(entries):
|
||||
raise RuntimeError(
|
||||
f'Failed to delete messages: entries={entries!r}'
|
||||
f' resp={self.respDelete!r}'
|
||||
)
|
||||
|
||||
def _decodeMessages(self):
|
||||
if len(self.extractedMessages) == 0:
|
||||
print('No messages to process')
|
||||
else:
|
||||
for message in self.extractedMessages:
|
||||
decoded = base64.b64decode(message).decode()
|
||||
self.messages.append(decoded)
|
||||
return self
|
||||
|
||||
def receiveAllMessages(self, b64=True, _totalNumberOfMessages=None):
|
||||
if _totalNumberOfMessages is None:
|
||||
totalNumberOfMessages = self.getQueueLength()
|
||||
else:
|
||||
totalNumberOfMessages = _totalNumberOfMessages
|
||||
self._receiveSQSMessage(totalNumberOfMessages)
|
||||
self._extractMessageFromSQS(
|
||||
totalNumberOfMessages=totalNumberOfMessages
|
||||
)
|
||||
if b64:
|
||||
self._decodeMessages()
|
||||
else:
|
||||
self.messages = self.extractedMessages
|
||||
return self
|
||||
|
||||
def receiveNMessages(self, numberOfMessages, b64=True):
|
||||
self.receiveAllMessages(
|
||||
b64=b64, _totalNumberOfMessages=numberOfMessages
|
||||
)
|
||||
return self
|
||||
|
||||
# def generateOutput(self, outputType='json'):
|
||||
# if outputType == 'json':
|
||||
# self.output = json.dumps(self.messages)
|
||||
# return self.output
|
||||
|
||||
def savetoDisk(self, path):
|
||||
self.timeNow = datetime.now().strftime('%d-%m-%Y_%H:%M:%S')
|
||||
if len(self.messages) > 0:
|
||||
with open(f'{path}/{self.timeNow}.json', 'w+') as outputFile:
|
||||
json.dump(self.messages, outputFile)
|
||||
else:
|
||||
print('No messages to save')
|
||||
63
python/aws/sqs/sendToSQS.py
Normal file
63
python/aws/sqs/sendToSQS.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import boto3
|
||||
import base64
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.append(os.getcwd())
|
||||
from pullTrafficInfo import getTrafficInfo
|
||||
|
||||
|
||||
class sendToSQS(object):
|
||||
"""docstring for sendToSQS"""
|
||||
|
||||
def __init__(self, session, queueURL):
|
||||
super(sendToSQS, self).__init__()
|
||||
self.session = session
|
||||
self.sqs = session.client('sqs')
|
||||
self.queueURL = queueURL
|
||||
|
||||
@classmethod
|
||||
def createSession(cls, profileName, queueURL):
|
||||
session = boto3.Session(profile_name=profileName)
|
||||
return cls(session, queueURL)
|
||||
|
||||
def sendMessage(self, message, messageGroupId, b64=True, dedup=False):
|
||||
currentTime = datetime.now().strftime('%H:%M:%S.%f')
|
||||
if b64:
|
||||
message = (base64.b64encode(message.encode())).decode()
|
||||
if not dedup:
|
||||
dedupId = hashlib.md5((message + currentTime).encode()).hexdigest()
|
||||
msg = self.sqs.send_message(
|
||||
QueueUrl=self.queueURL,
|
||||
MessageBody=message,
|
||||
MessageGroupId=messageGroupId,
|
||||
MessageDeduplicationId=dedupId,
|
||||
)
|
||||
else:
|
||||
msg = self.sqs.send_message(
|
||||
QueueUrl=self.queueURL,
|
||||
MessageBody=message,
|
||||
MessageGroupId=messageGroupId,
|
||||
)
|
||||
if msg is not None:
|
||||
print(msg['MessageId'])
|
||||
|
||||
|
||||
# inst = sendToSQS.createSession(
|
||||
# profileName='plex-aws',
|
||||
# queueURL='https://sqs.eu-west-1.amazonaws.com'
|
||||
# '/745437999005/slack-bot.fifo',
|
||||
# )
|
||||
|
||||
# instM = (
|
||||
# getTrafficInfo.getTrafficURL('M62')
|
||||
# .findIncidents()
|
||||
# .getIncidentInformation()
|
||||
# .generateOutput()
|
||||
# )
|
||||
|
||||
# for _ in range(0, 5):
|
||||
# for item in instM.output:
|
||||
# inst.sendMessage(message=item, messageGroupId='slack-bot-M62')
|
||||
Reference in New Issue
Block a user