40 Commits

Author SHA1 Message Date
8e8469579e updating beam pipeline to use GroupByKey 2021-09-26 20:29:11 +01:00
4e3771c728 adding latest beam pipeline code for dataflow 2021-09-26 17:15:35 +01:00
8856a9763f updating prospector.yaml 2021-09-26 17:15:24 +01:00
fded858932 moving dataflow notes 2021-09-26 17:15:17 +01:00
bb71d55f8c adding latest beam pipeline code for dataflow 2021-09-26 16:23:19 +01:00
8047b5ced4 adding latest beam pipeline code for dataflow 2021-09-26 16:16:58 +01:00
9f53c66975 adding latest beam pipeline code for dataflow 2021-09-26 15:57:00 +01:00
e6ec110d54 updating download data script for new location in GCS 2021-09-26 14:56:53 +01:00
83807616e0 Merge branch 'wip/pathlib' into develop 2021-09-26 14:56:21 +01:00
7f874fa6f6 updating README.md 2021-09-26 14:56:01 +01:00
b8a997084d removing requirements.txt for documentation 2021-09-26 14:55:48 +01:00
c4e81065b1 adding pyenv 3.7.9 2021-09-26 14:55:05 +01:00
62bd0196ad removing shard_name_template from output file 2021-09-26 06:11:42 +01:00
7f9b7e4bfd moving inputs/outputs to use pathlib 2021-09-26 06:03:55 +01:00
7962f40e32 adding initial docs 2021-09-26 01:34:06 +01:00
2a43ea1946 adding download script for data set 2021-09-26 01:18:23 +01:00
07d176be79 updating .gitignore 2021-09-26 01:10:53 +01:00
f804e85cc3 updating .gitignore 2021-09-26 01:10:32 +01:00
9fdc6dce05 migrate beam pipeline to main.py 2021-09-26 01:10:09 +01:00
54cf5e3e36 updating prospector.yaml 2021-09-26 01:09:48 +01:00
2e42a453b0 adding latest beam pipeline code 2021-09-25 22:15:58 +01:00
adfbd8e93d updating prospector.yaml 2021-09-25 22:15:46 +01:00
1bd54f188d updating folder structure for data input/output 2021-09-25 22:15:39 +01:00
a7c52b1085 updating .gitignore 2021-09-25 22:15:10 +01:00
24420c8935 adding latest beam pipeline code 2021-09-25 18:25:08 +01:00
44f346deff Merge remote-tracking branch 'origin/develop' into develop 2021-09-25 16:48:04 +01:00
a37e7817c3 adding latest beam pipeline code 2021-09-25 16:47:52 +01:00
539e4c7786 adding latesty beam pipeline code 2021-09-25 16:47:17 +01:00
214ce77d8f adding debug.py 2021-09-25 16:47:08 +01:00
47a4ac4bc3 adding latest beam pipeline code 2021-09-25 01:44:37 +01:00
aa61ea9c57 adding latest beam pipeline code 2021-09-25 00:48:41 +01:00
94cc22a385 adding data exploration report + code 2021-09-25 00:48:34 +01:00
a05182892a adding documentation 2021-09-25 00:48:13 +01:00
c38a10ca2f updating beam to install gcp extras 2021-09-25 00:48:04 +01:00
ab993fc030 adding prospector.yaml 2021-09-25 00:47:49 +01:00
9cf5662600 updating notes 2021-09-24 16:52:40 +01:00
0f262daf39 adding report.py 2021-09-24 16:52:34 +01:00
972d0a852a updating dev dependencies 2021-09-24 16:52:25 +01:00
5301d5ff04 updating .gitignore 2021-09-24 16:51:56 +01:00
6db2fa59b9 adding initial skeleton 2021-09-24 16:01:03 +01:00
18 changed files with 4813 additions and 1 deletions

145
.gitignore vendored Normal file
View File

@@ -0,0 +1,145 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# USER
# ignore the data csv files for input/output
**/data/**/*.csv
**/data/**/*.json
.vscode/settings.json

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.7.9

View File

@@ -1,2 +1,9 @@
# street_group_tech_test
Technical Test for Street Group
Technical Test for Street Group, completed by Daniel Tomlinson.
## Documentation
Read the documentation on github pages for instructions around running the code and a discussion on the approach.
https://dtomlinson91.github.io/street_group_tech_test/

View File

@@ -0,0 +1,5 @@
"""
Daniel Tomlinson (dtomlinson@panaetius.co.uk).
Technical test for Street Group.
"""

View File

@@ -0,0 +1,24 @@
import apache_beam as beam
class DebugShowEmptyColumn(beam.DoFn):
    """Debug helper: pass through only elements whose column at ``index`` is empty."""

    def __init__(self, index):
        # Position of the column to inspect.
        self.index = index

    def process(self, element):
        # Forward the element only when the inspected column holds no text.
        if len(element[self.index]) == 0:
            yield element
class DebugShowColumnWithValueIn(beam.DoFn):
    """Debug helper: pass through only elements whose column at ``index`` contains ``value``."""

    def __init__(self, index, value):
        # Position of the column to inspect and the substring to look for.
        self.index = index
        self.value = value

    def process(self, element):
        # Forward the element only when the target value occurs in the column.
        if self.value in element[self.index]:
            yield element

331
analyse_properties/main.py Normal file
View File

@@ -0,0 +1,331 @@
import argparse
from datetime import datetime
import hashlib
import itertools
import json
import logging
import pathlib
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def slice_by_range(element, *ranges):
    """
    Select multiple index ranges from a sequence.

    Args:
        element: The sequence to slice.
        *ranges (tuple): (start, end) pairs passed to ``itertools.islice``.
            E.g. (0, 3), (5, 6) keeps indices 0, 1, 2 and 5, dropping the rest.

    Returns:
        itertools.chain: A lazy iterator over the kept items (not a list;
            callers wrap it in ``list`` when needed).
    """
    selected = (itertools.islice(element, *bounds) for bounds in ranges)
    return itertools.chain.from_iterable(selected)
class DropRecordsSingleEmptyColumn(beam.DoFn):
    """Drop a whole row from the PCollection when one given column is empty."""

    def __init__(self, index):
        # Position of the column that must be non-empty for the row to survive.
        self.index = index

    def process(self, element):
        """
        Keep or drop a row based on a single column.

        Args:
            element: The row (a list of strings).

        Yields:
            element: Only when the inspected column is non-empty; otherwise
                nothing is emitted and the row is dropped.
        """
        if len(element[self.index]) != 0:
            yield element
class DropRecordsTwoEmptyColumn(beam.DoFn):
    """If two given items in a list are both empty, drop this entry from the PCollection."""

    def __init__(self, index_0, index_1):
        # Positions of the two columns to inspect together.
        self.index_0 = index_0
        self.index_1 = index_1

    def process(self, element):
        # Keep the row unless BOTH inspected columns hold no text.
        both_empty = (
            len(element[self.index_0]) == 0 and len(element[self.index_1]) == 0
        )
        if not both_empty:
            yield element
class SplitColumn(beam.DoFn):
    """Split an item in a list into two separate items in the PCollection."""

    def __init__(self, index, split_char):
        # Column to split and the character to split on.
        self.index = index
        self.split_char = split_char

    def process(self, element):
        # When splitting yields exactly two parts, keep the second part in
        # place and append the first. Any other outcome (no split character,
        # or more than one) appends an empty string instead, so every row
        # keeps the same number of columns.
        parts = element[self.index].split(self.split_char)
        if len(parts) == 2:
            first, second = parts
            element[self.index] = second.strip()
            element.append(first.strip())
        else:
            element.append("")
        yield element
class GenerateUniqueID(beam.DoFn):
    """
    Key each row with an MD5 hash, computed either over every column or only
    over the uniquely identifying columns (everything after index 1).
    """

    def __init__(self, all_columns=False):
        # When True hash the whole row; otherwise skip the first two columns
        # (price and transaction date) so repeat sales of a property collide.
        self.all_columns = all_columns

    def process(self, element):
        hash_source = element if self.all_columns else element[2:]
        # MD5 is used purely as a stable fingerprint, not for security.
        digest = hashlib.md5(",".join(hash_source).encode()).hexdigest()
        # Emit a (key, value) pair so a later GroupByKey can group on the hash.
        yield (digest, list(element))
class DeduplicateByID(beam.DoFn):
    """
    If the PCollection has multiple entries after being grouped by ID for all
    columns, deduplicate the group to keep only the first entry.
    """

    def process(self, element):
        """
        Collapse a grouped element to a single representative row.

        Args:
            element: A ``(id, grouped_rows)`` pair produced by GroupByKey.

        Yields:
            tuple: ``(id, [first_row])``. GroupByKey never emits an empty
                group, so the fallback of yielding the element untouched is
                defensive only (the original ``len(...) > 0`` check made the
                else branch unreachable in practice).
        """
        key, grouped_rows = element
        # Materialise the grouped iterable once — the original rebuilt it with
        # list() three times per element — and keep just the first duplicate.
        rows = list(grouped_rows)
        if rows:
            yield (key, rows[:1])
        else:
            yield element
class RemoveUniqueID(beam.DoFn):
    """Remove the unique ID from the PCollection, transforming it back into a list."""

    def process(self, element):
        # element is (id, [row]); emit the bare row itself.
        yield element[-1][0]
class ConvertDataToDict(beam.DoFn):
    """Convert the processed data into a dict to be exported as a JSON object."""

    @staticmethod
    def get_latest_transaction(transaction_dates):
        """Get the date of the latest transaction."""
        # Parse each "YYYY-MM-DD" string, then return only the year ("%Y") of
        # the most recent one.
        transaction_dates = [
            datetime.strptime(individual_transaction, "%Y-%m-%d")
            for individual_transaction in transaction_dates
        ]
        return max(transaction_dates).strftime("%Y")

    @staticmethod
    def get_readable_address(address_components: list, address_comparisons: list):
        """
        Build a human-readable multi-line address.

        NOTE(review): the mask logic assumes ``address_comparisons`` has
        exactly four items (locality, town, district, county) — the six
        pairwise results indexed below only exist for four inputs. Currently
        only called from commented-out code in process(); confirm before
        re-enabling.
        """
        # Get pairwise comparison to see if two locality/town/district/counties
        # are equivalent
        pairwise_comparison = [
            x == y
            for i, x in enumerate(address_comparisons)
            for j, y in enumerate(address_comparisons)
            if i > j
        ]
        # Create a mask to eliminate the redundant parts of the address
        mask = [True, True, True, True]
        if pairwise_comparison[0]:
            mask[1] = False
        if pairwise_comparison[1] or pairwise_comparison[2]:
            mask[2] = False
        if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]:
            mask[3] = False
        # Apply the mask
        applied_mask = list(itertools.compress(address_comparisons, mask))
        # Filter out empty items in list
        deduplicated_address_part = list(filter(None, applied_mask))
        # Filter out any missing parts of the address components
        cleaned_address_components = list(filter(None, address_components))
        # Return the readable address
        return "\n".join(
            itertools.chain.from_iterable(
                [
                    cleaned_address_components[0:-1],
                    deduplicated_address_part,
                    [cleaned_address_components[-1]],
                ]
            )
        )

    def process(self, element):
        """
        Turn a grouped element ``(property_id, rows)`` into a JSON-ready dict.

        NOTE(review): ``element[-1]`` (the grouped rows) is re-wrapped with
        ``list(...)`` many times below; this assumes the grouped value is
        re-iterable, which holds for Beam's GroupByKey output — confirm if the
        upstream grouping changes.
        """
        # Group together all the transactions for the property.
        property_transactions = [
            {
                "price": entry[0],
                # Dates arrive as "YYYY-MM-DD 00:00"; strip the time part.
                "transaction_date": entry[1].replace(" 00:00", ""),
                "year": entry[1][0:4],
            }
            for entry in element[-1]
        ]
        # Create the dict to hold all the information about the property.
        # The address fields are read positionally from the first row of the
        # group. NOTE(review): "flat_appartment" and "builing" are misspelled
        # but are part of the emitted JSON schema — renaming them would break
        # downstream consumers.
        json_object = {
            "property_id": element[0],
            # "readable_address": None,
            "flat_appartment": list(element[-1])[0][4],
            "builing": list(element[-1])[0][10],
            "number": list(element[-1])[0][3],
            "street": list(element[-1])[0][5],
            "locality": list(element[-1])[0][6],
            "town": list(element[-1])[0][7],
            "district": list(element[-1])[0][8],
            "county": list(element[-1])[0][9],
            "postcode": list(element[-1])[0][2],
            "property_transactions": property_transactions,
            "latest_transaction_year": self.get_latest_transaction(
                [
                    transaction["transaction_date"]
                    for transaction in property_transactions
                ]
            ),
        }
        # Create a human readable address to go in the dict.
        # json_object["readable_address"] = self.get_readable_address(
        #     [
        #         json_object["flat_appartment"],
        #         json_object["builing"],
        #         f'{json_object["number"]} {json_object["street"]}',
        #         json_object["postcode"],
        #     ],
        #     [
        #         json_object["locality"],
        #         json_object["town"],
        #         json_object["district"],
        #         json_object["county"],
        #     ],
        # )
        yield json_object
def run(argv=None, save_main_session=True):
    """
    Entrypoint and definition of the pipeline.

    Args:
        argv: Command-line arguments to parse (defaults to ``sys.argv``);
            unknown arguments are forwarded to Beam's PipelineOptions.
        save_main_session (bool): Pickle the main session so global imports
            are available to workers on Dataflow.
    """
    # Default input/output files (relative to the repository root).
    input_file = (
        pathlib.Path(__file__).parents[1]
        / "data"
        / "input"
        / "pp-monthly-update-new-version.csv"
    )
    output_file = (
        pathlib.Path(__file__).parents[1]
        / "data"
        / "output"
        / "pp-monthly-update-new-version"
    )
    # Arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input", dest="input", default=str(input_file), help="Input file."
    )
    parser.add_argument(
        "--output", dest="output", default=str(output_file), help="Output file."
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    # Pipeline options. save_main_session needed for DataFlow for global imports.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    # Load in the data from a csv file.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Load the data.
        # NOTE(review): splitting on "," breaks quoted CSV fields that contain
        # commas (e.g. addresses like "FLAT 1, THE TOWER"); the quote-stripping
        # step below does not recover those. Consider csv.reader per line —
        # confirm against the real data before changing.
        load = (
            pipeline
            | "Read input data" >> beam.io.ReadFromText(known_args.input)
            | "Split by ','" >> beam.Map(lambda element: element.split(","))
            | "Remove leading and trailing quotes"
            >> beam.Map(lambda element: [el.strip('"') for el in element])
        )
        # Clean the data by dropping unneeded rows.
        clean_drop = (
            load
            # Keep columns 1-3 and 7-13 (price, date, postcode, PAON/SAON,
            # street, locality, town, district, county).
            | "Drop unneeded columns"
            >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
            | "Convert to Upper Case"
            >> beam.Map(lambda element: [e.upper() for e in element])
            | "Strip leading/trailing whitespace"
            >> beam.Map(lambda element: [e.strip() for e in element])
            | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
            | "Drop empty PAON if missing SAON"
            >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
            # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
            | "Split PAON into two columns if separated by comma"
            >> beam.ParDo(SplitColumn(3, ","))
        )
        # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
        clean_deduplicate = (
            clean_drop
            | "Generate unique ID for all columns"
            >> beam.ParDo(GenerateUniqueID(all_columns=True))
            | "Group by the ID for all columns"
            >> beam.GroupByKey()
            | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
        )
        # Prepare the data by generating an ID using the uniquely identifying
        # information only and grouping them by this ID.
        prepare = (
            clean_deduplicate
            | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
            | "Generate unique ID ignoring price & date"
            >> beam.ParDo(GenerateUniqueID())
            | "Group by the ID ignoring price & date"
            >> beam.GroupByKey()
            # | beam.Map(print)
        )
        # Format the data into a dict.
        formatted = (
            prepare
            | "Convert the prepared data into a dict object"
            >> beam.ParDo(ConvertDataToDict())
        )
        # Save the data to a .json file.
        # NOTE(review): ToList collects the entire data set onto one worker —
        # fine for the monthly file, likely memory-heavy for pp-complete.csv.
        (
            formatted
            | "Combine into one PCollection" >> beam.combiners.ToList()
            | "Format output" >> beam.Map(json.dumps)
            | "Save to .json file"
            >> beam.io.WriteToText(
                file_path_prefix=known_args.output,
                file_name_suffix=".json",
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

5
docs/index.md Normal file
View File

@@ -0,0 +1,5 @@
# Welcome
## Introduction
This documentation accompanies the technical test for the Street Group.

5
download_data.sh Executable file
View File

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Download the Land Registry price paid data into data/input.
# The file is marked executable, so it needs an explicit interpreter line.

# Full data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-complete.csv -P data/input

# Monthly update data set
wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv -P data/input

File diff suppressed because one or more lines are too long

35
exploration/report.py Normal file
View File

@@ -0,0 +1,35 @@
from importlib import resources
import pandas as pd
from pandas_profiling import ProfileReport
def main():
    """
    Generate a pandas-profiling HTML report for the full price paid data set.

    Reads pp-complete.csv bundled as package data of ``analyse_properties.data``
    (the file has no header row, so column names are supplied explicitly) and
    writes price_paid_data_report.html to the current working directory.
    """
    with resources.path("analyse_properties.data", "pp-complete.csv") as csv_file:
        df_report = pd.read_csv(
            csv_file,
            # Column names per the Land Registry price paid data schema.
            names=[
                "transaction_id",
                "price",
                "date_of_transfer",
                "postcode",
                "property_type",
                "old_new",
                "duration",
                "paon",
                "saon",
                "street",
                "locality",
                "town_city",
                "district",
                "county",
                "ppd_category",
                "record_status",
            ],
        )
        # minimal=True skips the expensive correlation/interaction sections,
        # which matters for a data set of this size.
        profile = ProfileReport(df_report, title="Price Paid Data", minimal=True)
        profile.to_file("price_paid_data_report.html")


if __name__ == "__main__":
    main()

40
mkdocs.yaml Normal file
View File

@@ -0,0 +1,40 @@
site_name: The Street Group Technical Test
repo_url: https://github.com/dtomlinson91/street_group_tech_test
use_directory_urls: false
nav:
- Documentation:
- Welcome: index.md
# - Installation: documentation/installation.md
# - Usage: documentation/usage.md
# - Comments and Caveats:
# - Introduction: comments_caveats/introduction.md
# - Time limit: comments_caveats/time_limit.md
# - Third party libraries: comments_caveats/third_party_libraries.md
# - Areas of improvement and comments: comments_caveats/area_of_improvement_comments.md
# - Similar names algorithm: comments_caveats/similar_names.md
# - Reference:
# - deduplicator.main: reference/api_documentation_main.md
# - Changelog: changelog/changelog.md
theme:
name: material
palette:
primary: indigo
accent: blue
feature:
tabs: true
markdown_extensions:
- admonition
- codehilite:
guess_lang: true
- toc:
permalink: true
- pymdownx.superfences
# - pymdownx.arithmatex:
# generic: true
plugins:
- search:
lang: en
extra_javascript:
- javascripts/config.js
- https://polyfill.io/v3/polyfill.min.js?features=es6
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js

View File

@@ -0,0 +1,7 @@
# Answers
## CSV
Read a csv file into Beam:
<https://stackoverflow.com/a/41171867>

View File

@@ -0,0 +1,12 @@
# Beam Documentation
## Transforms
FlatMap:
<https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/>
I/O Transforms:
<https://beam.apache.org/documentation/io/built-in/>

View File

@@ -0,0 +1,24 @@
# DataFlow
<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
Export env variable:
`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
Run the pipeline:
python -m analyse_properties.main \
--region europe-west2 \
--input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \
--output gs://street-group-technical-test-dmot/output/pp-monthly-update-new-version \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot/tmp
## Errors
Unsubscriptable error on window:
<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>

5
notes/links.md Normal file
View File

@@ -0,0 +1,5 @@
# Links
## Data
https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads

2628
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

79
prospector.yaml Normal file
View File

@@ -0,0 +1,79 @@
# This will enable almost every single warning
output-format: vscode
doc-warnings: true
# allow-shorthand: false
strictness: none
ignore-patterns:
- (^|/)\..+
pylint:
run: true
disable:
- fixme
- bad-continuation
- missing-module-docstring
- logging-fstring-interpolation
- missing-function-docstring
- abstract-method
- missing-class-docstring
- super-init-not-called
- arguments-differ
- inconsistent-return-statements
- expression-not-assigned
enable:
options:
max-locals: 15
max-returns: 6
max-branches: 12
max-statements: 50
# max-parents: 7
max-attributes: 20
min-public-methods: 0
max-public-methods: 25
max-module-lines: 1000
max-line-length: 88
mccabe:
run: true
options:
max-complexity: 10
pep8:
run: true
options:
max-line-length: 88
single-line-if-stmt: n
disable:
- E501 # line too long
pyroma:
run: false
disable:
- PYR19
- PYR16
pep257:
disable:
- D000
- D203
# - D213
- D212 # multiline docstrings on first line only.
- D404
- D100
- D407 # numpy docstring format
- D107 # missing docstring in __init__
# Docstrings ending with newlines and : in Returns block.
- D413
- D406
- D103
- D101 # missing docstring in public class
- D102 # missing docstring in public method
pyflakes:
disable:
- F401 # unused import
dodgy:
run: true

20
pyproject.toml Normal file
View File

@@ -0,0 +1,20 @@
[tool.poetry]
name = "analyse_properties"
version = "0.1.0"
description = ""
authors = ["Daniel Tomlinson <dtomlinson@panaetius.co.uk>"]
[tool.poetry.dependencies]
python = "^3.7"
apache-beam = {extras = ["gcp"], version = "^2.32.0"}
mkdocs = "^1.2.2"
mkdocs-material = "^7.3.0"
[tool.poetry.dev-dependencies]
# pytest = "^5.2"
prospector = "^1.5.1"
pandas-profiling = "^3.0.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"