Files
python-VM/slack-bot/traffic-scraper/prd/pullTrafficInfo.py
2019-12-05 03:56:49 +00:00

205 lines
6.5 KiB
Python

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from datetime import datetime
import re
import emoji
import random
import json
import os
from chromedriver_py import binary_path
class getTrafficInfo(object):
"""
Pulls the latest traffic information for a major road or
motorway from trafficdelays.co.uk.
Parameters:
motorway: str -> a string containing the A-road or motorway
driver: str -> the browser/driver to run under (chrome default)
Usage:
** required **
Create a driver for your motorway:
getTrafficInfo.getTrafficURL(motorway, driver)
**options**
Get information for incident types:
getTrafficInfo.findIncidents()
Get HTML from the page:
getTrafficInfo.getIncidentHTML()
Get a count of how many incidents for each type:
getTrafficInfo.getIncidentCount()
Get the text detailing the incident:
getTrafficInfo.getIncidentInformation()
Generate a sequential list for an output:
getTrafficInfo.getOutput()
"""
def __init__(self, browser, motorway):
super(getTrafficInfo, self).__init__()
self.browser = browser
self.motorway = motorway
self.incidentTypes = ['congestion', 'accident']
def __del__(self):
print('Quitting')
self.browser.quit()
@classmethod
def getTrafficURL(cls, motorway, driver='chrome'):
url = (
'https://www.trafficdelays.co.uk/'
+ motorway.lower()
+ '-traffic-delays'
)
browser = getTrafficInfo.getWebDriver(driver, url)
getTrafficInfo.verfiyMotorway(browser, motorway)
return cls(browser, motorway)
@staticmethod
def getWebDriver(driver, url):
driver = driver.lower()
if driver == 'chrome':
# Prepare the option for the Chromedriver
options = webdriver.ChromeOptions()
options.add_argument('headless')
# Start the Chromedriver
browser = webdriver.Chrome(
executable_path=binary_path,
options=options,
)
browser.get(url)
return browser
else:
raise Exception(f'Driver {driver} not supported')
@staticmethod
def verfiyMotorway(browser, motorway):
try:
verify = browser.find_element_by_xpath(
'/html/body/div[1]/div/div/' 'div/section/div/div/' 'div[1]'
)
if verify.text in (
'It looks like the link pointing here '
'was faulty. Maybe try searching?'
):
raise Exception(
f'No traffic information available for' f' {motorway}'
)
except NoSuchElementException:
pass
def findIncidents(self):
self.incidentBrowser = []
for item in self.incidentTypes:
xpath = f'//*[@id="{item}"]'
self.incidentBrowser.append(
self.browser.find_element_by_xpath(xpath)
)
self.getIncidentHTML()
return self
def getIncidentHTML(self):
self.incidentHTML = []
for item in self.incidentBrowser:
self.incidentHTML.append(item.get_attribute('innerHTML'))
return self
def getIncidentCount(self):
self.incidentCount = []
for item, i in zip(
self.incidentBrowser, range(0, len(self.incidentHTML))
):
self.incidentCount.append(self.incidentHTML[i].count('<li>'))
return self
def getIncidentInformation(self):
self.incidentInformation = []
pattern = r".*\<li\>.*title=\".*\".*\>((.|\n)*?)\<br\>"
for item in self.incidentHTML:
self.incidentInformation.append(re.findall(pattern, item))
return self
def generateOutput(self):
self.getIncidentCount()
self.output = []
self.sarcasticMessage = [
(
f'Hey Andy, have you thought about getting'
' the train?'
+ emoji.emojize(f' :bullettrain_front:', use_aliases=True)
),
(
f'Hey Andy, maybe flying would be quicker?'
+ emoji.emojize(f' :helicopter:', use_aliases=True)
),
(
f'Don\'t fret, Andy can always work from'
' home!'
+ emoji.emojize(f' :house_with_garden:', use_aliases=True)
),
]
currentTime = datetime.now().strftime('%H:%M:%S')
# self.output.append('START')
self.output.append(
emoji.emojize(
' Let\'s check the latest updates'
' from Highways England as of'
f' {currentTime}!'
' :police_car::rotating_light:',
use_aliases=True,
)
)
for item, i in zip(
self.incidentCount, range(0, len(self.incidentTypes))
):
if item == 0:
self.output.append(
emoji.emojize(
f'There are currently no'
' reported'
f' {self.incidentTypes[i]} incidents on'
f' the {self.motorway} :thinking_face:',
use_aliases=True,
)
)
pass
else:
self.output.append(
emoji.emojize(
f'There are currently'
f' {self.incidentCount[i]} reported'
f' {self.incidentTypes[i]} incidents'
f' on the {self.motorway}'
f' :scream:',
use_aliases=True,
)
)
try:
self.output.append(self.incidentInformation[i][0][0])
except IndexError:
pass
self.output.append(random.choice(self.sarcasticMessage))
# self.output.append('END')
return self
def savetoDisk(self, path=os.getcwd()):
self.timeNow = datetime.now().strftime('%d-%m-%Y_%H:%M:%S')
if len(self.output) > 0:
with open(f'{path}/{self.timeNow}.json', 'w+') as outputFile:
json.dump(self.output, outputFile)
else:
print('No messages to save')
# inst = getTrafficInfo.getTrafficURL('M62').findIncidents() \
# .getIncidentInformation().generateOutput()
# for i in inst.output:
# print(i)