mirror of
https://github.com/dtomlinson91/street_group_tech_test
synced 2025-12-22 11:55:45 +00:00
Compare commits
56 Commits
v1.0
...
wip/datafl
| Author | SHA1 | Date | |
|---|---|---|---|
| 76434fae5b | |||
| 886a37ca94 | |||
| 3263b3dd8b | |||
| dffc6aa553 | |||
| f9eeb8bfad | |||
| cad6612ebe | |||
| 391861d80c | |||
| f60beb4565 | |||
| f2ed60426d | |||
| 7db1edb90c | |||
| 3a74579440 | |||
| 377e3c703f | |||
| a8fc06c764 | |||
| eaa36877f6 | |||
| 1941fcb7bf | |||
| 99e67c2840 | |||
| 8e8469579e | |||
| 4e3771c728 | |||
| 8856a9763f | |||
| fded858932 | |||
| bb71d55f8c | |||
| 8047b5ced4 | |||
| 9f53c66975 | |||
| e6ec110d54 | |||
| 83807616e0 | |||
| 7f874fa6f6 | |||
| b8a997084d | |||
| c4e81065b1 | |||
| 62bd0196ad | |||
| 7f9b7e4bfd | |||
| 7962f40e32 | |||
| 2a43ea1946 | |||
| 07d176be79 | |||
| f804e85cc3 | |||
| 9fdc6dce05 | |||
| 54cf5e3e36 | |||
| 2e42a453b0 | |||
| adfbd8e93d | |||
| 1bd54f188d | |||
| a7c52b1085 | |||
| 24420c8935 | |||
| 44f346deff | |||
| a37e7817c3 | |||
| 539e4c7786 | |||
| 214ce77d8f | |||
| 47a4ac4bc3 | |||
| aa61ea9c57 | |||
| 94cc22a385 | |||
| a05182892a | |||
| c38a10ca2f | |||
| ab993fc030 | |||
| 9cf5662600 | |||
| 0f262daf39 | |||
| 972d0a852a | |||
| 5301d5ff04 | |||
| 6db2fa59b9 |
145
.gitignore
vendored
Normal file
145
.gitignore
vendored
Normal file
@@ -0,0 +1,145 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# USER
|
||||
|
||||
# ignore the data csv files for input/output
|
||||
**/data/**/*.csv
|
||||
**/data/**/*.json
|
||||
.vscode/settings.json
|
||||
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.7.9
|
||||
@@ -1,2 +1,9 @@
|
||||
# street_group_tech_test
|
||||
Technical Test for Street Group
|
||||
|
||||
Technical Test for Street Group for Daniel Tomlinson.
|
||||
|
||||
## Documentation
|
||||
|
||||
Read the documentation on github pages for instructions around running the code and a discussion on the approach.
|
||||
|
||||
https://dtomlinson91.github.io/street_group_tech_test/
|
||||
|
||||
5
analyse_properties/__init__.py
Normal file
5
analyse_properties/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""
|
||||
Daniel Tomlinson (dtomlinson@panaetius.co.uk).
|
||||
|
||||
Technical test for Street Group.
|
||||
"""
|
||||
30
analyse_properties/debug.py
Normal file
30
analyse_properties/debug.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import apache_beam as beam
|
||||
|
||||
|
||||
class DebugShowEmptyColumn(beam.DoFn):
|
||||
def __init__(self, index):
|
||||
self.index = index
|
||||
|
||||
def process(self, element):
|
||||
column = element[self.index]
|
||||
if len(column) == 0:
|
||||
yield element
|
||||
return None
|
||||
|
||||
|
||||
class DebugShowColumnWithValueIn(beam.DoFn):
|
||||
def __init__(self, index, value):
|
||||
self.index = index
|
||||
self.value = value
|
||||
|
||||
def process(self, element):
|
||||
column = element[self.index]
|
||||
if self.value in column:
|
||||
yield element
|
||||
return None
|
||||
|
||||
|
||||
class DebugPrint(beam.DoFn):
|
||||
def process(self, element):
|
||||
print(element)
|
||||
yield element
|
||||
402
analyse_properties/main.py
Normal file
402
analyse_properties/main.py
Normal file
@@ -0,0 +1,402 @@
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
import hashlib
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
import apache_beam as beam
|
||||
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
|
||||
|
||||
from analyse_properties.debug import * # noqa
|
||||
|
||||
def slice_by_range(element, *ranges):
|
||||
"""
|
||||
Slice a list with multiple ranges.
|
||||
|
||||
Args:
|
||||
element : The element.
|
||||
*ranges (tuple): Tuples containing a start,end index to slice the element.
|
||||
E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.
|
||||
|
||||
Returns:
|
||||
list: The list sliced by the ranges
|
||||
"""
|
||||
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
|
||||
|
||||
|
||||
class DropRecordsSingleEmptyColumn(beam.DoFn):
|
||||
"""
|
||||
Drop the entire row if a given column is empty.
|
||||
|
||||
Args:
|
||||
index : The index of the column in the list.
|
||||
|
||||
Returns:
|
||||
None: If the length of the column is 0, drop the element.
|
||||
|
||||
Yields:
|
||||
element: If the length of the column is >0, keep the element.
|
||||
"""
|
||||
|
||||
def __init__(self, index):
|
||||
self.index = index
|
||||
|
||||
def process(self, element):
|
||||
column = element[self.index]
|
||||
if len(column) == 0:
|
||||
return None
|
||||
yield element
|
||||
|
||||
|
||||
class DropRecordsTwoEmptyColumn(beam.DoFn):
|
||||
"""
|
||||
Drop the entire row if both of two given columns are empty.
|
||||
|
||||
Args:
|
||||
index_0 : The index of the first column in the list.
|
||||
index_1 : The index of the second column in the list.
|
||||
|
||||
Returns:
|
||||
None: If the length of both columns is 0, drop the element.
|
||||
|
||||
Yields:
|
||||
element: If the length of both columns is >0, keep the element.
|
||||
"""
|
||||
|
||||
def __init__(self, index_0, index_1):
|
||||
self.index_0 = index_0
|
||||
self.index_1 = index_1
|
||||
|
||||
def process(self, element):
|
||||
column_0 = element[self.index_0]
|
||||
column_1 = element[self.index_1]
|
||||
if len(column_0) == 0 and len(column_1) == 0:
|
||||
return None
|
||||
yield element
|
||||
|
||||
|
||||
class SplitColumn(beam.DoFn):
|
||||
"""
|
||||
Split one column into two columns by a character.
|
||||
|
||||
Args:
|
||||
index : The index of the column in the list.
|
||||
split_char: The character to split the column by.
|
||||
"""
|
||||
|
||||
def __init__(self, index, split_char):
|
||||
self.index = index
|
||||
self.split_char = split_char
|
||||
|
||||
def process(self, element):
|
||||
# If there is a split based on the split_char, then keep the second result in
|
||||
# place (street number) and append the first result (building) at the end.
|
||||
try:
|
||||
part_0, part_1 = element[self.index].split(self.split_char)
|
||||
element[self.index] = part_1.strip()
|
||||
element.append(part_0.strip())
|
||||
yield element
|
||||
except ValueError:
|
||||
# append a blank column to keep column numbers consistent.
|
||||
element.append("")
|
||||
yield element
|
||||
|
||||
|
||||
class CreateMappingTable(beam.DoFn):
|
||||
"""
|
||||
Create a mapping table to be used as a side-input.
|
||||
|
||||
This mapping table has a key of an ID generated across all columns and a value of
|
||||
the raw property data.
|
||||
|
||||
The table is used to populate the raw property data after a GroupByKey using
|
||||
only the IDs in order to reduce the amount of data processed in the GroupByKey operation.
|
||||
"""
|
||||
|
||||
def process(self, element):
|
||||
# Join the row into a string.
|
||||
unique_string = ",".join(element)
|
||||
# Hash the string.
|
||||
hashed_string = hashlib.md5(unique_string.encode())
|
||||
# Format the resulting PCollection with the key of id and value of raw data.
|
||||
new_element = (hashed_string.hexdigest(), list(element))
|
||||
yield new_element
|
||||
|
||||
|
||||
class CreateUniquePropertyID(beam.DoFn):
|
||||
"""
|
||||
Create a unique property ID which does not include the price and date of sale.
|
||||
|
||||
Uses each row of the mapping table to create a PCollection with a key of the
|
||||
unique property ID and a value of the ID generated across all columns.
|
||||
"""
|
||||
|
||||
def process(self, element):
|
||||
unique_string = ",".join(element[-1][2:])
|
||||
hashed_string = hashlib.md5(unique_string.encode())
|
||||
new_element = (hashed_string.hexdigest(), element[0])
|
||||
yield new_element
|
||||
|
||||
|
||||
class DeduplicateIDs(beam.DoFn):
|
||||
"""Deduplicate a list of IDs."""
|
||||
|
||||
def process(self, element):
|
||||
deduplicated_list = list(set(element[-1]))
|
||||
new_element = (element[0], deduplicated_list)
|
||||
yield new_element
|
||||
|
||||
|
||||
def insert_data_for_id(element, mapping_table):
|
||||
"""
|
||||
Replace the ID with the raw data from the mapping table.
|
||||
|
||||
Args:
|
||||
element: The element.
|
||||
mapping_table (dict): The mapping table.
|
||||
|
||||
Yields:
|
||||
The element with IDs replaced with raw data.
|
||||
"""
|
||||
replaced_list = [mapping_table[data_id] for data_id in element[-1]]
|
||||
new_element = (element[0], replaced_list)
|
||||
yield new_element
|
||||
|
||||
|
||||
class ConvertDataToDict(beam.DoFn):
|
||||
"""Convert the processed data into a dict to be exported as a JSON object."""
|
||||
|
||||
@staticmethod
|
||||
def get_latest_transaction(transaction_dates):
|
||||
"""
|
||||
Get the date of the latest transaction for a list of dates.
|
||||
|
||||
Args:
|
||||
transaction_dates (str): A date in the form "%Y-%m-%d".
|
||||
|
||||
Returns:
|
||||
str: The year in the form "%Y" of the latest transaction date.
|
||||
"""
|
||||
transaction_dates = [
|
||||
datetime.strptime(individual_transaction, "%Y-%m-%d")
|
||||
for individual_transaction in transaction_dates
|
||||
]
|
||||
return max(transaction_dates).strftime("%Y")
|
||||
|
||||
@staticmethod
|
||||
def get_readable_address(address_components, address_comparisons):
|
||||
"""
|
||||
Create a human readable address from the locality/town/district/county columns.
|
||||
|
||||
Args:
|
||||
address_components (list): The preceeding parts of the address (street, postcode etc.)
|
||||
address_comparisons (list): The locality/town/district/county.
|
||||
|
||||
Returns:
|
||||
str: The complete address deduplicated & cleaned.
|
||||
"""
|
||||
# Get pairwise comparison to see if two locality/town/district/counties
|
||||
# are equivalent
|
||||
pairwise_comparison = [
|
||||
x == y
|
||||
for i, x in enumerate(address_comparisons)
|
||||
for j, y in enumerate(address_comparisons)
|
||||
if i > j
|
||||
]
|
||||
# Create a mask to eliminate the redundant parts of the address
|
||||
mask = [True, True, True, True]
|
||||
if pairwise_comparison[0]:
|
||||
mask[1] = False
|
||||
if pairwise_comparison[1] or pairwise_comparison[2]:
|
||||
mask[2] = False
|
||||
if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]:
|
||||
mask[3] = False
|
||||
# Apply the mask
|
||||
applied_mask = list(itertools.compress(address_comparisons, mask))
|
||||
# Filter out empty items in list
|
||||
deduplicated_address_part = list(filter(None, applied_mask))
|
||||
# Filter out any missing parts of the address components
|
||||
cleaned_address_components = list(filter(None, address_components))
|
||||
|
||||
# Return the readable address
|
||||
return "\n".join(
|
||||
itertools.chain.from_iterable(
|
||||
[
|
||||
cleaned_address_components[0:-1],
|
||||
deduplicated_address_part,
|
||||
[cleaned_address_components[-1]],
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
def process(self, element):
|
||||
# Group together all the transactions for the property.
|
||||
property_transactions = [
|
||||
{
|
||||
"price": int(entry[0]),
|
||||
"transaction_date": entry[1].replace(" 00:00", ""),
|
||||
"year": int(entry[1][0:4]),
|
||||
}
|
||||
for entry in element[-1]
|
||||
]
|
||||
|
||||
# Create the dict to hold all the information about the property.
|
||||
json_object = {
|
||||
"property_id": element[0],
|
||||
"readable_address": None,
|
||||
"flat_appartment": list(element[-1])[0][4],
|
||||
"builing": list(element[-1])[0][10],
|
||||
"number": list(element[-1])[0][3],
|
||||
"street": list(element[-1])[0][5],
|
||||
"locality": list(element[-1])[0][6],
|
||||
"town": list(element[-1])[0][7],
|
||||
"district": list(element[-1])[0][8],
|
||||
"county": list(element[-1])[0][9],
|
||||
"postcode": list(element[-1])[0][2],
|
||||
"property_transactions": property_transactions,
|
||||
"latest_transaction_year": int(self.get_latest_transaction(
|
||||
[
|
||||
transaction["transaction_date"]
|
||||
for transaction in property_transactions
|
||||
]
|
||||
)),
|
||||
}
|
||||
|
||||
# Create a human readable address to go in the dict.
|
||||
json_object["readable_address"] = self.get_readable_address(
|
||||
[
|
||||
json_object["flat_appartment"],
|
||||
json_object["builing"],
|
||||
f'{json_object["number"]} {json_object["street"]}',
|
||||
json_object["postcode"],
|
||||
],
|
||||
[
|
||||
json_object["locality"],
|
||||
json_object["town"],
|
||||
json_object["district"],
|
||||
json_object["county"],
|
||||
],
|
||||
)
|
||||
yield json_object
|
||||
|
||||
|
||||
def run(argv=None, save_main_session=True):
|
||||
"""Entrypoint and definition of the pipeline."""
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
# Default input/output files
|
||||
input_file = (
|
||||
pathlib.Path(__file__).parents[1]
|
||||
/ "data"
|
||||
/ "input"
|
||||
/ "pp-2020.csv"
|
||||
# / "pp-complete.csv"
|
||||
)
|
||||
output_file = (
|
||||
pathlib.Path(__file__).parents[1]
|
||||
/ "data"
|
||||
/ "output"
|
||||
/ "pp-2020"
|
||||
# / "pp-complete"
|
||||
)
|
||||
|
||||
# Arguments
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--input",
|
||||
dest="input",
|
||||
default=str(input_file),
|
||||
help="Full path to the input file.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
dest="output",
|
||||
default=str(output_file),
|
||||
help="Full path to the output file without extension.",
|
||||
)
|
||||
known_args, pipeline_args = parser.parse_known_args(argv)
|
||||
|
||||
# Pipeline options. save_main_session needed for DataFlow for global imports.
|
||||
pipeline_options = PipelineOptions(pipeline_args)
|
||||
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
|
||||
|
||||
with beam.Pipeline(options=pipeline_options) as pipeline:
|
||||
# Load the data
|
||||
load = (
|
||||
pipeline
|
||||
| "Read input data" >> beam.io.ReadFromText(known_args.input)
|
||||
| "Split by ','" >> beam.Map(lambda element: element.split(","))
|
||||
| "Remove leading and trailing quotes"
|
||||
>> beam.Map(lambda element: [el.strip('"') for el in element])
|
||||
)
|
||||
|
||||
# Clean the data.
|
||||
clean_drop = (
|
||||
load
|
||||
| "Drop unneeded columns"
|
||||
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
|
||||
| "Convert to Upper Case"
|
||||
>> beam.Map(lambda element: [e.upper() for e in element])
|
||||
| "Strip leading/trailing whitespace"
|
||||
>> beam.Map(lambda element: [e.strip() for e in element])
|
||||
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
|
||||
| "Drop empty PAON if missing SAON"
|
||||
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
|
||||
| "Split PAON into two columns if separated by comma"
|
||||
>> beam.ParDo(SplitColumn(3, ","))
|
||||
)
|
||||
|
||||
# Create a mapping table
|
||||
mapping_table_raw = (
|
||||
clean_drop
|
||||
| "Create a mapping table with key of id_all_columns and value of cleaned data."
|
||||
>> beam.ParDo(CreateMappingTable())
|
||||
)
|
||||
|
||||
# Condense mapping table into a single dict.
|
||||
mapping_table_condensed = (
|
||||
mapping_table_raw
|
||||
| "Condense mapping table into single dict" >> beam.combiners.ToDict()
|
||||
)
|
||||
|
||||
# Prepare the data by creating IDs, grouping together and using mapping table
|
||||
# to reinsert raw data.
|
||||
prepared = (
|
||||
mapping_table_raw
|
||||
| "Create unique ID ignoring price & date"
|
||||
>> beam.ParDo(CreateUniquePropertyID())
|
||||
| "Group by ID"
|
||||
>> beam.GroupByKey()
|
||||
| "Deduplicate to eliminate repeated transactions"
|
||||
>> beam.ParDo(DeduplicateIDs())
|
||||
| "Insert the raw data using the mapping table"
|
||||
>> beam.FlatMap(
|
||||
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
|
||||
)
|
||||
)
|
||||
|
||||
# Format the data into a dict.
|
||||
formatted = (
|
||||
prepared
|
||||
| "Convert the prepared data into a dict object"
|
||||
>> beam.ParDo(ConvertDataToDict())
|
||||
)
|
||||
|
||||
# Save the data to a .json file.
|
||||
(
|
||||
formatted
|
||||
| "Combine into one PCollection" >> beam.combiners.ToList()
|
||||
| "Format output" >> beam.Map(json.dumps, indent=2)
|
||||
| "Save to .json file"
|
||||
>> beam.io.WriteToText(
|
||||
file_path_prefix=known_args.output,
|
||||
file_name_suffix=".json",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
run()
|
||||
5
docs/index.md
Normal file
5
docs/index.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Welcome
|
||||
|
||||
## Introduction
|
||||
|
||||
This documentation accompanies the technical test for the Street Group.
|
||||
1439
docs/pandas-profiling/report.html
Normal file
1439
docs/pandas-profiling/report.html
Normal file
File diff suppressed because one or more lines are too long
8
download_data.sh
Executable file
8
download_data.sh
Executable file
@@ -0,0 +1,8 @@
|
||||
# Full data set
|
||||
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-complete.csv -P data/input
|
||||
|
||||
# Monthly update data set
|
||||
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input
|
||||
|
||||
# 2020 data set
|
||||
wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input
|
||||
38
exploration/report.py
Normal file
38
exploration/report.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import pathlib
|
||||
|
||||
import pandas as pd
|
||||
from pandas_profiling import ProfileReport
|
||||
|
||||
|
||||
def main():
|
||||
input_file = (
|
||||
pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-complete.csv"
|
||||
)
|
||||
with input_file.open() as csv:
|
||||
df_report = pd.read_csv(
|
||||
csv,
|
||||
names=[
|
||||
"transaction_id",
|
||||
"price",
|
||||
"date_of_transfer",
|
||||
"postcode",
|
||||
"property_type",
|
||||
"old_new",
|
||||
"duration",
|
||||
"paon",
|
||||
"saon",
|
||||
"street",
|
||||
"locality",
|
||||
"town_city",
|
||||
"district",
|
||||
"county",
|
||||
"ppd_category",
|
||||
"record_status",
|
||||
],
|
||||
)
|
||||
profile = ProfileReport(df_report, title="Price Paid Data", minimal=True)
|
||||
profile.to_file("price_paid_data_report.html")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
40
mkdocs.yaml
Normal file
40
mkdocs.yaml
Normal file
@@ -0,0 +1,40 @@
|
||||
site_name: The Street Group Technical Test
|
||||
repo_url: https://github.com/dtomlinson91/street_group_tech_test
|
||||
use_directory_urls: false
|
||||
nav:
|
||||
- Documentation:
|
||||
- Welcome: index.md
|
||||
# - Installation: documentation/installation.md
|
||||
# - Usage: documentation/usage.md
|
||||
# - Comments and Caveats:
|
||||
# - Introduction: comments_caveats/introduction.md
|
||||
# - Time limit: comments_caveats/time_limit.md
|
||||
# - Third party libraries: comments_caveats/third_party_libraries.md
|
||||
# - Areas of improvement and comments: comments_caveats/area_of_improvement_comments.md
|
||||
# - Similar names algorithm: comments_caveats/similar_names.md
|
||||
# - Reference:
|
||||
# - deduplicator.main: reference/api_documentation_main.md
|
||||
# - Changelog: changelog/changelog.md
|
||||
theme:
|
||||
name: material
|
||||
palette:
|
||||
primary: indigo
|
||||
accent: blue
|
||||
feature:
|
||||
tabs: true
|
||||
markdown_extensions:
|
||||
- admonition
|
||||
- codehilite:
|
||||
guess_lang: true
|
||||
- toc:
|
||||
permalink: true
|
||||
- pymdownx.superfences
|
||||
# - pymdownx.arithmatex:
|
||||
# generic: true
|
||||
plugins:
|
||||
- search:
|
||||
lang: en
|
||||
extra_javascript:
|
||||
- javascripts/config.js
|
||||
- https://polyfill.io/v3/polyfill.min.js?features=es6
|
||||
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js
|
||||
7
notes/documentation/answers.md
Normal file
7
notes/documentation/answers.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Answers
|
||||
|
||||
## CSV
|
||||
|
||||
Read a csv file into Beam:
|
||||
|
||||
<https://stackoverflow.com/a/41171867>
|
||||
12
notes/documentation/beam_docs.md
Normal file
12
notes/documentation/beam_docs.md
Normal file
@@ -0,0 +1,12 @@
|
||||
# Beam Documentation
|
||||
|
||||
## Transforms
|
||||
|
||||
FlatMap:
|
||||
|
||||
<https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/>
|
||||
|
||||
|
||||
I/O Transforms:
|
||||
|
||||
<https://beam.apache.org/documentation/io/built-in/>
|
||||
93
notes/documentation/dataflow.md
Normal file
93
notes/documentation/dataflow.md
Normal file
@@ -0,0 +1,93 @@
|
||||
# DataFlow
|
||||
|
||||
<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
|
||||
|
||||
## Examples
|
||||
|
||||
Full example of beam pipeline on dataflow:
|
||||
|
||||
<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset>
|
||||
|
||||
## Setup
|
||||
|
||||
Export env variable:
|
||||
|
||||
`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
|
||||
|
||||
## Run pipeline
|
||||
|
||||
### Dataflow
|
||||
|
||||
#### Yearly dataset
|
||||
|
||||
```bash
|
||||
python -m analyse_properties.main \
|
||||
--region europe-west1 \
|
||||
--input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \
|
||||
--output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \
|
||||
--runner DataflowRunner \
|
||||
--project street-group \
|
||||
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
|
||||
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
|
||||
--no_use_public_ips \
|
||||
--worker_machine_type=n1-highmem-2
|
||||
```
|
||||
|
||||
#### Full dataset
|
||||
|
||||
```bash
|
||||
python -m analyse_properties.main \
|
||||
--region europe-west1 \
|
||||
--input gs://street-group-technical-test-dmot-euw1/input/pp-complete.csv \
|
||||
--output gs://street-group-technical-test-dmot-euw1/output/pp-complete \
|
||||
--runner DataflowRunner \
|
||||
--project street-group \
|
||||
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
|
||||
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
|
||||
--no_use_public_ips \
|
||||
--worker_machine_type=n1-highmem-8 \
|
||||
--num_workers=3 \
|
||||
--autoscaling_algorithm=NONE
|
||||
```
|
||||
|
||||
### Locally
|
||||
|
||||
Run the pipeline locally:
|
||||
|
||||
`python -m analyse_properties.main --runner DirectRunner`
|
||||
|
||||
## Errors
|
||||
|
||||
Unsubscriptable error on window:
|
||||
|
||||
<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
|
||||
|
||||
## Documentation
|
||||
|
||||
Running in its own private VPC without public IPs
|
||||
|
||||
- <https://stackoverflow.com/questions/58893082/which-compute-engine-quotas-need-to-be-updated-to-run-dataflow-with-50-workers>
|
||||
- <https://cloud.google.com/dataflow/docs/guides/specifying-networks#subnetwork_parameter>
|
||||
|
||||
Error help
|
||||
|
||||
- <https://cloud.google.com/dataflow/docs/guides/common-errors>
|
||||
- <https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline>
|
||||
|
||||
Scaling
|
||||
|
||||
Using DataFlowPrime: <https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime#enable-prime>
|
||||
Use `--experiments=enable_prime`
|
||||
|
||||
Deploying a pipeline (with scaling options): <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline>
|
||||
|
||||
Available VM types (with pricing): <https://cloud.google.com/compute/vm-instance-pricing#n1_predefined>
|
||||
|
||||
Performance
|
||||
|
||||
Sideinput performance: <https://stackoverflow.com/questions/48242320/google-dataflow-apache-beam-python-side-input-from-pcollection-kills-perform>
|
||||
|
||||
Common use cases:
|
||||
|
||||
- Part 1 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1>
|
||||
- Part 2 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2>
|
||||
5
notes/links.md
Normal file
5
notes/links.md
Normal file
@@ -0,0 +1,5 @@
|
||||
# Links
|
||||
|
||||
## Data
|
||||
|
||||
https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
|
||||
27
notes/tmp/errordata
Normal file
27
notes/tmp/errordata
Normal file
@@ -0,0 +1,27 @@
|
||||
"Error message from worker: Traceback (most recent call last):
|
||||
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
|
||||
work_executor.execute()
|
||||
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 181, in execute
|
||||
op.finish()
|
||||
File "dataflow_worker/native_operations.py", line 93, in dataflow_worker.native_operations.NativeWriteOperation.finish
|
||||
File "dataflow_worker/native_operations.py", line 94, in dataflow_worker.native_operations.NativeWriteOperation.finish
|
||||
File "dataflow_worker/native_operations.py", line 95, in dataflow_worker.native_operations.NativeWriteOperation.finish
|
||||
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/nativeavroio.py", line 308, in __exit__
|
||||
self._data_file_writer.flush()
|
||||
File "fastavro/_write.pyx", line 664, in fastavro._write.Writer.flush
|
||||
File "fastavro/_write.pyx", line 639, in fastavro._write.Writer.dump
|
||||
File "fastavro/_write.pyx", line 451, in fastavro._write.snappy_write_block
|
||||
File "fastavro/_write.pyx", line 458, in fastavro._write.snappy_write_block
|
||||
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystemio.py", line 200, in write
|
||||
self._uploader.put(b)
|
||||
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py", line 720, in put
|
||||
self._conn.send_bytes(data.tobytes())
|
||||
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
|
||||
self._send_bytes(m[offset:offset + size])
|
||||
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
|
||||
header = struct.pack("!i", n)
|
||||
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
|
||||
"
|
||||
|
||||
|
||||
"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"
|
||||
44
notes/tmp/exampledata
Normal file
44
notes/tmp/exampledata
Normal file
@@ -0,0 +1,44 @@
|
||||
[{
|
||||
"property_id": "3cf3c06632c46754696f2017933702f3",
|
||||
"flat_appartment": "",
|
||||
"builing": "",
|
||||
"number": "63",
|
||||
"street": "ROTTON PARK STREET",
|
||||
"locality": "",
|
||||
"town": "BIRMINGHAM",
|
||||
"district": "BIRMINGHAM",
|
||||
"county": "WEST MIDLANDS",
|
||||
"postcode": "B16 0AE",
|
||||
"property_transactions": [
|
||||
{ "price": "385000", "transaction_date": "2021-01-08", "year": "2021" },
|
||||
{ "price": "701985", "transaction_date": "2019-03-28", "year": "2019" },
|
||||
{ "price": "1748761", "transaction_date": "2020-05-27", "year": "2020" }
|
||||
],
|
||||
"latest_transaction_year": "2021"
|
||||
},
|
||||
{
|
||||
"property_id": "c650d5d7bb0daf0a19bb2cacabbee74e",
|
||||
"readable_address": "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ",
|
||||
"flat_appartment": "",
|
||||
"builing": "",
|
||||
"number": "16",
|
||||
"street": "STATION ROAD",
|
||||
"locality": "PARKGATE",
|
||||
"town": "NESTON",
|
||||
"district": "CHESHIRE WEST AND CHESTER",
|
||||
"county": "CHESHIRE WEST AND CHESTER",
|
||||
"postcode": "CH64 6QJ",
|
||||
"property_transactions": [
|
||||
{
|
||||
"price": "280000",
|
||||
"transaction_date": "2020-11-30",
|
||||
"year": "2020"
|
||||
},
|
||||
{
|
||||
"price": "265000",
|
||||
"transaction_date": "2020-05-29",
|
||||
"year": "2020"
|
||||
}
|
||||
],
|
||||
"latest_transaction_year": "2020"
|
||||
}]
|
||||
16
notes/tmp/runningdata
Normal file
16
notes/tmp/runningdata
Normal file
@@ -0,0 +1,16 @@
|
||||
|
||||
|
||||
Create Mapping table
|
||||
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
|
||||
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
|
||||
|
||||
Condensing
|
||||
{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
|
||||
|
||||
|
||||
Prepared
|
||||
GroupByKey
|
||||
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
|
||||
|
||||
deduplicated
|
||||
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])
|
||||
2628
poetry.lock
generated
Normal file
2628
poetry.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
80
prospector.yaml
Normal file
80
prospector.yaml
Normal file
@@ -0,0 +1,80 @@
|
||||
# This will enable almost every single warning
|
||||
output-format: vscode
|
||||
doc-warnings: true
|
||||
# allow-shorthand: false
|
||||
strictness: none
|
||||
|
||||
ignore-patterns:
|
||||
- (^|/)\..+
|
||||
|
||||
pylint:
|
||||
run: true
|
||||
disable:
|
||||
- fixme
|
||||
- bad-continuation
|
||||
- missing-module-docstring
|
||||
- logging-fstring-interpolation
|
||||
- missing-function-docstring
|
||||
- abstract-method
|
||||
- missing-class-docstring
|
||||
- super-init-not-called
|
||||
- arguments-differ
|
||||
- inconsistent-return-statements
|
||||
- expression-not-assigned
|
||||
- line-too-long
|
||||
enable:
|
||||
|
||||
options:
|
||||
max-locals: 15
|
||||
max-returns: 6
|
||||
max-branches: 12
|
||||
max-statements: 50
|
||||
# max-parents: 7
|
||||
max-attributes: 20
|
||||
min-public-methods: 0
|
||||
max-public-methods: 25
|
||||
max-module-lines: 1000
|
||||
max-line-length: 88
|
||||
|
||||
mccabe:
|
||||
run: true
|
||||
options:
|
||||
max-complexity: 10
|
||||
|
||||
pep8:
|
||||
run: true
|
||||
options:
|
||||
max-line-length: 88
|
||||
single-line-if-stmt: n
|
||||
disable:
|
||||
- E501 # line too long
|
||||
|
||||
pyroma:
|
||||
run: false
|
||||
disable:
|
||||
- PYR19
|
||||
- PYR16
|
||||
|
||||
pep257:
|
||||
disable:
|
||||
- D000
|
||||
- D203
|
||||
# - D213
|
||||
- D212 # multiline docstrings on first line only.
|
||||
- D404
|
||||
- D100
|
||||
- D407 # numpy docstring format
|
||||
- D107 # missing docstring in __init__
|
||||
# Docstrings ending with newlines and : in Returns block.
|
||||
- D413
|
||||
- D406
|
||||
- D103
|
||||
- D101 # missing docstring in public class
|
||||
- D102 # missing docstring in public method
|
||||
|
||||
pyflakes:
|
||||
disable:
|
||||
- F401 # unused import
|
||||
|
||||
dodgy:
|
||||
run: true
|
||||
23
pyproject.toml
Normal file
23
pyproject.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[tool.poetry]
|
||||
name = "analyse_properties"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
authors = ["Daniel Tomlinson <dtomlinson@panaetius.co.uk>"]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.7"
|
||||
apache-beam = {extras = ["gcp"], version = "^2.32.0"}
|
||||
mkdocs = "^1.2.2"
|
||||
mkdocs-material = "^7.3.0"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
# pytest = "^5.2"
|
||||
prospector = "^1.5.1"
|
||||
pandas-profiling = "^3.0.0"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
"analyse-properties" = "analyse_properties.main:run"
|
||||
Reference in New Issue
Block a user