56 Commits

Author SHA1 Message Date
76434fae5b adding entrypoint command 2021-09-27 21:18:28 +01:00
886a37ca94 updating report.py 2021-09-27 21:18:14 +01:00
3263b3dd8b moving panda-profiling report into docs 2021-09-27 21:18:06 +01:00
dffc6aa553 adding debug print 2021-09-27 21:17:49 +01:00
f9eeb8bfad adding latest beam pipeline code for dataflow 2021-09-27 21:17:39 +01:00
cad6612ebe updating dataflow notes 2021-09-27 03:39:40 +01:00
391861d80c adding latest beam pipeline code 2021-09-27 03:39:30 +01:00
f60beb4565 updating dataflow notes 2021-09-27 03:18:49 +01:00
f2ed60426d adding temporary notes 2021-09-27 03:18:42 +01:00
7db1edb90c updating download_data script with pp-2020 dataset 2021-09-27 03:18:33 +01:00
3a74579440 adding latest beam pipeline code for dataflow with group optimisation 2021-09-27 03:18:17 +01:00
377e3c703f updating dataflow documentation 2021-09-27 01:35:48 +01:00
a8fc06c764 adding latest beam pipeline code for dataflow with group optimisation 2021-09-26 23:28:58 +01:00
eaa36877f6 update dataflow documentation with new commands for vpc 2021-09-26 23:28:35 +01:00
1941fcb7bf update prospector.yaml 2021-09-26 23:28:21 +01:00
99e67c2840 updating download_data script with new bucket 2021-09-26 23:28:12 +01:00
8e8469579e updating beam pipeline to use GroupByKey 2021-09-26 20:29:11 +01:00
4e3771c728 adding latest beam pipeline code for dataflow 2021-09-26 17:15:35 +01:00
8856a9763f updating prospector.yaml 2021-09-26 17:15:24 +01:00
fded858932 moving dataflow notes 2021-09-26 17:15:17 +01:00
bb71d55f8c adding latest beam pipeline code for dataflow 2021-09-26 16:23:19 +01:00
8047b5ced4 adding latest beam pipeline code for dataflow 2021-09-26 16:16:58 +01:00
9f53c66975 adding latest beam pipeline code for dataflow 2021-09-26 15:57:00 +01:00
e6ec110d54 updating download data script for new location in GCS 2021-09-26 14:56:53 +01:00
83807616e0 Merge branch 'wip/pathlib' into develop 2021-09-26 14:56:21 +01:00
7f874fa6f6 updating README.md 2021-09-26 14:56:01 +01:00
b8a997084d removing requirements.txt for documentation 2021-09-26 14:55:48 +01:00
c4e81065b1 adding pyenv 3.7.9 2021-09-26 14:55:05 +01:00
62bd0196ad removing shard_name_template from output file 2021-09-26 06:11:42 +01:00
7f9b7e4bfd moving inputs/outputs to use pathlib 2021-09-26 06:03:55 +01:00
7962f40e32 adding initial docs 2021-09-26 01:34:06 +01:00
2a43ea1946 adding download script for data set 2021-09-26 01:18:23 +01:00
07d176be79 updating .gitignore 2021-09-26 01:10:53 +01:00
f804e85cc3 updating .gitignore 2021-09-26 01:10:32 +01:00
9fdc6dce05 migrate beam pipeline to main.py 2021-09-26 01:10:09 +01:00
54cf5e3e36 updating prospector.yaml 2021-09-26 01:09:48 +01:00
2e42a453b0 adding latest beam pipeline code 2021-09-25 22:15:58 +01:00
adfbd8e93d updating prospector.yaml 2021-09-25 22:15:46 +01:00
1bd54f188d updating folder structure for data input/output 2021-09-25 22:15:39 +01:00
a7c52b1085 updating .gitignore 2021-09-25 22:15:10 +01:00
24420c8935 adding latest beam pipeline code 2021-09-25 18:25:08 +01:00
44f346deff Merge remote-tracking branch 'origin/develop' into develop 2021-09-25 16:48:04 +01:00
a37e7817c3 adding latest beam pipeline code 2021-09-25 16:47:52 +01:00
539e4c7786 adding latesty beam pipeline code 2021-09-25 16:47:17 +01:00
214ce77d8f adding debug.py 2021-09-25 16:47:08 +01:00
47a4ac4bc3 adding latest beam pipeline code 2021-09-25 01:44:37 +01:00
aa61ea9c57 adding latest beam pipeline code 2021-09-25 00:48:41 +01:00
94cc22a385 adding data exploration report + code 2021-09-25 00:48:34 +01:00
a05182892a adding documentation 2021-09-25 00:48:13 +01:00
c38a10ca2f updating beam to install gcp extras 2021-09-25 00:48:04 +01:00
ab993fc030 adding prospector.yaml 2021-09-25 00:47:49 +01:00
9cf5662600 updating notes 2021-09-24 16:52:40 +01:00
0f262daf39 adding report.py 2021-09-24 16:52:34 +01:00
972d0a852a updating dev dependencies 2021-09-24 16:52:25 +01:00
5301d5ff04 updating .gitignore 2021-09-24 16:51:56 +01:00
6db2fa59b9 adding initial skeleton 2021-09-24 16:01:03 +01:00
21 changed files with 5056 additions and 1 deletions

145
.gitignore vendored Normal file
View File

@@ -0,0 +1,145 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# USER
# ignore the data csv files for input/output
**/data/**/*.csv
**/data/**/*.json
.vscode/settings.json

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.7.9

View File

@@ -1,2 +1,9 @@
# street_group_tech_test
Technical Test for Street Group
Technical Test for Street Group for Daniel Tomlinson.
## Documentation
Read the documentation on github pages for instructions around running the code and a discussion on the approach.
https://dtomlinson91.github.io/street_group_tech_test/

View File

@@ -0,0 +1,5 @@
"""
Daniel Tomlinson (dtomlinson@panaetius.co.uk).
Technical test for Street Group.
"""

View File

@@ -0,0 +1,30 @@
import apache_beam as beam
class DebugShowEmptyColumn(beam.DoFn):
def __init__(self, index):
self.index = index
def process(self, element):
column = element[self.index]
if len(column) == 0:
yield element
return None
class DebugShowColumnWithValueIn(beam.DoFn):
def __init__(self, index, value):
self.index = index
self.value = value
def process(self, element):
column = element[self.index]
if self.value in column:
yield element
return None
class DebugPrint(beam.DoFn):
def process(self, element):
print(element)
yield element

402
analyse_properties/main.py Normal file
View File

@@ -0,0 +1,402 @@
import argparse
from datetime import datetime
import hashlib
import itertools
import json
import logging
import pathlib
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from analyse_properties.debug import * # noqa
def slice_by_range(element, *ranges):
"""
Slice a list with multiple ranges.
Args:
element : The element.
*ranges (tuple): Tuples containing a start,end index to slice the element.
E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.
Returns:
list: The list sliced by the ranges
"""
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
class DropRecordsSingleEmptyColumn(beam.DoFn):
"""
Drop the entire row if a given column is empty.
Args:
index : The index of the column in the list.
Returns:
None: If the length of the column is 0, drop the element.
Yields:
element: If the length of the column is >0, keep the element.
"""
def __init__(self, index):
self.index = index
def process(self, element):
column = element[self.index]
if len(column) == 0:
return None
yield element
class DropRecordsTwoEmptyColumn(beam.DoFn):
"""
Drop the entire row if both of two given columns are empty.
Args:
index_0 : The index of the first column in the list.
index_1 : The index of the second column in the list.
Returns:
None: If the length of both columns is 0, drop the element.
Yields:
element: If the length of both columns is >0, keep the element.
"""
def __init__(self, index_0, index_1):
self.index_0 = index_0
self.index_1 = index_1
def process(self, element):
column_0 = element[self.index_0]
column_1 = element[self.index_1]
if len(column_0) == 0 and len(column_1) == 0:
return None
yield element
class SplitColumn(beam.DoFn):
"""
Split one column into two columns by a character.
Args:
index : The index of the column in the list.
split_char: The character to split the column by.
"""
def __init__(self, index, split_char):
self.index = index
self.split_char = split_char
def process(self, element):
# If there is a split based on the split_char, then keep the second result in
# place (street number) and append the first result (building) at the end.
try:
part_0, part_1 = element[self.index].split(self.split_char)
element[self.index] = part_1.strip()
element.append(part_0.strip())
yield element
except ValueError:
# append a blank column to keep column numbers consistent.
element.append("")
yield element
class CreateMappingTable(beam.DoFn):
"""
Create a mapping table to be used as a side-input.
This mapping table has a key of an ID generated across all columns and a value of
the raw property data.
The table is used to populate the raw property data after a GroupByKey using
only the IDs in order to reduce the amount of data processed in the GroupByKey operation.
"""
def process(self, element):
# Join the row into a string.
unique_string = ",".join(element)
# Hash the string.
hashed_string = hashlib.md5(unique_string.encode())
# Format the resulting PCollection with the key of id and value of raw data.
new_element = (hashed_string.hexdigest(), list(element))
yield new_element
class CreateUniquePropertyID(beam.DoFn):
"""
Create a unique property ID which does not include the price and date of sale.
Uses each row of the mapping table to create a PCollection with a key of the
unique property ID and a value of the ID generated across all columns.
"""
def process(self, element):
unique_string = ",".join(element[-1][2:])
hashed_string = hashlib.md5(unique_string.encode())
new_element = (hashed_string.hexdigest(), element[0])
yield new_element
class DeduplicateIDs(beam.DoFn):
"""Deduplicate a list of IDs."""
def process(self, element):
deduplicated_list = list(set(element[-1]))
new_element = (element[0], deduplicated_list)
yield new_element
def insert_data_for_id(element, mapping_table):
"""
Replace the ID with the raw data from the mapping table.
Args:
element: The element.
mapping_table (dict): The mapping table.
Yields:
The element with IDs replaced with raw data.
"""
replaced_list = [mapping_table[data_id] for data_id in element[-1]]
new_element = (element[0], replaced_list)
yield new_element
class ConvertDataToDict(beam.DoFn):
"""Convert the processed data into a dict to be exported as a JSON object."""
@staticmethod
def get_latest_transaction(transaction_dates):
"""
Get the date of the latest transaction for a list of dates.
Args:
transaction_dates (str): A date in the form "%Y-%m-%d".
Returns:
str: The year in the form "%Y" of the latest transaction date.
"""
transaction_dates = [
datetime.strptime(individual_transaction, "%Y-%m-%d")
for individual_transaction in transaction_dates
]
return max(transaction_dates).strftime("%Y")
@staticmethod
def get_readable_address(address_components, address_comparisons):
"""
Create a human readable address from the locality/town/district/county columns.
Args:
address_components (list): The preceeding parts of the address (street, postcode etc.)
address_comparisons (list): The locality/town/district/county.
Returns:
str: The complete address deduplicated & cleaned.
"""
# Get pairwise comparison to see if two locality/town/district/counties
# are equivalent
pairwise_comparison = [
x == y
for i, x in enumerate(address_comparisons)
for j, y in enumerate(address_comparisons)
if i > j
]
# Create a mask to eliminate the redundant parts of the address
mask = [True, True, True, True]
if pairwise_comparison[0]:
mask[1] = False
if pairwise_comparison[1] or pairwise_comparison[2]:
mask[2] = False
if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]:
mask[3] = False
# Apply the mask
applied_mask = list(itertools.compress(address_comparisons, mask))
# Filter out empty items in list
deduplicated_address_part = list(filter(None, applied_mask))
# Filter out any missing parts of the address components
cleaned_address_components = list(filter(None, address_components))
# Return the readable address
return "\n".join(
itertools.chain.from_iterable(
[
cleaned_address_components[0:-1],
deduplicated_address_part,
[cleaned_address_components[-1]],
]
)
)
def process(self, element):
# Group together all the transactions for the property.
property_transactions = [
{
"price": int(entry[0]),
"transaction_date": entry[1].replace(" 00:00", ""),
"year": int(entry[1][0:4]),
}
for entry in element[-1]
]
# Create the dict to hold all the information about the property.
json_object = {
"property_id": element[0],
"readable_address": None,
"flat_appartment": list(element[-1])[0][4],
"builing": list(element[-1])[0][10],
"number": list(element[-1])[0][3],
"street": list(element[-1])[0][5],
"locality": list(element[-1])[0][6],
"town": list(element[-1])[0][7],
"district": list(element[-1])[0][8],
"county": list(element[-1])[0][9],
"postcode": list(element[-1])[0][2],
"property_transactions": property_transactions,
"latest_transaction_year": int(self.get_latest_transaction(
[
transaction["transaction_date"]
for transaction in property_transactions
]
)),
}
# Create a human readable address to go in the dict.
json_object["readable_address"] = self.get_readable_address(
[
json_object["flat_appartment"],
json_object["builing"],
f'{json_object["number"]} {json_object["street"]}',
json_object["postcode"],
],
[
json_object["locality"],
json_object["town"],
json_object["district"],
json_object["county"],
],
)
yield json_object
def run(argv=None, save_main_session=True):
"""Entrypoint and definition of the pipeline."""
logging.getLogger().setLevel(logging.INFO)
# Default input/output files
input_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "input"
/ "pp-2020.csv"
# / "pp-complete.csv"
)
output_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "output"
/ "pp-2020"
# / "pp-complete"
)
# Arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",
dest="input",
default=str(input_file),
help="Full path to the input file.",
)
parser.add_argument(
"--output",
dest="output",
default=str(output_file),
help="Full path to the output file without extension.",
)
known_args, pipeline_args = parser.parse_known_args(argv)
# Pipeline options. save_main_session needed for DataFlow for global imports.
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as pipeline:
# Load the data
load = (
pipeline
| "Read input data" >> beam.io.ReadFromText(known_args.input)
| "Split by ','" >> beam.Map(lambda element: element.split(","))
| "Remove leading and trailing quotes"
>> beam.Map(lambda element: [el.strip('"') for el in element])
)
# Clean the data.
clean_drop = (
load
| "Drop unneeded columns"
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
| "Convert to Upper Case"
>> beam.Map(lambda element: [e.upper() for e in element])
| "Strip leading/trailing whitespace"
>> beam.Map(lambda element: [e.strip() for e in element])
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
# Create a mapping table
mapping_table_raw = (
clean_drop
| "Create a mapping table with key of id_all_columns and value of cleaned data."
>> beam.ParDo(CreateMappingTable())
)
# Condense mapping table into a single dict.
mapping_table_condensed = (
mapping_table_raw
| "Condense mapping table into single dict" >> beam.combiners.ToDict()
)
# Prepare the data by creating IDs, grouping together and using mapping table
# to reinsert raw data.
prepared = (
mapping_table_raw
| "Create unique ID ignoring price & date"
>> beam.ParDo(CreateUniquePropertyID())
| "Group by ID"
>> beam.GroupByKey()
| "Deduplicate to eliminate repeated transactions"
>> beam.ParDo(DeduplicateIDs())
| "Insert the raw data using the mapping table"
>> beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
)
)
# Format the data into a dict.
formatted = (
prepared
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
)
# Save the data to a .json file.
(
formatted
| "Combine into one PCollection" >> beam.combiners.ToList()
| "Format output" >> beam.Map(json.dumps, indent=2)
| "Save to .json file"
>> beam.io.WriteToText(
file_path_prefix=known_args.output,
file_name_suffix=".json",
)
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()

5
docs/index.md Normal file
View File

@@ -0,0 +1,5 @@
# Welcome
## Introduction
This documentation accompanies the technical test for the Street Group.

File diff suppressed because one or more lines are too long

8
download_data.sh Executable file
View File

@@ -0,0 +1,8 @@
# Full data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-complete.csv -P data/input
# Monthly update data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input
# 2020 data set
wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input

38
exploration/report.py Normal file
View File

@@ -0,0 +1,38 @@
import pathlib
import pandas as pd
from pandas_profiling import ProfileReport
def main():
input_file = (
pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-complete.csv"
)
with input_file.open() as csv:
df_report = pd.read_csv(
csv,
names=[
"transaction_id",
"price",
"date_of_transfer",
"postcode",
"property_type",
"old_new",
"duration",
"paon",
"saon",
"street",
"locality",
"town_city",
"district",
"county",
"ppd_category",
"record_status",
],
)
profile = ProfileReport(df_report, title="Price Paid Data", minimal=True)
profile.to_file("price_paid_data_report.html")
if __name__ == "__main__":
main()

40
mkdocs.yaml Normal file
View File

@@ -0,0 +1,40 @@
site_name: The Street Group Technical Test
repo_url: https://github.com/dtomlinson91/street_group_tech_test
use_directory_urls: false
nav:
- Documentation:
- Welcome: index.md
# - Installation: documentation/installation.md
# - Usage: documentation/usage.md
# - Comments and Caveats:
# - Introduction: comments_caveats/introduction.md
# - Time limit: comments_caveats/time_limit.md
# - Third party libraries: comments_caveats/third_party_libraries.md
# - Areas of improvement and comments: comments_caveats/area_of_improvement_comments.md
# - Similar names algorithm: comments_caveats/similar_names.md
# - Reference:
# - deduplicator.main: reference/api_documentation_main.md
# - Changelog: changelog/changelog.md
theme:
name: material
palette:
primary: indigo
accent: blue
feature:
tabs: true
markdown_extensions:
- admonition
- codehilite:
guess_lang: true
- toc:
permalink: true
- pymdownx.superfences
# - pymdownx.arithmatex:
# generic: true
plugins:
- search:
lang: en
extra_javascript:
- javascripts/config.js
- https://polyfill.io/v3/polyfill.min.js?features=es6
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js

View File

@@ -0,0 +1,7 @@
# Answers
## CSV
Read a csv file into Beam:
<https://stackoverflow.com/a/41171867>

View File

@@ -0,0 +1,12 @@
# Beam Documentation
## Transforms
FlatMap:
<https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/>
I/O Transforms:
<https://beam.apache.org/documentation/io/built-in/>

View File

@@ -0,0 +1,93 @@
# DataFlow
<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
## Examples
Full example of beam pipeline on dataflow:
<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset>
## Setup
Export env variable:
`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
## Run pipeline
### Dataflow
#### Yearly dataset
```bash
python -m analyse_properties.main \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-2
```
#### Full dataset
```bash
python -m analyse_properties.main \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-complete.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-complete \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-8 \
--num_workers=3 \
--autoscaling_algorithm=NONE
```
### Locally
Run the pipeline locally:
`python -m analyse_properties.main --runner DirectRunner`
## Errors
Unsubscriptable error on window:
<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
## Documentation
Running in its own private VPC without public IPs
- <https://stackoverflow.com/questions/58893082/which-compute-engine-quotas-need-to-be-updated-to-run-dataflow-with-50-workers>
- <https://cloud.google.com/dataflow/docs/guides/specifying-networks#subnetwork_parameter>
Error help
- <https://cloud.google.com/dataflow/docs/guides/common-errors>
- <https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline>
Scaling
Using DataFlowPrime: <https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime#enable-prime>
Use `--experiments=enable_prime`
Deploying a pipeline (with scaling options): <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline>
Available VM types (with pricing): <https://cloud.google.com/compute/vm-instance-pricing#n1_predefined>
Performance
Sideinput performance: <https://stackoverflow.com/questions/48242320/google-dataflow-apache-beam-python-side-input-from-pcollection-kills-perform>
Common use cases:
- Part 1 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1>
- Part 2 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2>

5
notes/links.md Normal file
View File

@@ -0,0 +1,5 @@
# Links
## Data
https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads

27
notes/tmp/errordata Normal file
View File

@@ -0,0 +1,27 @@
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 181, in execute
op.finish()
File "dataflow_worker/native_operations.py", line 93, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "dataflow_worker/native_operations.py", line 94, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "dataflow_worker/native_operations.py", line 95, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/nativeavroio.py", line 308, in __exit__
self._data_file_writer.flush()
File "fastavro/_write.pyx", line 664, in fastavro._write.Writer.flush
File "fastavro/_write.pyx", line 639, in fastavro._write.Writer.dump
File "fastavro/_write.pyx", line 451, in fastavro._write.snappy_write_block
File "fastavro/_write.pyx", line 458, in fastavro._write.snappy_write_block
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystemio.py", line 200, in write
self._uploader.put(b)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py", line 720, in put
self._conn.send_bytes(data.tobytes())
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
"
"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"

44
notes/tmp/exampledata Normal file
View File

@@ -0,0 +1,44 @@
[{
"property_id": "3cf3c06632c46754696f2017933702f3",
"flat_appartment": "",
"builing": "",
"number": "63",
"street": "ROTTON PARK STREET",
"locality": "",
"town": "BIRMINGHAM",
"district": "BIRMINGHAM",
"county": "WEST MIDLANDS",
"postcode": "B16 0AE",
"property_transactions": [
{ "price": "385000", "transaction_date": "2021-01-08", "year": "2021" },
{ "price": "701985", "transaction_date": "2019-03-28", "year": "2019" },
{ "price": "1748761", "transaction_date": "2020-05-27", "year": "2020" }
],
"latest_transaction_year": "2021"
},
{
"property_id": "c650d5d7bb0daf0a19bb2cacabbee74e",
"readable_address": "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ",
"flat_appartment": "",
"builing": "",
"number": "16",
"street": "STATION ROAD",
"locality": "PARKGATE",
"town": "NESTON",
"district": "CHESHIRE WEST AND CHESTER",
"county": "CHESHIRE WEST AND CHESTER",
"postcode": "CH64 6QJ",
"property_transactions": [
{
"price": "280000",
"transaction_date": "2020-11-30",
"year": "2020"
},
{
"price": "265000",
"transaction_date": "2020-05-29",
"year": "2020"
}
],
"latest_transaction_year": "2020"
}]

16
notes/tmp/runningdata Normal file
View File

@@ -0,0 +1,16 @@
Create Mapping table
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
Condensing
{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
Prepared
GroupByKey
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
deduplicated
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])

2628
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

80
prospector.yaml Normal file
View File

@@ -0,0 +1,80 @@
# This will enable almost every single warning
output-format: vscode
doc-warnings: true
# allow-shorthand: false
strictness: none
ignore-patterns:
- (^|/)\..+
pylint:
run: true
disable:
- fixme
- bad-continuation
- missing-module-docstring
- logging-fstring-interpolation
- missing-function-docstring
- abstract-method
- missing-class-docstring
- super-init-not-called
- arguments-differ
- inconsistent-return-statements
- expression-not-assigned
- line-too-long
enable:
options:
max-locals: 15
max-returns: 6
max-branches: 12
max-statements: 50
# max-parents: 7
max-attributes: 20
min-public-methods: 0
max-public-methods: 25
max-module-lines: 1000
max-line-length: 88
mccabe:
run: true
options:
max-complexity: 10
pep8:
run: true
options:
max-line-length: 88
single-line-if-stmt: n
disable:
- E501 # line too long
pyroma:
run: false
disable:
- PYR19
- PYR16
pep257:
disable:
- D000
- D203
# - D213
- D212 # multiline docstrings on first line only.
- D404
- D100
- D407 # numpy docstring format
- D107 # missing docstring in __init__
# Docstrings ending with newlines and : in Returns block.
- D413
- D406
- D103
- D101 # missing docstring in public class
- D102 # missing docstring in public method
pyflakes:
disable:
- F401 # unused import
dodgy:
run: true

23
pyproject.toml Normal file
View File

@@ -0,0 +1,23 @@
[tool.poetry]
name = "analyse_properties"
version = "0.1.0"
description = ""
authors = ["Daniel Tomlinson <dtomlinson@panaetius.co.uk>"]
[tool.poetry.dependencies]
python = "^3.7"
apache-beam = {extras = ["gcp"], version = "^2.32.0"}
mkdocs = "^1.2.2"
mkdocs-material = "^7.3.0"
[tool.poetry.dev-dependencies]
# pytest = "^5.2"
prospector = "^1.5.1"
pandas-profiling = "^3.0.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
"analyse-properties" = "analyse_properties.main:run"