28 Commits

Author SHA1 Message Date
5505dbf24a adding docker commands for spark 2021-09-26 05:34:30 +01:00
2a1c4fe68e adding spark runner 2021-09-26 05:33:39 +01:00
7962f40e32 adding initial docs 2021-09-26 01:34:06 +01:00
2a43ea1946 adding download script for data set 2021-09-26 01:18:23 +01:00
07d176be79 updating .gitignore 2021-09-26 01:10:53 +01:00
f804e85cc3 updating .gitignore 2021-09-26 01:10:32 +01:00
9fdc6dce05 migrate beam pipeline to main.py 2021-09-26 01:10:09 +01:00
54cf5e3e36 updating prospector.yaml 2021-09-26 01:09:48 +01:00
2e42a453b0 adding latest beam pipeline code 2021-09-25 22:15:58 +01:00
adfbd8e93d updating prospector.yaml 2021-09-25 22:15:46 +01:00
1bd54f188d updating folder structure for data input/output 2021-09-25 22:15:39 +01:00
a7c52b1085 updating .gitignore 2021-09-25 22:15:10 +01:00
24420c8935 adding latest beam pipeline code 2021-09-25 18:25:08 +01:00
44f346deff Merge remote-tracking branch 'origin/develop' into develop 2021-09-25 16:48:04 +01:00
a37e7817c3 adding latest beam pipeline code 2021-09-25 16:47:52 +01:00
539e4c7786 adding latest beam pipeline code 2021-09-25 16:47:17 +01:00
214ce77d8f adding debug.py 2021-09-25 16:47:08 +01:00
47a4ac4bc3 adding latest beam pipeline code 2021-09-25 01:44:37 +01:00
aa61ea9c57 adding latest beam pipeline code 2021-09-25 00:48:41 +01:00
94cc22a385 adding data exploration report + code 2021-09-25 00:48:34 +01:00
a05182892a adding documentation 2021-09-25 00:48:13 +01:00
c38a10ca2f updating beam to install gcp extras 2021-09-25 00:48:04 +01:00
ab993fc030 adding prospector.yaml 2021-09-25 00:47:49 +01:00
9cf5662600 updating notes 2021-09-24 16:52:40 +01:00
0f262daf39 adding report.py 2021-09-24 16:52:34 +01:00
972d0a852a updating dev dependencies 2021-09-24 16:52:25 +01:00
5301d5ff04 updating .gitignore 2021-09-24 16:51:56 +01:00
6db2fa59b9 adding initial skeleton 2021-09-24 16:01:03 +01:00
20 changed files with 4781 additions and 0 deletions

145
.gitignore vendored Normal file
View File

@@ -0,0 +1,145 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# USER
# ignore the data csv files for input/output
**/data/**/*.csv
**/data/**/*.json
.vscode/settings.json

View File

@@ -0,0 +1,5 @@
"""
Daniel Tomlinson (dtomlinson@panaetius.co.uk).
Technical test for Street Group.
"""

View File

View File

@@ -0,0 +1,24 @@
import apache_beam as beam
class DebugShowEmptyColumn(beam.DoFn):
    """Debugging transform: emit only elements whose watched column is empty.

    :param index: position in the element (a list of strings) to inspect.
    """

    def __init__(self, index):
        self.index = index

    def process(self, element):
        # Pass the record through only when the chosen column holds nothing.
        if not element[self.index]:
            yield element
class DebugShowColumnWithValueIn(beam.DoFn):
    """Debugging transform: emit only elements whose watched column contains a value.

    :param index: position in the element (a list of strings) to inspect.
    :param value: substring to look for in that column.
    """

    def __init__(self, index, value):
        self.index = index
        self.value = value

    def process(self, element):
        # Membership test on the column's contents (substring for strings).
        if self.value in element[self.index]:
            yield element

313
analyse_properties/main.py Normal file
View File

@@ -0,0 +1,313 @@
import csv
from datetime import datetime
import hashlib
import io
from importlib import resources
import itertools
import pathlib
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def csv_reader(csv_file):
    """Parse *csv_file* and return an iterator of rows (lists of strings).

    :param csv_file: an object exposing ``open()`` that yields a binary
        file handle (e.g. a Beam ``fileio`` readable file).
    """
    raw_handle = csv_file.open()
    text_handle = io.TextIOWrapper(raw_handle)
    return csv.reader(text_handle)
def slice_by_range(element, *ranges):
    """Slice *element* by several ``(start, stop[, step])`` tuples at once.

    Each range produces an ``islice`` view over *element*; the views are
    chained into one iterator, preserving the order the ranges were given.
    """
    views = (itertools.islice(element, *bounds) for bounds in ranges)
    return itertools.chain(*views)
class DropRecordsSingleEmptyColumn(beam.DoFn):
    """If a given item in a list is empty, drop this entry from the PCollection.

    :param index: position in the element to check for emptiness.
    """

    def __init__(self, index):
        self.index = index

    def process(self, element):
        # Keep the record only when the watched column has any content.
        if element[self.index]:
            yield element
class DropRecordsTwoEmptyColumn(beam.DoFn):
    """If two given items in a list are both empty, drop this entry from the PCollection.

    :param index_0: position of the first column to check.
    :param index_1: position of the second column to check.
    """

    def __init__(self, index_0, index_1):
        self.index_0 = index_0
        self.index_1 = index_1

    def process(self, element):
        # Keep the record as long as at least one of the two columns is filled.
        if element[self.index_0] or element[self.index_1]:
            yield element
class SplitColumn(beam.DoFn):
    """Split an item in a list into two separate items in the PCollection.

    :param index: position of the column to split.
    :param split_char: separator to split the column on.
    """

    def __init__(self, index, split_char):
        self.index = index
        self.split_char = split_char

    def process(self, element):
        # A clean two-way split keeps the second half in place and appends the
        # first half; anything else (0, 1 or 3+ parts, or an empty separator)
        # appends an empty placeholder so every record keeps the same width.
        try:
            prefix, remainder = element[self.index].split(self.split_char)
        except ValueError:
            element.append("")
        else:
            element[self.index] = remainder.strip()
            element.append(prefix.strip())
        yield element
class GenerateUniqueID(beam.DoFn):
    """
    Generate a unique ID for the PCollection, either for all the columns or for the
    uniquely identifying data only.

    :param all_columns: when True, hash every column; otherwise skip the
        first two columns so the ID ignores price and transaction date.
    """

    def __init__(self, all_columns=False):
        self.all_columns = all_columns

    def process(self, element):
        source_columns = element if self.all_columns else element[2:]
        fingerprint = hashlib.md5(",".join(source_columns).encode())
        # The hex digest rides along as the final column of the record.
        element.append(fingerprint.hexdigest())
        yield element
class DeduplicateByID(beam.DoFn):
    """
    If the PCollection has multiple entries after being grouped by ID for all columns,
    deduplicate the list to keep only one.
    """

    def process(self, element):
        # element is a (group_key, grouped_rows) pair from a GroupBy.
        group_key, grouped_rows = element
        if grouped_rows:
            # Keep only the first row of the group.
            yield (group_key, [grouped_rows[0]])
        else:
            yield element
class RemoveUniqueID(beam.DoFn):
    """Remove the unique ID from the PCollection, transforming it back into a list."""

    def process(self, element):
        # element is (id, [row]); recover the row and drop its trailing hash.
        row = element[-1][0]
        del row[-1]
        yield row
class ConvertDataToDict(beam.DoFn):
    """Convert the processed data into a dict to be exported as a JSON object."""

    @staticmethod
    def get_latest_transaction(transaction_dates):
        """Get the date of the latest transaction.

        :param transaction_dates: list of "YYYY-MM-DD" date strings.
        :return: the year ("YYYY") of the most recent transaction.
        """
        transaction_dates = [
            datetime.strptime(individual_transaction, "%Y-%m-%d")
            for individual_transaction in transaction_dates
        ]
        return max(transaction_dates).strftime("%Y")

    @staticmethod
    def get_readable_address(address_components: list, address_comparisons: list):
        """Build a human-readable address, dropping repeated locality parts.

        :param address_components: building-level parts (flat, building,
            number + street, postcode).
        :param address_comparisons: the four area parts
            (locality, town, district, county) to deduplicate pairwise.
        :return: the surviving parts joined with newlines.
        """
        # Get pairwise comparison to see if two locality/town/district/counties
        # are equivalent
        pairwise_comparison = [
            x == y
            for i, x in enumerate(address_comparisons)
            for j, y in enumerate(address_comparisons)
            if i > j
        ]
        # Create a mask to eliminate the redundant parts of the address.
        # pairwise_comparison indices map to (i, j) pairs:
        # 0=(1,0) 1=(2,0) 2=(2,1) 3=(3,0) 4=(3,1) 5=(3,2)
        mask = [True, True, True, True]
        if pairwise_comparison[0]:
            mask[1] = False
        if pairwise_comparison[1] or pairwise_comparison[2]:
            mask[2] = False
        if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]:
            mask[3] = False
        # Apply the mask
        applied_mask = list(itertools.compress(address_comparisons, mask))
        # Filter out empty items in list
        deduplicated_address_part = list(filter(None, applied_mask))
        # Filter out any missing parts of the address components
        cleaned_address_components = list(filter(None, address_components))
        # Return the readable address
        return "\n".join(
            itertools.chain.from_iterable(
                [
                    cleaned_address_components[0:-1],
                    deduplicated_address_part,
                    [cleaned_address_components[-1]],
                ]
            )
        )

    def process(self, element):
        # element is (property_id, [rows]) after grouping by the property ID;
        # row columns are assumed to be 0=price, 1=date, 2=postcode, 3=number,
        # 4=flat, 5=street, 6=locality, 7=town, 8=district, 9=county,
        # 10=building — TODO confirm against the cleaning stage's column order.
        # Group together all the transactions for the property.
        property_transactions = [
            {
                "price": entry[0],
                # Dates arrive as "YYYY-MM-DD 00:00"; strip the time suffix.
                "transaction_date": entry[1].replace(" 00:00", ""),
                "year": entry[1][0:4],
            }
            for entry in element[-1]
        ]
        # Create the dict to hold all the information about the property.
        # NOTE(review): "flat_appartment" and "builing" are misspelled, but
        # they form the emitted JSON schema — renaming would break consumers.
        json_object = {
            "property_id": element[0],
            # "readable_address": None,
            "flat_appartment": element[-1][0][4],
            "builing": element[-1][0][10],
            "number": element[-1][0][3],
            "street": element[-1][0][5],
            "locality": element[-1][0][6],
            "town": element[-1][0][7],
            "district": element[-1][0][8],
            "county": element[-1][0][9],
            "postcode": element[-1][0][2],
            "property_transactions": property_transactions,
            "latest_transaction_year": self.get_latest_transaction(
                [
                    transaction["transaction_date"]
                    for transaction in property_transactions
                ]
            ),
        }
        # Create a human readable address to go in the dict.
        # json_object["readable_address"] = self.get_readable_address(
        #     [
        #         json_object["flat_appartment"],
        #         json_object["builing"],
        #         f'{json_object["number"]} {json_object["street"]}',
        #         json_object["postcode"],
        #     ],
        #     [
        #         json_object["locality"],
        #         json_object["town"],
        #         json_object["district"],
        #         json_object["county"],
        #     ],
        # )
        yield json_object
def main():
    """Run the Beam pipeline against a local portable (Spark) job server.

    The full pipeline (load CSV -> clean -> deduplicate -> group -> format ->
    save JSON) is commented out below; the live code only submits a tiny
    in-memory PCollection, presumably to verify the PortableRunner wiring —
    TODO confirm before enabling the real stages.
    """
    # Load in the data from a csv file.
    # csv_data = resources.path(
    #     "analyse_properties.data.input",
    #     "pp-monthly-update-new-version.csv"
    #     # "analyse_properties.data.input",
    #     # "pp-complete.csv",
    # )
    # Job endpoint matches the beam_spark_job_server container's 8099 port.
    options = PipelineOptions(
        [
            "--runner=PortableRunner",
            "--job_endpoint=localhost:8099",
            "--environment_type=LOOPBACK",
        ]
    )
    with beam.Pipeline(options=options) as pipeline:
        # Load the data
        # with csv_data as csv_data_file:
        #     # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
        #     load = (
        #         pipeline
        #         | fileio.MatchFiles(str(csv_data_file))
        #         | fileio.ReadMatches()
        #         | beam.FlatMap(csv_reader)
        #     )
        # Placeholder in-memory input used while the CSV load is disabled.
        load = pipeline | beam.Create(
            [
                "🍓Strawberry,🥕Carrot,🍆Eggplant",
                "🍅Tomato,🥔Potato",
            ]
        )
        # Clean the data by dropping unneeded rows.
        # clean_drop = (
        #     load
        #     | "Drop unneeded columns"
        #     >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
        #     | "Convert to Upper Case"
        #     >> beam.Map(lambda element: [e.upper() for e in element])
        #     | "Strip leading/trailing whitespace"
        #     >> beam.Map(lambda element: [e.strip() for e in element])
        #     | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
        #     | "Drop empty PAON if missing SAON"
        #     >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
        #     | "Split PAON into two columns if separated by comma"
        #     >> beam.ParDo(SplitColumn(3, ","))
        # )
        # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
        # clean_deduplicate = (
        #     clean_drop
        #     | "Generate unique ID for all columns"
        #     >> beam.ParDo(GenerateUniqueID(all_columns=True))
        #     | "Group by the ID for all columns"
        #     >> beam.GroupBy(lambda element: element[-1])
        #     | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
        # )
        # # Prepare the data by generating an ID using the uniquely identifying information only
        # # and grouping them by this ID.
        # prepare = (
        #     clean_deduplicate
        #     | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
        #     | "Generate unique ID ignoring price & date"
        #     >> beam.ParDo(GenerateUniqueID())
        #     | "Group by the ID ignoring price & date"
        #     >> beam.GroupBy(lambda element: element[-1])
        # )
        # # Format the data into a dict.
        # formatted = (
        #     prepare
        #     | "Convert the prepared data into a dict object"
        #     >> beam.ParDo(ConvertDataToDict())
        # )
        # # Save the data to a .json file.
        # output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
        # # output_file = "/tmp/file"
        # (
        #     formatted
        #     | "Combine into one PCollection" >> beam.combiners.ToList()
        #     | beam.Map(print)
        #     # | "Save to .json file"
        #     # >> beam.io.WriteToText(
        #     #     file_path_prefix=str(output_file),
        #     #     file_name_suffix=".json",
        #     #     shard_name_template="",
        #     # )
        # )


if __name__ == "__main__":
    main()

5
docs/index.md Normal file
View File

@@ -0,0 +1,5 @@
# Welcome
## Introduction
This documentation accompanies the technical test for the Street Group.

5
download_data.sh Executable file
View File

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Download the Land Registry price paid data set into the pipeline's input
# folder. The file is marked executable, so it needs a shebang; fail fast on
# any download error instead of continuing silently.
set -euo pipefail

# Full data set
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P analyse_properties/data/input

# Monthly update data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P analyse_properties/data/input

File diff suppressed because one or more lines are too long

35
exploration/report.py Normal file
View File

@@ -0,0 +1,35 @@
from importlib import resources
import pandas as pd
from pandas_profiling import ProfileReport
def main():
    """Generate a pandas-profiling HTML report for the full price paid data set."""
    # The raw CSV ships without a header row, so name the columns explicitly.
    column_names = [
        "transaction_id",
        "price",
        "date_of_transfer",
        "postcode",
        "property_type",
        "old_new",
        "duration",
        "paon",
        "saon",
        "street",
        "locality",
        "town_city",
        "district",
        "county",
        "ppd_category",
        "record_status",
    ]
    with resources.path("analyse_properties.data", "pp-complete.csv") as csv_file:
        df_report = pd.read_csv(csv_file, names=column_names)
    # minimal=True skips the expensive correlation/interaction sections.
    profile = ProfileReport(df_report, title="Price Paid Data", minimal=True)
    profile.to_file("price_paid_data_report.html")


if __name__ == "__main__":
    main()

40
mkdocs.yaml Normal file
View File

@@ -0,0 +1,40 @@
site_name: The Street Group Technical Test
repo_url: https://github.com/dtomlinson91/street_group_tech_test
use_directory_urls: false
nav:
- Documentation:
- Welcome: index.md
# - Installation: documentation/installation.md
# - Usage: documentation/usage.md
# - Comments and Caveats:
# - Introduction: comments_caveats/introduction.md
# - Time limit: comments_caveats/time_limit.md
# - Third party libraries: comments_caveats/third_party_libraries.md
# - Areas of improvement and comments: comments_caveats/area_of_improvement_comments.md
# - Similar names algorithm: comments_caveats/similar_names.md
# - Reference:
# - deduplicator.main: reference/api_documentation_main.md
# - Changelog: changelog/changelog.md
theme:
name: material
palette:
primary: indigo
accent: blue
feature:
tabs: true
markdown_extensions:
- admonition
- codehilite:
guess_lang: true
- toc:
permalink: true
- pymdownx.superfences
# - pymdownx.arithmatex:
# generic: true
plugins:
- search:
lang: en
extra_javascript:
- javascripts/config.js
- https://polyfill.io/v3/polyfill.min.js?features=es6
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js

View File

@@ -0,0 +1,14 @@
# Start the Beam Spark job server; 8099 is the job endpoint the pipeline's
# --job_endpoint option points at, 8097/8098 are the server's other ports.
docker run --rm \
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
--name=beam_spark \
apache/beam_spark_job_server:latest
# NOTE(review): this pulls the 2.33.0_rc1 tag, but both run commands use
# :latest — confirm which image version is actually intended.
docker pull apache/beam_spark_job_server:2.33.0_rc1
# Same job server, with extra Spark driver memory for the full data set.
docker run --rm \
-e SPARK_DRIVER_MEMORY=8g \
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
--name=beam_spark \
apache/beam_spark_job_server:latest

View File

@@ -0,0 +1,7 @@
# Answers
## CSV
Read a csv file into Beam:
<https://stackoverflow.com/a/41171867>

View File

@@ -0,0 +1,12 @@
# Beam Documentation
## Transforms
FlatMap:
<https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/>
I/O Transforms:
<https://beam.apache.org/documentation/io/built-in/>

5
notes/links.md Normal file
View File

@@ -0,0 +1,5 @@
# Links
## Data
https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads

2628
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

78
prospector.yaml Normal file
View File

@@ -0,0 +1,78 @@
# This will enable almost every single warning
output-format: vscode
doc-warnings: true
# allow-shorthand: false
strictness: none
ignore-patterns:
- (^|/)\..+
pylint:
run: true
disable:
- fixme
- bad-continuation
- missing-module-docstring
- logging-fstring-interpolation
- missing-function-docstring
- abstract-method
- missing-class-docstring
- super-init-not-called
- arguments-differ
- inconsistent-return-statements
enable:
options:
max-locals: 15
max-returns: 6
max-branches: 12
max-statements: 50
# max-parents: 7
max-attributes: 20
min-public-methods: 0
max-public-methods: 25
max-module-lines: 1000
max-line-length: 88
mccabe:
run: true
options:
max-complexity: 10
pep8:
run: true
options:
max-line-length: 88
single-line-if-stmt: n
disable:
- E501 # line too long
pyroma:
run: false
disable:
- PYR19
- PYR16
pep257:
disable:
- D000
- D203
# - D213
- D212 # multiline docstrings on first line only.
- D404
- D100
- D407 # numpy docstring format
- D107 # missing docstring in __init__
# Docstrings ending with newlines and : in Returns block.
- D413
- D406
- D103
- D101 # missing docstring in public class
- D102 # missing docstring in public method
pyflakes:
disable:
- F401 # unused import
dodgy:
run: true

20
pyproject.toml Normal file
View File

@@ -0,0 +1,20 @@
[tool.poetry]
name = "analyse_properties"
version = "0.1.0"
description = ""
authors = ["Daniel Tomlinson <dtomlinson@panaetius.co.uk>"]
[tool.poetry.dependencies]
python = "^3.7"
apache-beam = {extras = ["gcp"], version = "^2.32.0"}
mkdocs = "^1.2.2"
mkdocs-material = "^7.3.0"
[tool.poetry.dev-dependencies]
# pytest = "^5.2"
prospector = "^1.5.1"
pandas-profiling = "^3.0.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

6
requirements-docs.txt Normal file
View File

@@ -0,0 +1,6 @@
apache-beam==2.32.0; python_version >= "3.6"
avro-python3==1.9.2.1; python_version >= "3.6"
cachetools==4.2.2; python_version >= "3.6" and python_version < "4.0" and (python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.6.0")
certifi==2021.5.30; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version >= "3.6"
mkdocs-material==7.3.0
mkdocs==1.2.2; python_version >= "3.6"