v0.1 of send and receive scripts

This commit is contained in:
2019-10-06 23:04:08 +01:00
parent 43984913b8
commit a843d35c7b
15 changed files with 279 additions and 28 deletions

View File

@@ -0,0 +1,70 @@
from bs4 import BeautifulSoup
from selenium import webdriver
import emoji
from datetime import datetime
import re
# trafficdelays.co.uk page that lists current M62 incidents.
url = 'https://www.trafficdelays.co.uk/m62-traffic-delays/'

# For an <li> that carries a title="..." attribute, lazily capture the text
# that follows a closing '>' up to the next <br>. re.findall returns one
# tuple per match; element 0 of each tuple is that captured text.
pattern = r".*\<li\>.*title=\".*\".*\>((.|\n)*?)\<br\>"

# Chrome is started with the 'headless' argument so no window is opened.
options = webdriver.ChromeOptions()
options.add_argument('headless')
browser = webdriver.Chrome(options=options)
browser.get(url)
html = browser.page_source

# Each incident type is looked up by the id of its element on the page.
congestion = browser.find_element_by_xpath('//*[@id="congestion"]')
accident = browser.find_element_by_xpath('//*[@id="accident"]')

congestionText = congestion.text
congestionHTML = congestion.get_attribute('innerHTML')
accidentText = accident.text
accidentHTML = accident.get_attribute('innerHTML')

# Every <li> inside a section is counted as one incident.
congestionCount = congestionHTML.count('<li>')
accidentCount = accidentHTML.count('<li>')

congestionRegexExtraction = re.findall(pattern, congestionHTML)
accidentRegexExtraction = re.findall(pattern, accidentHTML)
def printBreak():
    """Write a visual separator (two newline characters) to stdout."""
    print(end='\n\n')
currentTime = datetime.now().strftime('%H:%M:%S')

try:
    print(emoji.emojize('Did someone say M62 :anguished:!? Let'
                        '\'s check the latest updates from Highways'
                        f' England! as of {currentTime}'
                        ':police_car::rotating_light:',
                        use_aliases=True))
    printBreak()

    # (label used in the "nothing reported" line, <li> count, regex matches)
    sections = (
        ('congestions', congestionCount, congestionRegexExtraction),
        ('accidents', accidentCount, accidentRegexExtraction),
    )
    for label, count, matches in sections:
        if count == 0:
            print(emoji.emojize(f'There are currently no reported {label}'
                                ' on the M62 :thinking_face:',
                                use_aliases=True))
        else:
            print(emoji.emojize(f'There are currently {count} incident(s)'
                                ' on the M62 :scream:', use_aliases=True))
            # The <li> count and the number of regex matches are computed
            # independently and can disagree; slicing prints at most
            # `count` descriptions and never indexes past the matches.
            for details in matches[:count]:
                print(details[0] + '\n')
        printBreak()

    print('Hey Andy, have you thought about getting the train?'
          + emoji.emojize(' :bullettrain_front:', use_aliases=True))
    print('Hey Andy, maybe flying would be quicker?'
          + emoji.emojize(' :helicopter:', use_aliases=True))
    print('Don\'t fret, he can always work from home!'
          + emoji.emojize(' :house_with_garden:', use_aliases=True))
finally:
    # Always shut the headless browser down, even if printing failed.
    browser.quit()

View File

@@ -0,0 +1,71 @@
from bs4 import BeautifulSoup
from selenium import webdriver
import emoji
from datetime import datetime
import re
# Chrome is started with the 'headless' argument so no window is opened.
options = webdriver.ChromeOptions()
options.add_argument('headless')
browser = webdriver.Chrome(options=options)

# Load the Southern England page and parse the rendered HTML with lxml.
url = 'https://www.trafficdelays.co.uk/southern-england/'
browser.get(url)
html = browser.page_source
soup = BeautifulSoup(html, features='lxml')
# soup.find_all(class_='alerts-severity-Severe')
def printBreak():
    """Write a visual separator (two newline characters) to stdout."""
    print(end='\n\n')
# table = soup.find_all('td')
# list = []
# for item in table:
# list.append(item)
# for i in range(0, 4):
# print(list[i])
# printBreak()
# totalItems = int(len(list) / 4)
# for i in range(0, 4):
# print(list[i].string)
# printBreak()
try:
    # All table cells on the page, in document order. `cells` keeps the tags
    # (the fourth one is re-parsed below); `newList` keeps only their text.
    cells = soup.find_all('td')
    newList = [cell.text for cell in cells]
    if len(cells) < 4:
        raise SystemExit('Expected at least 4 table cells on the page, '
                         f'found {len(cells)}')

    currentTime = datetime.now().strftime('%H:%M:%S')
    # NOTE(review): the greeting mentions the M62 although the page scraped
    # is the Southern England one - confirm the intended wording.
    print(emoji.emojize('Did someone say M62 :anguished:!? Let'
                        '\'s check the latest updates from Highways'
                        ' England! :police_car::rotating_light:',
                        use_aliases=True))
    # Presumably cells 0-2 are road, incident type and count - TODO confirm
    # against the live page layout.
    print(f'As of {currentTime}, there is currently {newList[2]} {newList[1]}'
          f' on the {newList[0]}')

    # The fourth cell is split on '<' so each fragment starts with the rest
    # of a tag; fragments shaped like '...>key : value' are printed as pairs.
    desc = str(cells[3])
    descSplit = desc.split('<')
    pattern = r"\>(.+?)\s+?\:\s+?(.+?)$"
    for item in descSplit:
        # A separator is printed for every fragment, matched or not.
        printBreak()
        match = re.search(pattern, item)
        if match is None:
            continue
        print(f'itemOne: {match.group(1)}')
        print(f'itemTwo: {match.group(2)}')
finally:
    # Always shut the headless browser down, even if scraping failed.
    browser.quit()

View File

@@ -0,0 +1,64 @@
import boto3
import os
import sys
sys.path.append(os.getcwd())
sys.path.append('/home/dtomlinson/projects/slack-bot/traffic-scraper/prd')
from pullTrafficInfo import getTrafficInfo
import base64
# import emoji
import hashlib
from datetime import datetime
# Road whose traffic report is scraped below.
motorway = 'A3'
currentTime = datetime.now().strftime('%H:%M:%S')

# SQS client built from the named local AWS profile.
session = boto3.Session(profile_name='plex-aws')
sqs = session.client('sqs')
# queue = sqs.create_queue(QueueName='test', Attributes={'DelaySeconds': '5'})

# Scrape the report. Every step returns the same instance, so `inst` keeps
# its `output` list after the browser has been shut down.
inst = getTrafficInfo.getTrafficURL(motorway)
try:
    inst.findIncidents().getIncidentInformation().generateOutput()
finally:
    # The scraper does not quit its headless browser itself; without this
    # a Chrome process is left running after every run.
    inst.browser.quit()
# encoded = base64.b64encode(inst.output[0].encode())
# decoded = base64.b64decode(encoded).decode()

queueURL = 'https://sqs.eu-west-1.amazonaws.com/745437999005/slack-bot.fifo'
# message = sqs.send_message(QueueUrl=queueURL,
# MessageBody='string',
# MessageGroupId='slack-bot-motorway')
# response = sqs.receive_message(QueueUrl=queueURL,
# MaxNumberOfMessages=5)
# if 'Messages' in response:
# for message in response['Messages']:
# print(message['Body'])
# else:
# print('Queue is empty')
# for item in inst.output:
# print(item)
# encoded = (base64.b64encode(item.encode())).decode()
# dedupId = hashlib.md5((item + currentTime).encode()).hexdigest()
# msg = sqs.send_message(QueueUrl=queueURL,
# MessageBody=encoded,
# MessageGroupId=f'slack-bot-{motorway}',
# MessageDeduplicationId=dedupId)
# if msg is not None:
# print(msg["MessageId"])
# # print(encoded)
# # print(encoded.encode())
# # print(base64.b64decode(encoded.encode()).decode())
# Fetch up to ten messages in a single call and print each decoded body.
response = sqs.receive_message(QueueUrl=queueURL, MaxNumberOfMessages=10)
if 'Messages' not in response:
    print('Queue is empty')
else:
    for message in response['Messages']:
        # Bodies are base64 text; decode to bytes, then to str for printing.
        body = message['Body'].encode()
        decoded = base64.b64decode(body)
        print(decoded.decode())

View File

@@ -0,0 +1,164 @@
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from datetime import datetime
import re
import emoji
import random
class getTrafficInfo(object):
    """
    Pulls the latest traffic information for a major road or
    motorway from trafficdelays.co.uk.

    Parameters:
        motorway: str -> a string containing the A-road or motorway
        driver: str -> the browser/driver to run under (chrome default)

    Usage:
        ** required **
        Create a driver for your motorway:
            getTrafficInfo.getTrafficURL(motorway, driver)

        **options**
        Get information for incident types:
            getTrafficInfo.findIncidents()
        Get HTML from the page:
            getTrafficInfo.getIncidentHTML()
        Get a count of how many incidents for each type:
            getTrafficInfo.getIncidentCount()
        Get the text detailing the incident:
            getTrafficInfo.getIncidentInformation()
        Generate a sequential list for an output:
            getTrafficInfo.generateOutput()

    Every instance method returns ``self`` so calls can be chained. The
    instance never quits its browser; call ``instance.browser.quit()`` once
    the results have been read.
    """

    def __init__(self, browser, motorway):
        """Store an already-loaded browser and the road name it was loaded for."""
        super(getTrafficInfo, self).__init__()
        self.browser = browser
        self.motorway = motorway
        # Ids of the page elements that hold each kind of incident; every
        # per-type list built by the methods below follows this order.
        self.incidentTypes = ['congestion', 'accident']

    # def __del__(self):
    #     print('Quitting')
    #     self.browser.quit()

    @classmethod
    def getTrafficURL(cls, motorway, driver='chrome'):
        """
        Open the traffic page for ``motorway`` and return a new instance.

        Raises Exception when the driver is unsupported or the site reports
        that the page does not exist; in the latter case the browser that
        was started is quit before the exception propagates.
        """
        url = ('https://www.trafficdelays.co.uk/' + motorway.lower()
               + '-traffic-delays')
        browser = cls.getWebDriver(driver, url)
        try:
            cls.verfiyMotorway(browser, motorway)
        except Exception:
            # No instance will be returned, so nobody else could quit it.
            browser.quit()
            raise
        return cls(browser, motorway)

    @staticmethod
    def getWebDriver(driver, url):
        """
        Start the named driver (case-insensitive), load ``url`` and return it.

        Only 'chrome' is supported, started with the 'headless' argument;
        any other name raises Exception.
        """
        driver = driver.lower()
        if driver != 'chrome':
            raise Exception(f'Driver {driver} not supported')
        # Prepare the option for the Chromedriver
        options = webdriver.ChromeOptions()
        options.add_argument('headless')
        # Start the Chromedriver
        browser = webdriver.Chrome(options=options)
        browser.get(url)
        return browser

    @staticmethod
    def verfiyMotorway(browser, motorway):
        """
        Raise Exception if the loaded page is the site's "faulty link" page.

        A page without the probed element is treated as valid.
        """
        notFoundText = ('It looks like the link pointing here '
                        'was faulty. Maybe try searching?')
        try:
            verify = browser.find_element_by_xpath('/html/body/div[1]/div/div/'
                                                   'div/section/div/div/'
                                                   'div[1]')
        except NoSuchElementException:
            return
        text = verify.text
        # The containment test is kept, but an empty string is contained in
        # every string, so an element without text must not count as a hit.
        if text and text in notFoundText:
            raise Exception(f'No traffic information available for'
                            f' {motorway}')

    def findIncidents(self):
        """Locate the page element for each incident type and read its HTML."""
        self.incidentBrowser = [
            self.browser.find_element_by_xpath(f'//*[@id="{item}"]')
            for item in self.incidentTypes
        ]
        self.getIncidentHTML()
        return self

    def getIncidentHTML(self):
        """Store the innerHTML of every element found by findIncidents()."""
        self.incidentHTML = [item.get_attribute('innerHTML')
                             for item in self.incidentBrowser]
        return self

    def getIncidentCount(self):
        """Store, per incident type, the number of '<li>' tags in its HTML."""
        self.incidentCount = [html.count('<li>')
                              for html in self.incidentHTML]
        return self

    def getIncidentInformation(self):
        """
        Store, per incident type, the re.findall() matches of the incident
        pattern; element 0 of each match tuple is the incident text.
        """
        pattern = r".*\<li\>.*title=\".*\".*\>((.|\n)*?)\<br\>"
        self.incidentInformation = [re.findall(pattern, item)
                                    for item in self.incidentHTML]
        return self

    def _incidentDescriptions(self, typeIndex):
        """Return the text of every extracted incident of one type."""
        return [match[0] for match in self.incidentInformation[typeIndex]]

    def generateOutput(self):
        """
        Build ``self.output``: 'START', a greeting, one summary line per
        incident type followed by the text of each incident of that type,
        one randomly chosen closing message, and 'END'.
        """
        self.getIncidentCount()
        # Allow findIncidents().generateOutput() without the explicit
        # getIncidentInformation() step.
        if not hasattr(self, 'incidentInformation'):
            self.getIncidentInformation()
        self.output = []
        self.sarcasticMessage = [
            ('Hey Andy, have you thought about getting the train?'
             + emoji.emojize(' :bullettrain_front:', use_aliases=True)),
            ('Hey Andy, maybe flying would be quicker?'
             + emoji.emojize(' :helicopter:', use_aliases=True)),
            ('Don\'t fret, Andy can always work from home!'
             + emoji.emojize(' :house_with_garden:', use_aliases=True)),
        ]
        currentTime = datetime.now().strftime('%H:%M:%S')
        self.output.append('START')
        self.output.append(emoji.emojize(f'Did someone say {self.motorway}!?'
                                         ' :anguished:'
                                         ' Let\'s check the latest updates'
                                         ' from Highways England as of'
                                         f' {currentTime}!'
                                         ' :police_car::rotating_light:',
                                         use_aliases=True))
        for i, (incidentType, count) in enumerate(zip(self.incidentTypes,
                                                      self.incidentCount)):
            if count == 0:
                self.output.append(emoji.emojize(
                    'There are currently no reported'
                    f' {incidentType} incidents on'
                    f' the {self.motorway} :thinking_face:',
                    use_aliases=True))
                continue
            self.output.append(emoji.emojize(
                'There are currently'
                f' {count} reported'
                f' {incidentType} incidents'
                f' reported on the {self.motorway}'
                ' :scream:',
                use_aliases=True))
            # Index by incident *type* first, then list every match of that
            # type. Indexing type 0 with the type index reported the wrong
            # incident, or raised IndexError, for accidents.
            self.output.extend(self._incidentDescriptions(i))
        self.output.append(random.choice(self.sarcasticMessage))
        self.output.append('END')
        return self
# inst = getTrafficInfo.getTrafficURL('A50').findIncidents() \
# .getIncidentInformation().generateOutput()
# for i in inst.output:
# print(i)

View File

@@ -0,0 +1,44 @@
import boto3
import base64
class receiveFromSQS(object):
    """
    Print messages pulled from an SQS queue.

    Build an instance with ``receiveFromSQS.createSession(profileName,
    queueURL)`` and then call ``receiveMessage()``.
    """

    def __init__(self, session, queueURL):
        """Keep the boto3 session, create an SQS client from it, store the URL."""
        super(receiveFromSQS, self).__init__()
        self.session = session
        self.sqs = session.client('sqs')
        self.queueURL = queueURL

    @classmethod
    def createSession(cls, profileName, queueURL):
        """Create an instance from a named AWS profile."""
        session = boto3.Session(profile_name=profileName)
        return cls(session, queueURL)

    def receiveMessage(self, b64=True, MaxNumberOfMessages=10, delete=False,
                       endString='END'):
        """
        Print queued messages until ``endString`` is seen or the queue is empty.

        Parameters:
            b64: decode each body from base64 before printing/comparing
            MaxNumberOfMessages: passed through to each receive_message call
            delete: delete every message from the queue once it is printed
            endString: message text that stops the polling

        Returns self on both exits (end marker seen, or queue empty).
        """
        while True:
            response = self.sqs.receive_message(
                QueueUrl=self.queueURL,
                MaxNumberOfMessages=MaxNumberOfMessages)
            if 'Messages' not in response:
                print('Queue is empty')
                return self
            for message in response['Messages']:
                msg = message['Body']
                if b64:
                    msg = base64.b64decode(msg).decode()
                # Print and compare the text itself; print() returns None,
                # so its result can never equal the end marker.
                print(msg)
                if delete:
                    self.sqs.delete_message(
                        QueueUrl=self.queueURL,
                        ReceiptHandle=message['ReceiptHandle'])
                if msg == endString:
                    return self
# Queue polled by this script; receiveMessage() runs with its defaults.
queueURL = 'https://sqs.eu-west-1.amazonaws.com/745437999005/slack-bot.fifo'
inst = receiveFromSQS.createSession(profileName='plex-aws', queueURL=queueURL)
inst.receiveMessage()

View File

@@ -0,0 +1,50 @@
import boto3
import base64
import hashlib
from datetime import datetime
import os
import sys
sys.path.append(os.getcwd())
from pullTrafficInfo import getTrafficInfo
class sendToSQS(object):
    """
    Send text messages to an SQS queue.

    Build an instance with ``sendToSQS.createSession(profileName, queueURL)``
    and then call ``sendMessage()`` once per message.
    """

    def __init__(self, session, queueURL):
        """Keep the boto3 session, create an SQS client from it, store the URL."""
        super(sendToSQS, self).__init__()
        self.session = session
        self.sqs = session.client('sqs')
        self.queueURL = queueURL

    @classmethod
    def createSession(cls, profileName, queueURL):
        """Create an instance from a named AWS profile."""
        session = boto3.Session(profile_name=profileName)
        return cls(session, queueURL)

    def sendMessage(self, message, messageGroupId, b64=True, dedup=True):
        """
        Send one message and print the MessageId of the response.

        Parameters:
            message: text to send
            messageGroupId: passed through as MessageGroupId
            b64: send the base64 encoding of the text instead of the text
            dedup: attach a MessageDeduplicationId, the MD5 hex digest of
                   the body as sent followed by the current HH:MM:SS time
        """
        timestamp = datetime.now().strftime('%H:%M:%S')
        body = base64.b64encode(message.encode()).decode() if b64 else message
        params = {
            'QueueUrl': self.queueURL,
            'MessageBody': body,
            'MessageGroupId': messageGroupId,
        }
        if dedup:
            digest = hashlib.md5((body + timestamp).encode())
            params['MessageDeduplicationId'] = digest.hexdigest()
        response = self.sqs.send_message(**params)
        if response is not None:
            print(response["MessageId"])
# Sender bound to the slack-bot FIFO queue.
inst = sendToSQS.createSession(profileName='plex-aws',
                               queueURL='https://sqs.eu-west-1.amazonaws.com'
                               '/745437999005/slack-bot.fifo')

# Scrape the M62 report. Every step returns the same instance, so `instM`
# keeps its `output` list after the browser has been shut down.
instM = getTrafficInfo.getTrafficURL('M62')
try:
    instM.findIncidents().getIncidentInformation().generateOutput()
finally:
    # The scraper does not quit its headless browser itself; without this
    # a Chrome process is left running after every run.
    instM.browser.quit()

# Send the report one line at a time, all in the same message group.
for item in instM.output:
    inst.sendMessage(message=item, messageGroupId='slack-bot-M62', dedup=True)