diff --git a/dataflow/index.html b/dataflow/index.html
index 9319d04..577f8d9 100644
--- a/dataflow/index.html
+++ b/dataflow/index.html
@@ -699,10 +699,6 @@
We need to choose a worker_machine_type with sufficient memory to run the pipeline. Because the pipeline uses a mapping table, and DataFlow autoscales on CPU rather than memory usage, we need a machine with more RAM than usual to ensure sufficient memory when running on one worker. For pp-2020.csv the type n1-highmem-2 (2 vCPUs, 13GB of RAM) was chosen and the pipeline completed successfully in ~10 minutes using only 1 worker.
Assuming the pp-2020.csv file has been placed in the ./input directory of the bucket, you can run a command similar to:
Caution
-Use the command python -m analyse_properties.main as the entrypoint to the pipeline and not analyse-properties as the module isn't installed with poetry on the workers with the configuration below.
python -m analyse_properties.main \
--runner DataflowRunner \
--project street-group \
diff --git a/dataflow/scaling.html b/dataflow/scaling.html
index 6e7c7a2..2f882a1 100644
--- a/dataflow/scaling.html
+++ b/dataflow/scaling.html
@@ -686,7 +686,7 @@ struct.error: 'i' format requires -2147483648 <= number <= 2147483
Solution¶
A possible solution would be to leverage BigQuery to store the contents of the mapping table as the pipeline progresses. We can make use of BigQuery's array type to store the raw array as we process each row.
In addition to creating the mapping table (key, value) pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written to allow the subsequent stages to make use of this data.
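A rough sketch of saving the pairs with beam.io.WriteToBigQuery is shown below; the table, dataset, and column names are illustrative assumptions rather than part of the existing pipeline, and the raw array maps onto a REPEATED STRING column:
import apache_beam as beam

# mapping_table is the PCollection of (id_all_columns, raw_row) pairs created above.
mapping_table_schema = {
    "fields": [
        {"name": "id_all_columns", "type": "STRING", "mode": "REQUIRED"},
        {"name": "raw_row", "type": "STRING", "mode": "REPEATED"},  # BigQuery array column
    ]
}
(
    mapping_table
    | "ToTableRows" >> beam.Map(lambda kv: {"id_all_columns": kv[0], "raw_row": list(kv[1])})
    | "StoreMappingTable" >> beam.io.WriteToBigQuery(
        table="mapping_table",          # illustrative
        dataset="analyse_properties",   # illustrative
        project="street-group",
        schema=mapping_table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
)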
-Remove the condense mapping table stage as it is no longer needed.
+Remove the condense mapping table stage as it is no longer needed (which also saves a bit of time).
Instead of using:
beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
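A sketch of the replacement DoFn described on this page is shown below; the class name, table name, and column names are assumptions for illustration, error handling is omitted, and it fetches all of the ids for an element with a single parameterised query (a small step towards the batching discussed at the end of this page):
import apache_beam as beam
from google.cloud import bigquery

class InsertDataForIdFromBigQuery(beam.DoFn):
    def __init__(self, table="street-group.analyse_properties.mapping_table"):
        self._table = table  # illustrative table name

    def setup(self):
        # One client per worker, created here so the DoFn remains picklable.
        self._client = bigquery.Client()

    def process(self, element):
        key, ids = element  # (id_without_price_date, [id_all_columns, ...])
        query = (
            "SELECT id_all_columns, raw_row "
            f"FROM `{self._table}` WHERE id_all_columns IN UNNEST(@ids)"
        )
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("ids", "STRING", list(ids))]
        )
        rows = {
            row["id_all_columns"]: list(row["raw_row"])
            for row in self._client.query(query, job_config=job_config).result()
        }
        yield key, [rows[i] for i in ids if i in rows]
The stage would then be applied with beam.ParDo(InsertDataForIdFromBigQuery()) in place of the FlatMap shown above.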
diff --git a/discussion/approach.html b/discussion/approach.html
index 7433b69..8f4f1a9 100644
--- a/discussion/approach.html
+++ b/discussion/approach.html
@@ -710,7 +710,7 @@
The key being the id across all columns (id_all_columns).
The value being the raw data as an array.
-The mapping table is then condensed to a single dictionary with these key, value pairs and is used as a side input further down the pipeline.
+The mapping table is then condensed to a single dictionary with these key, value pairs (automatically deduplicating repeated rows) and is used as a side input further down the pipeline.
This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey, the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much more quickly than if the raw data were included in this operation.
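As a runnable sketch (with example data inlined, and an assumed separator for the string that gets hashed, since the exact id construction isn't shown here), creating and condensing the mapping table might look like:
import hashlib
import apache_beam as beam

def to_mapping_pair(row):
    # row is the cleaned list of column values for one transaction.
    id_all_columns = hashlib.md5("|".join(row).encode("utf-8")).hexdigest()
    return id_all_columns, row

with beam.Pipeline() as pipeline:
    cleaned_rows = pipeline | beam.Create([
        ["317500", "2020-11-13 00:00", "B90 3LA", "1", "", "VERSTONE ROAD",
         "SHIRLEY", "SOLIHULL", "SOLIHULL", "WEST MIDLANDS", ""],
    ])
    mapping_table = cleaned_rows | "CreateMappingPairs" >> beam.Map(to_mapping_pair)
    # Condensed to a single dict, later consumed via beam.pvalue.AsSingleton(...).
    mapping_table_condensed = mapping_table | beam.combiners.ToDict()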
Prepare stage¶
diff --git a/discussion/cleaning.html b/discussion/cleaning.html
index 98173d1..c28a91a 100644
--- a/discussion/cleaning.html
+++ b/discussion/cleaning.html
@@ -794,7 +794,7 @@
Repeated rows¶
Some of the data is repeated:
-- Some rows repeated, with the same date + price + address information but with a unique transaction id.
+- Some rows are repeated, with the same date + price + address information but with a unique transaction id.
Example (PCollection)
@@ -816,7 +816,7 @@
]
},
{
- "fd4634faec47c29de40bbf7840723b41": [
+ "gd4634faec47c29de40bbf7840723b42": [
"317500",
"2020-11-13 00:00",
"B90 3LA",
diff --git a/documentation/installation.html b/documentation/installation.html
index fd35afa..8fee2e6 100644
--- a/documentation/installation.html
+++ b/documentation/installation.html
@@ -358,8 +358,15 @@
-
-
- Poetry
+
+ Pip
+
+
+
+
+ -
+
+ Poetry (Alternative)
@@ -600,8 +607,15 @@
-
-
- Poetry
+
+ Pip
+
+
+
+
+ -
+
+ Poetry (Alternative)
@@ -627,18 +641,18 @@
The task is written in Python 3.7.9 using Apache Beam 2.32.0. Python versions 3.6.14 and 3.8.11 should also be compatible but have not been tested.
The task has been tested on macOS Big Sur and WSL2. The task should run on Windows, but this wasn't tested.
For Beam 2.32.0 the supported versions of the Python SDK can be found here.
-Poetry¶
-The test uses Poetry for dependency management.
-
-Info
-If you already have Poetry installed globally you can go straight to the poetry install step.
-
-In a virtual environment install poetry:
-pip install poetry
+Pip¶
+In a virtual environment, run from the root of the repo:
+pip install -r requirements.txt
+Poetry (Alternative)¶
+Install Poetry globally.
From the root of the repo install the dependencies with:
poetry install --no-dev
+Activate the shell with:
+poetry shell
+
diff --git a/documentation/usage.html b/documentation/usage.html
index d5bd633..2ae6c80 100644
--- a/documentation/usage.html
+++ b/documentation/usage.html
@@ -667,7 +667,7 @@
Usage¶
This page documents how to run the pipeline locally to complete the task for the 2020 dataset.
-The pipeline also runs in GCP using DataFlow and is discussed further on but can be viewed here. We also discuss how to adapt the pipeline so it can run against the full dataset.
+The pipeline also runs in GCP using DataFlow; this is discussed further on and can be viewed here. We also discuss how to adapt the pipeline so it can run against the full dataset.
Download dataset¶
The input data by default should go in ./data/input.
For convenience the data is available publicly in a GCP Cloud Storage bucket.
@@ -676,13 +676,13 @@
to download the data for 2020 and place it in the input directory above.
Entrypoint¶
-The entrypoint to the pipeline is analyse-properties.
+The entrypoint to the pipeline is analyse_properties.main.
Available options¶
Running
-analyse-properties --help
+python -m analyse_properties.main --help
gives the following output:
-usage: analyse-properties [-h] [--input INPUT] [--output OUTPUT]
+usage: analyse_properties.main [-h] [--input INPUT] [--output OUTPUT]
optional arguments:
-h, --help show this help message and exit
@@ -690,11 +690,14 @@ optional arguments:
--output OUTPUT Full path to the output file without extension.
The default value for input is ./data/input/pp-2020.csv and the default value for output is ./data/output/pp-2020.
-If passing in values for input/output these should be full paths to the files. The test will parse these inputs as a str() and pass this to beam.io.ReadFromText().
Run the pipeline¶
To run the pipeline and complete the task, run:
-analyse-properties --runner DirectRunner
+python -m analyse_properties.main \
+--runner DirectRunner \
+--input ./data/input/pp-2020.csv \
+--output ./data/output/pp-2020
+from the root of the repo.
The pipeline will use the 2020 dataset located in ./data/input and output the resulting .json to ./data/output.
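For reference, a minimal sketch of how such an entrypoint could be wired is shown below. This is not the repo's actual main module: the option names mirror the --help output above, but the pipeline stages are omitted and the ArgumentParser/PipelineOptions split is an assumption:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def main(argv=None):
    parser = argparse.ArgumentParser(prog="analyse_properties.main")
    parser.add_argument("--input", default="./data/input/pp-2020.csv",
                        help="Full path to the input file.")
    parser.add_argument("--output", default="./data/output/pp-2020",
                        help="Full path to the output file without extension.")
    args, beam_args = parser.parse_known_args(argv)
    # Unrecognised flags such as --runner DirectRunner are handed to Beam untouched.
    options = PipelineOptions(beam_args)
    with beam.Pipeline(options=options) as pipeline:
        lines = pipeline | beam.io.ReadFromText(str(args.input))
        # ... cleaning, mapping table, grouping and formatting stages go here ...
        lines | beam.io.WriteToText(str(args.output), file_name_suffix=".json")

if __name__ == "__main__":
    main()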
diff --git a/index.html b/index.html
index a7c31e1..41ab617 100644
--- a/index.html
+++ b/index.html
@@ -626,7 +626,7 @@
Welcome¶
Introduction¶
This documentation accompanies the technical test for the Street Group.
-The following pages will guide the user through installing the requirements, and running the task to complete the test. In addition, there is some discussion around the approach, and any improvements that could be made.
+The following pages will guide the user through installing the requirements, and running the task to complete the test. In addition, there is some discussion around the approach, and scaling the pipeline.
Navigate sections using the tabs at the top of the page. Pages in this section can be viewed in order by using the section links in the left menu, or by using the bar at the bottom of the page. The table of contents in the right menu can be used to navigate sections on each page.
Note
diff --git a/search/search_index.json b/search/search_index.json
index c89abf5..5a659eb 100644
--- a/search/search_index.json
+++ b/search/search_index.json
@@ -1 +1 @@
-{"config":{"indexing":"full","lang":["en"],"min_search_length":3,"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"index.html","text":"Welcome \u00b6 Introduction \u00b6 This documentation accompanies the technical test for the Street Group. The following pages will guide the user through installing the requirements, and running the task to complete the test. In addition, there is some discussion around the approach, and any improvements that could be made. Navigate sections using the tabs at the top of the page. Pages in this section can be viewed in order by using the section links in the left menu, or by using bar at the bottom of the page. The table of contents in the right menu can be used to navigate sections on each page. Note All paths in this documentation, e.g ./analyse_properties/data/output refer to the location of the directory/file from the root of the repo.","title":"Welcome"},{"location":"index.html#welcome","text":"","title":"Welcome"},{"location":"index.html#introduction","text":"This documentation accompanies the technical test for the Street Group. The following pages will guide the user through installing the requirements, and running the task to complete the test. In addition, there is some discussion around the approach, and any improvements that could be made. Navigate sections using the tabs at the top of the page. Pages in this section can be viewed in order by using the section links in the left menu, or by using bar at the bottom of the page. The table of contents in the right menu can be used to navigate sections on each page. Note All paths in this documentation, e.g ./analyse_properties/data/output refer to the location of the directory/file from the root of the repo.","title":"Introduction"},{"location":"dataflow/index.html","text":"Running on DataFlow \u00b6 The pipeline runs as is on GCP DataFlow. The following documents how I deployed to my personal GCP account but the approach may vary depending on project/account in GCP. Prerequisites \u00b6 Cloud Storage \u00b6 A Cloud Storage bucket with the following structure: ./input ./output ./tmp Place the input files into the ./input directory in the bucket. VPC \u00b6 To get around public IP quotas I created a VPC in the europe-west1 region that has Private Google Access turned to ON . Command \u00b6 Tip We need to choose a worker_machine_type with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and DataFlow autoscales on CPU and not memory usage, we need a machine with more ram than usual to ensure sufficient memory when running on one worker. For pp-2020.csv the type n1-highmem-2 with 2vCPU and 13GB of ram was chosen and completed successfully in ~10 minutes using only 1 worker. Assuming the pp-2020.csv file has been placed in the ./input directory in the bucket you can run a command similar to: Caution Use the command python -m analyse_properties.main as the entrypoint to the pipeline and not analyse-properties as the module isn't installed with poetry on the workers with the configuration below. 
python -m analyse_properties.main \\ --runner DataflowRunner \\ --project street-group \\ --region europe-west1 \\ --input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \\ --output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \\ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \\ --subnetwork = https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \\ --no_use_public_ips \\ --worker_machine_type = n1-highmem-2 The output file from this pipeline is publically available and can be downloaded here . The job graph for this pipeline is displayed below:","title":"Running on DataFlow"},{"location":"dataflow/index.html#running-on-dataflow","text":"The pipeline runs as is on GCP DataFlow. The following documents how I deployed to my personal GCP account but the approach may vary depending on project/account in GCP.","title":"Running on DataFlow"},{"location":"dataflow/index.html#prerequisites","text":"","title":"Prerequisites"},{"location":"dataflow/index.html#cloud-storage","text":"A Cloud Storage bucket with the following structure: ./input ./output ./tmp Place the input files into the ./input directory in the bucket.","title":"Cloud Storage"},{"location":"dataflow/index.html#vpc","text":"To get around public IP quotas I created a VPC in the europe-west1 region that has Private Google Access turned to ON .","title":"VPC"},{"location":"dataflow/index.html#command","text":"Tip We need to choose a worker_machine_type with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and DataFlow autoscales on CPU and not memory usage, we need a machine with more ram than usual to ensure sufficient memory when running on one worker. For pp-2020.csv the type n1-highmem-2 with 2vCPU and 13GB of ram was chosen and completed successfully in ~10 minutes using only 1 worker. Assuming the pp-2020.csv file has been placed in the ./input directory in the bucket you can run a command similar to: Caution Use the command python -m analyse_properties.main as the entrypoint to the pipeline and not analyse-properties as the module isn't installed with poetry on the workers with the configuration below. python -m analyse_properties.main \\ --runner DataflowRunner \\ --project street-group \\ --region europe-west1 \\ --input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \\ --output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \\ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \\ --subnetwork = https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \\ --no_use_public_ips \\ --worker_machine_type = n1-highmem-2 The output file from this pipeline is publically available and can be downloaded here . The job graph for this pipeline is displayed below:","title":"Command"},{"location":"dataflow/scaling.html","text":"Scaling to the full DataSet \u00b6 As is the pipeline will not run against the full dataset. But with a little work done to the existing pipeline I believe it is possible to work against the full dataset of ~27 million rows. Mapping table \u00b6 Using a mapping table as a side-input means that for the full dataset this table is going to be huge. Side inputs are stored in memory on the workers, with such a huge table the machines are going to quickly run out of available memory when autoscaling is applied. 
Running the pipeline against the full dataset resulted in the following error: \"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900\" with the pipeline job failing to process anything and the rows being processed per/sec gradually falling to zero as the workers killed the Python process to try free up more memory. This resulted in autoscaling down (as the CPU decreased) and the entire pipeline stagnated. Using a higher tiered worker_machine_type , disabling autoscaling, and fixing the workers to the maximum number of vCPUs available to the quota results in pipeline options: --worker_machine_type = n1-highmem-8 \\ --num_workers = 3 \\ --autoscaling_algorithm = NONE with 156GB of RAM available to the pipeline with 52GB on each worker. The pipeline was able to progress further until Python threw an error and the pipeline failed and shut down: \"Error message from worker: Traceback (most recent call last): File \"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py\", line 651, in do_work work_executor.execute() ... File \"/usr/local/lib/python3.7/multiprocessing/connection.py\", line 393, in _send_bytes header = struct.pack(\"!i\", n) struct.error: 'i' format requires -2147483648 <= number <= 2147483647 The number 2147483647 being the maximum value for a 32bit integer. As the side-input needs to be pickled (or serialised), this tells us that the table is far too large to be pickled and passed to the other workers. No amount of CPU/Memory can fix the problem. Patterns \u00b6 Google have several patterns for large side-inputs which are documented here: Part 1 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1 Part 2 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2 Solution \u00b6 A possible solution would be to leverage BigQuery to store the results of the mapping table in as the pipeline progresses. We can make use of BigQueries array type to literally store the raw array as we process each row. In addition to creating the mapping table (key, value) pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written to allow the subsequent stages to make use of this data. Remove the condense mapping table stage as it is no longer needed. Instead of using: beam . FlatMap ( insert_data_for_id , beam . pvalue . AsSingleton ( mapping_table_condensed ) ) to insert the results of the mapping table we write a new DoFn that takes the element, and for each id_all_columns in the array we make a call to BigQuery to get the array for this ID and insert it at this stage. Because each id_all_columns and its corresponding data is only used once, there would be no need to cache the results from BigQuery, however some work could be done to see if we could pull back more than one row at a time and cache these, saving time/costs in calls to BigQuery.","title":"Scaling to the Full DataSet"},{"location":"dataflow/scaling.html#scaling-to-the-full-dataset","text":"As is the pipeline will not run against the full dataset. 
But with a little work done to the existing pipeline I believe it is possible to work against the full dataset of ~27 million rows.","title":"Scaling to the full DataSet"},{"location":"dataflow/scaling.html#mapping-table","text":"Using a mapping table as a side-input means that for the full dataset this table is going to be huge. Side inputs are stored in memory on the workers, with such a huge table the machines are going to quickly run out of available memory when autoscaling is applied. Running the pipeline against the full dataset resulted in the following error: \"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900\" with the pipeline job failing to process anything and the rows being processed per/sec gradually falling to zero as the workers killed the Python process to try free up more memory. This resulted in autoscaling down (as the CPU decreased) and the entire pipeline stagnated. Using a higher tiered worker_machine_type , disabling autoscaling, and fixing the workers to the maximum number of vCPUs available to the quota results in pipeline options: --worker_machine_type = n1-highmem-8 \\ --num_workers = 3 \\ --autoscaling_algorithm = NONE with 156GB of RAM available to the pipeline with 52GB on each worker. The pipeline was able to progress further until Python threw an error and the pipeline failed and shut down: \"Error message from worker: Traceback (most recent call last): File \"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py\", line 651, in do_work work_executor.execute() ... File \"/usr/local/lib/python3.7/multiprocessing/connection.py\", line 393, in _send_bytes header = struct.pack(\"!i\", n) struct.error: 'i' format requires -2147483648 <= number <= 2147483647 The number 2147483647 being the maximum value for a 32bit integer. As the side-input needs to be pickled (or serialised), this tells us that the table is far too large to be pickled and passed to the other workers. No amount of CPU/Memory can fix the problem.","title":"Mapping table"},{"location":"dataflow/scaling.html#patterns","text":"Google have several patterns for large side-inputs which are documented here: Part 1 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1 Part 2 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2","title":"Patterns"},{"location":"dataflow/scaling.html#solution","text":"A possible solution would be to leverage BigQuery to store the results of the mapping table in as the pipeline progresses. We can make use of BigQueries array type to literally store the raw array as we process each row. In addition to creating the mapping table (key, value) pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written to allow the subsequent stages to make use of this data. Remove the condense mapping table stage as it is no longer needed. Instead of using: beam . FlatMap ( insert_data_for_id , beam . pvalue . AsSingleton ( mapping_table_condensed ) ) to insert the results of the mapping table we write a new DoFn that takes the element, and for each id_all_columns in the array we make a call to BigQuery to get the array for this ID and insert it at this stage. 
Because each id_all_columns and its corresponding data is only used once, there would be no need to cache the results from BigQuery, however some work could be done to see if we could pull back more than one row at a time and cache these, saving time/costs in calls to BigQuery.","title":"Solution"},{"location":"discussion/approach.html","text":"Approach \u00b6 The general approach to the pipeline is: Loading stage \u00b6 Load using beam . io . ReadFromText () Split the string loaded by , as it's a comma delimited .csv . Strip the leading/trailing \" marks. The result is an array with each element representing a single column in that row. Cleaning stage \u00b6 Already discussed. Create a mapping table \u00b6 The mapping table takes each row and creates a (key,value) pair with: The key being the id across all columns ( id_all_columns ). The value being the raw data as an array. The mapping table is then condensed to a single dictionary with these key, value pairs and is used as a side input further down the pipeline. This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey , the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much quicker than if we included the raw data in this operation. Prepare stage \u00b6 Take the mapping table data (before it is condensed) and create a unique id ignoring the price and date ( id_without_price_date ). This id will not be unique: for properties with more than one transaction they will share this id. Create a (key, value) pair with: The key being id_without_price_date . The value being id_all_columns . Group by id_without_price_date . This results in a PCollection that looks like: (id_without_price_date, [id_all_columns,...]) Deduplicate the id_all_columns inside this array to eliminate repeated rows that are exactly the same. Use the mapping table as a side input to reinsert the raw data using the id_all_columns . 
Example for No.1 B90 3LA Mapping table (pre condensed): (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) Mapping table (condensed): { ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' : [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ]} Prepared (key, value): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') Prepared (GroupByKey): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Prepared (Deduplicated): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Use mapping table as side input: (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) Format stage \u00b6 This stage takes the result and constructs a json object out of the grouped data. The schema for this output is discussed in the following page. Save stage \u00b6 The PCollection is combined with beam . combiners . ToList () Apply json.dumps() for proper quotation marks for strings. Write to text with beam . io . WriteToText .","title":"Approach"},{"location":"discussion/approach.html#approach","text":"The general approach to the pipeline is:","title":"Approach"},{"location":"discussion/approach.html#loading-stage","text":"Load using beam . io . ReadFromText () Split the string loaded by , as it's a comma delimited .csv . Strip the leading/trailing \" marks. The result is an array with each element representing a single column in that row.","title":"Loading stage"},{"location":"discussion/approach.html#cleaning-stage","text":"Already discussed.","title":"Cleaning stage"},{"location":"discussion/approach.html#create-a-mapping-table","text":"The mapping table takes each row and creates a (key,value) pair with: The key being the id across all columns ( id_all_columns ). The value being the raw data as an array. The mapping table is then condensed to a single dictionary with these key, value pairs and is used as a side input further down the pipeline. This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey , the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much quicker than if we included the raw data in this operation.","title":"Create a mapping table"},{"location":"discussion/approach.html#prepare-stage","text":"Take the mapping table data (before it is condensed) and create a unique id ignoring the price and date ( id_without_price_date ). This id will not be unique: for properties with more than one transaction they will share this id. Create a (key, value) pair with: The key being id_without_price_date . The value being id_all_columns . 
Group by id_without_price_date . This results in a PCollection that looks like: (id_without_price_date, [id_all_columns,...]) Deduplicate the id_all_columns inside this array to eliminate repeated rows that are exactly the same. Use the mapping table as a side input to reinsert the raw data using the id_all_columns . Example for No.1 B90 3LA Mapping table (pre condensed): (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) Mapping table (condensed): { ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' : [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ]} Prepared (key, value): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') Prepared (GroupByKey): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Prepared (Deduplicated): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Use mapping table as side input: (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] )","title":"Prepare stage"},{"location":"discussion/approach.html#format-stage","text":"This stage takes the result and constructs a json object out of the grouped data. The schema for this output is discussed in the following page.","title":"Format stage"},{"location":"discussion/approach.html#save-stage","text":"The PCollection is combined with beam . combiners . ToList () Apply json.dumps() for proper quotation marks for strings. Write to text with beam . io . WriteToText .","title":"Save stage"},{"location":"discussion/cleaning.html","text":"Cleaning \u00b6 In this page we discuss the cleaning stages and how best to prepare the data. Uniquely identify a property. \u00b6 To uniquely identify a property with the data we have it is enough to have a Postcode and the PAON (or SAON or combination of both). Postcode \u00b6 Because so few properties are missing a postcode (0.2% of all records) we will drop all rows that do not have one. We will drop some properties that could be identified uniquely with some more work, but the properties that are missing a postcode tend to be unusual/commercial/industrial (e.g a powerplant). PAON/SAON \u00b6 The PAON has 3 possible formats: The street number. The building name. The building name and street number (comma delimited). The SAON: Identifies the appartment/flat number for the building. If the SAON is present (only 11.7% of values) then the PAON will either be The building name. The building name and street number. Because of the way the PAON and SOAN are defined, if any row is missing both of these columns we will drop it. As only having the postcode is not enough (generally speaking) to uniquely identify a property. 
Tip In a production environment we could send these rows to a sink table (in BigQuery for example), rather than drop them outright. Collecting these rows over time might show some patterns on how we can uniquely identify properties that are missing these fields. We split the PAON as part of the cleaning stage. If the PAON contains a comma then it contains the building name and street number. We keep the street number in the same position as the PAON and insert the building name as a new column at the end of the row. If the PAON does not contain a comma we insert a blank column at the end to keep the number of columns in the PCollection consistent. Unneeded columns \u00b6 To try keep computation costs/time down, I decided to drop the categorical columns provided. These include: Property Type. Old/New. Duration. PPD Category Type. Record Status - monthly file only. Initially I was attempting to work against the full dataset so dropping these columns would make a difference in the amount of data that needs processing. These columns are also not consistent. E.g the property 63 B16, 0AE has three transactions. Two of these transactions have a property type of Other and one transaction has a property type of Terraced . These columns do provide some relevant information (old/new, duration, property type) and these could be included back into the pipeline fairly easily. Due to time constraints I was unable to make this change. In addition, I also dropped the transaction unique identifier column. I wanted the IDs calculated in the pipeline to be consistent in format, and hashing a string (md5) isn't that expensive to calculate with complexity \\(\\mathcal{O}(n)\\) . General cleaning \u00b6 Upper case \u00b6 As all strings in the dataset are upper case, we convert everything in the row to upper case to enforce consistency across the dataset. Strip leading/trailing whitespace \u00b6 We strip all leading/trailing whitespace from each column to enforce consistency. Repeated rows \u00b6 Some of the data is repeated: Some rows repeated, with the same date + price + address information but with a unique transaction id. Example (PCollection) [ { \"fd4634faec47c29de40bbf7840723b41\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] }, { \"fd4634faec47c29de40bbf7840723b41\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] } ] These rows will be deduplicated as part of the pipeline. Some rows have the same date + address information, but different prices. It would be very unusual to see multiple transactions on the same date for the same property. One reason could be that there was a data entry error, resulting in two different transactions with only one being the real price. As the date column does not contain the time (it is fixed at 00:00 ) it is impossible to tell. Another reason could be missing building/flat/appartment information in this entry. We keep these in the data, resulting in some properties having multiple transactions with different prices on the same date. Without a time or more information to go on, it is difficult to see how these could be filtered out. 
Example (Output) [ { \"property_id\" : \"20d5c335c8d822a40baab0ecd57e92a4\" , \"readable_address\" : \"53 PAVENHAM DRIVE\\nBIRMINGHAM\\nWEST MIDLANDS\\nB5 7TN\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"53\" , \"street\" : \"PAVENHAM DRIVE\" , \"locality\" : \"\" , \"town\" : \"BIRMINGHAM\" , \"district\" : \"BIRMINGHAM\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B5 7TN\" , \"property_transactions\" : [ { \"price\" : 270000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 }, { \"price\" : 364000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ]","title":"Cleaning"},{"location":"discussion/cleaning.html#cleaning","text":"In this page we discuss the cleaning stages and how best to prepare the data.","title":"Cleaning"},{"location":"discussion/cleaning.html#uniquely-identify-a-property","text":"To uniquely identify a property with the data we have it is enough to have a Postcode and the PAON (or SAON or combination of both).","title":"Uniquely identify a property."},{"location":"discussion/cleaning.html#postcode","text":"Because so few properties are missing a postcode (0.2% of all records) we will drop all rows that do not have one. We will drop some properties that could be identified uniquely with some more work, but the properties that are missing a postcode tend to be unusual/commercial/industrial (e.g a powerplant).","title":"Postcode"},{"location":"discussion/cleaning.html#paonsaon","text":"The PAON has 3 possible formats: The street number. The building name. The building name and street number (comma delimited). The SAON: Identifies the appartment/flat number for the building. If the SAON is present (only 11.7% of values) then the PAON will either be The building name. The building name and street number. Because of the way the PAON and SOAN are defined, if any row is missing both of these columns we will drop it. As only having the postcode is not enough (generally speaking) to uniquely identify a property. Tip In a production environment we could send these rows to a sink table (in BigQuery for example), rather than drop them outright. Collecting these rows over time might show some patterns on how we can uniquely identify properties that are missing these fields. We split the PAON as part of the cleaning stage. If the PAON contains a comma then it contains the building name and street number. We keep the street number in the same position as the PAON and insert the building name as a new column at the end of the row. If the PAON does not contain a comma we insert a blank column at the end to keep the number of columns in the PCollection consistent.","title":"PAON/SAON"},{"location":"discussion/cleaning.html#unneeded-columns","text":"To try keep computation costs/time down, I decided to drop the categorical columns provided. These include: Property Type. Old/New. Duration. PPD Category Type. Record Status - monthly file only. Initially I was attempting to work against the full dataset so dropping these columns would make a difference in the amount of data that needs processing. These columns are also not consistent. E.g the property 63 B16, 0AE has three transactions. Two of these transactions have a property type of Other and one transaction has a property type of Terraced . These columns do provide some relevant information (old/new, duration, property type) and these could be included back into the pipeline fairly easily. Due to time constraints I was unable to make this change. 
In addition, I also dropped the transaction unique identifier column. I wanted the IDs calculated in the pipeline to be consistent in format, and hashing a string (md5) isn't that expensive to calculate with complexity \\(\\mathcal{O}(n)\\) .","title":"Unneeded columns"},{"location":"discussion/cleaning.html#general-cleaning","text":"","title":"General cleaning"},{"location":"discussion/cleaning.html#upper-case","text":"As all strings in the dataset are upper case, we convert everything in the row to upper case to enforce consistency across the dataset.","title":"Upper case"},{"location":"discussion/cleaning.html#strip-leadingtrailing-whitespace","text":"We strip all leading/trailing whitespace from each column to enforce consistency.","title":"Strip leading/trailing whitespace"},{"location":"discussion/cleaning.html#repeated-rows","text":"Some of the data is repeated: Some rows repeated, with the same date + price + address information but with a unique transaction id. Example (PCollection) [ { \"fd4634faec47c29de40bbf7840723b41\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] }, { \"fd4634faec47c29de40bbf7840723b41\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] } ] These rows will be deduplicated as part of the pipeline. Some rows have the same date + address information, but different prices. It would be very unusual to see multiple transactions on the same date for the same property. One reason could be that there was a data entry error, resulting in two different transactions with only one being the real price. As the date column does not contain the time (it is fixed at 00:00 ) it is impossible to tell. Another reason could be missing building/flat/appartment information in this entry. We keep these in the data, resulting in some properties having multiple transactions with different prices on the same date. Without a time or more information to go on, it is difficult to see how these could be filtered out. Example (Output) [ { \"property_id\" : \"20d5c335c8d822a40baab0ecd57e92a4\" , \"readable_address\" : \"53 PAVENHAM DRIVE\\nBIRMINGHAM\\nWEST MIDLANDS\\nB5 7TN\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"53\" , \"street\" : \"PAVENHAM DRIVE\" , \"locality\" : \"\" , \"town\" : \"BIRMINGHAM\" , \"district\" : \"BIRMINGHAM\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B5 7TN\" , \"property_transactions\" : [ { \"price\" : 270000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 }, { \"price\" : 364000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ]","title":"Repeated rows"},{"location":"discussion/exploration.html","text":"Data Exploration Report \u00b6 A brief exploration was done on the full dataset using the module pandas-profiling . The module uses pandas to load a dataset and automatically produce quantile/descriptive statistics, common values, extreme values, skew, kurtosis etc. and produces a report .html file that can be viewed interatively in your browser. The script used to generate this report is located in ./exploration/report.py and can be viewed below. report.py import pathlib import pandas as pd from pandas_profiling import ProfileReport def main (): input_file = ( pathlib . Path ( __file__ ) . 
parents [ 1 ] / \"data\" / \"input\" / \"pp-complete.csv\" ) with input_file . open () as csv : df_report = pd . read_csv ( csv , names = [ \"transaction_id\" , \"price\" , \"date_of_transfer\" , \"postcode\" , \"property_type\" , \"old_new\" , \"duration\" , \"paon\" , \"saon\" , \"street\" , \"locality\" , \"town_city\" , \"district\" , \"county\" , \"ppd_category\" , \"record_status\" , ], ) profile = ProfileReport ( df_report , title = \"Price Paid Data\" , minimal = True ) profile . to_file ( \"price_paid_data_report.html\" ) if __name__ == \"__main__\" : main () The report can be viewed by clicking the Data Exploration Report tab at the top of the page. Interesting observations \u00b6 When looking at the report we are looking for data quality and missing observations. The statistics are interesting to see but are largely irrelevant for this task. The data overall looks very good for a dataset of its size (~27 million records). For important fields there are no missing values: Every row has a price. Every row has a unique transaction ID. Every row has a transaction date. Some fields that we will need are missing data: ~42,000 (0.2%) are missing a Postcode. ~4,000 (<0.1%) are missing a PAON (primary addressable object name). ~412,000 (1.6%) are missing a Street Name.","title":"Data Exploration Report"},{"location":"discussion/exploration.html#data-exploration-report","text":"A brief exploration was done on the full dataset using the module pandas-profiling . The module uses pandas to load a dataset and automatically produce quantile/descriptive statistics, common values, extreme values, skew, kurtosis etc. and produces a report .html file that can be viewed interatively in your browser. The script used to generate this report is located in ./exploration/report.py and can be viewed below. report.py import pathlib import pandas as pd from pandas_profiling import ProfileReport def main (): input_file = ( pathlib . Path ( __file__ ) . parents [ 1 ] / \"data\" / \"input\" / \"pp-complete.csv\" ) with input_file . open () as csv : df_report = pd . read_csv ( csv , names = [ \"transaction_id\" , \"price\" , \"date_of_transfer\" , \"postcode\" , \"property_type\" , \"old_new\" , \"duration\" , \"paon\" , \"saon\" , \"street\" , \"locality\" , \"town_city\" , \"district\" , \"county\" , \"ppd_category\" , \"record_status\" , ], ) profile = ProfileReport ( df_report , title = \"Price Paid Data\" , minimal = True ) profile . to_file ( \"price_paid_data_report.html\" ) if __name__ == \"__main__\" : main () The report can be viewed by clicking the Data Exploration Report tab at the top of the page.","title":"Data Exploration Report"},{"location":"discussion/exploration.html#interesting-observations","text":"When looking at the report we are looking for data quality and missing observations. The statistics are interesting to see but are largely irrelevant for this task. The data overall looks very good for a dataset of its size (~27 million records). For important fields there are no missing values: Every row has a price. Every row has a unique transaction ID. Every row has a transaction date. Some fields that we will need are missing data: ~42,000 (0.2%) are missing a Postcode. ~4,000 (<0.1%) are missing a PAON (primary addressable object name). 
~412,000 (1.6%) are missing a Street Name.","title":"Interesting observations"},{"location":"discussion/introduction.html","text":"Introduction \u00b6 This section will go through some discussion of the test including: Data exploration Cleaning the data Interpreting the results","title":"Introduction"},{"location":"discussion/introduction.html#introduction","text":"This section will go through some discussion of the test including: Data exploration Cleaning the data Interpreting the results","title":"Introduction"},{"location":"discussion/results.html","text":"Results \u00b6 The resulting output .json looks like (for the previous example using No. 1 B90 3LA ): [ { \"property_id\" : \"fe205bfe66bc7f18c50c8f3d77ec3e30\" , \"readable_address\" : \"1 VERSTONE ROAD\\nSHIRLEY\\nSOLIHULL\\nWEST MIDLANDS\\nB90 3LA\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"1\" , \"street\" : \"VERSTONE ROAD\" , \"locality\" : \"SHIRLEY\" , \"town\" : \"SOLIHULL\" , \"district\" : \"SOLIHULL\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B90 3LA\" , \"property_transactions\" : [ { \"price\" : 317500 , \"transaction_date\" : \"2020-11-13\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ] The standard property information is included, we will briefly discuss the additional fields included in this output file. readable_address \u00b6 The components that make up the address in the dataset are often repetitive, with the locality, town/city, district and county often sharing the same result. This can result in hard to read addresses if we just stacked all the components sequentially. The readable_address provides an easy to read address that strips this repetiveness out, by doing pairwise comparisons to each of the four components and applying a mask. The result is an address that could be served to the end user, or easily displayed on a page. This saves any user having to apply the same logic to simply display the address somewhere, the full address of a property should be easy to read and easily accessible. property_transactions \u00b6 This array contains an object for each transaction for that property that has the price and year as an int , with the date having the 00:00 time stripped out. latest_transaction_year \u00b6 The date of the latest transaction is extracted from the array of property_transactions and placed in the top level of the json object. This allows any end user to easily search for properties that haven't been sold in a period of time, without having to write this logic themselves. A consumer should be able to use this data to answer questions like: Give me all properties in the town of Solihull that haven't been sold in the past 10 years.","title":"Results"},{"location":"discussion/results.html#results","text":"The resulting output .json looks like (for the previous example using No. 
1 B90 3LA ): [ { \"property_id\" : \"fe205bfe66bc7f18c50c8f3d77ec3e30\" , \"readable_address\" : \"1 VERSTONE ROAD\\nSHIRLEY\\nSOLIHULL\\nWEST MIDLANDS\\nB90 3LA\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"1\" , \"street\" : \"VERSTONE ROAD\" , \"locality\" : \"SHIRLEY\" , \"town\" : \"SOLIHULL\" , \"district\" : \"SOLIHULL\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B90 3LA\" , \"property_transactions\" : [ { \"price\" : 317500 , \"transaction_date\" : \"2020-11-13\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ] The standard property information is included, we will briefly discuss the additional fields included in this output file.","title":"Results"},{"location":"discussion/results.html#readable_address","text":"The components that make up the address in the dataset are often repetitive, with the locality, town/city, district and county often sharing the same result. This can result in hard to read addresses if we just stacked all the components sequentially. The readable_address provides an easy to read address that strips this repetiveness out, by doing pairwise comparisons to each of the four components and applying a mask. The result is an address that could be served to the end user, or easily displayed on a page. This saves any user having to apply the same logic to simply display the address somewhere, the full address of a property should be easy to read and easily accessible.","title":"readable_address"},{"location":"discussion/results.html#property_transactions","text":"This array contains an object for each transaction for that property that has the price and year as an int , with the date having the 00:00 time stripped out.","title":"property_transactions"},{"location":"discussion/results.html#latest_transaction_year","text":"The date of the latest transaction is extracted from the array of property_transactions and placed in the top level of the json object. This allows any end user to easily search for properties that haven't been sold in a period of time, without having to write this logic themselves. A consumer should be able to use this data to answer questions like: Give me all properties in the town of Solihull that haven't been sold in the past 10 years.","title":"latest_transaction_year"},{"location":"documentation/installation.html","text":"Installation \u00b6 The task is written in Python 3.7.9 using Apache Beam 2.32.0. Python versions 3.6.14 and 3.8.11 should also be compatible but have not been tested. The task has been tested on MacOS Big Sur and WSL2. The task should run on Windows but this wasn't tested. For Beam 2.32.0 the supported versions of the Python SDK can be found here . Poetry \u00b6 The test uses Poetry for dependency management. Info If you already have Poetry installed globally you can go straight to the poetry install step. In a virtual environment install poetry: pip install poetry From the root of the repo install the dependencies with: poetry install --no-dev","title":"Installation"},{"location":"documentation/installation.html#installation","text":"The task is written in Python 3.7.9 using Apache Beam 2.32.0. Python versions 3.6.14 and 3.8.11 should also be compatible but have not been tested. The task has been tested on MacOS Big Sur and WSL2. The task should run on Windows but this wasn't tested. 
For Beam 2.32.0 the supported versions of the Python SDK can be found here .","title":"Installation"},{"location":"documentation/installation.html#poetry","text":"The test uses Poetry for dependency management. Info If you already have Poetry installed globally you can go straight to the poetry install step. In a virtual environment install poetry: pip install poetry From the root of the repo install the dependencies with: poetry install --no-dev","title":"Poetry"},{"location":"documentation/usage.html","text":"Usage \u00b6 This page documents how to run the pipeline locally to complete the task for the dataset for 2020 . The pipeline also runs in GCP using DataFlow and is discussed further on but can be viewed here. We also discuss how to adapt the pipeline so it can run against the full dataset . Download dataset \u00b6 The input data by default should go in ./data/input . For convenience the data is available publicly in a GCP Cloud Storage bucket. Run: wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input to download the data for 2020 and place in the input directory above. Entrypoint \u00b6 The entrypoint to the pipeline is analyse-properties . Available options \u00b6 Running analyse-properties --help gives the following output: usage: analyse-properties [ -h ] [ --input INPUT ] [ --output OUTPUT ] optional arguments: -h, --help show this help message and exit --input INPUT Full path to the input file. --output OUTPUT Full path to the output file without extension. The default value for input is ./data/input/pp-2020.csv and the default value for output is ./data/output/pp-2020 . If passing in values for input / output these should be full paths to the files. The test will parse these inputs as a str() and pass this to beam . io . ReadFromText () . Run the pipeline \u00b6 To run the pipeline and complete the task run: analyse-properties --runner DirectRunner The pipeline will use the 2020 dataset located in ./data/input and output the resulting .json to ./data/output .","title":"Usage"},{"location":"documentation/usage.html#usage","text":"This page documents how to run the pipeline locally to complete the task for the dataset for 2020 . The pipeline also runs in GCP using DataFlow and is discussed further on but can be viewed here. We also discuss how to adapt the pipeline so it can run against the full dataset .","title":"Usage"},{"location":"documentation/usage.html#download-dataset","text":"The input data by default should go in ./data/input . For convenience the data is available publicly in a GCP Cloud Storage bucket. Run: wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input to download the data for 2020 and place in the input directory above.","title":"Download dataset"},{"location":"documentation/usage.html#entrypoint","text":"The entrypoint to the pipeline is analyse-properties .","title":"Entrypoint"},{"location":"documentation/usage.html#available-options","text":"Running analyse-properties --help gives the following output: usage: analyse-properties [ -h ] [ --input INPUT ] [ --output OUTPUT ] optional arguments: -h, --help show this help message and exit --input INPUT Full path to the input file. --output OUTPUT Full path to the output file without extension. The default value for input is ./data/input/pp-2020.csv and the default value for output is ./data/output/pp-2020 . If passing in values for input / output these should be full paths to the files. 
The test will parse these inputs as a str() and pass this to beam . io . ReadFromText () .","title":"Available options"},{"location":"documentation/usage.html#run-the-pipeline","text":"To run the pipeline and complete the task run: analyse-properties --runner DirectRunner The pipeline will use the 2020 dataset located in ./data/input and output the resulting .json to ./data/output .","title":"Run the pipeline"}]}
\ No newline at end of file
+{"config":{"indexing":"full","lang":["en"],"min_search_length":3,"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"index.html","text":"Welcome \u00b6 Introduction \u00b6 This documentation accompanies the technical test for the Street Group. The following pages will guide the user through installing the requirements, and running the task to complete the test. In addition, there is some discussion around the approach, and scaling the pipeline. Navigate sections using the tabs at the top of the page. Pages in this section can be viewed in order by using the section links in the left menu, or by using bar at the bottom of the page. The table of contents in the right menu can be used to navigate sections on each page. Note All paths in this documentation, e.g ./analyse_properties/data/output refer to the location of the directory/file from the root of the repo.","title":"Welcome"},{"location":"index.html#welcome","text":"","title":"Welcome"},{"location":"index.html#introduction","text":"This documentation accompanies the technical test for the Street Group. The following pages will guide the user through installing the requirements, and running the task to complete the test. In addition, there is some discussion around the approach, and scaling the pipeline. Navigate sections using the tabs at the top of the page. Pages in this section can be viewed in order by using the section links in the left menu, or by using bar at the bottom of the page. The table of contents in the right menu can be used to navigate sections on each page. Note All paths in this documentation, e.g ./analyse_properties/data/output refer to the location of the directory/file from the root of the repo.","title":"Introduction"},{"location":"dataflow/index.html","text":"Running on DataFlow \u00b6 The pipeline runs as is on GCP DataFlow. The following documents how I deployed to my personal GCP account but the approach may vary depending on project/account in GCP. Prerequisites \u00b6 Cloud Storage \u00b6 A Cloud Storage bucket with the following structure: ./input ./output ./tmp Place the input files into the ./input directory in the bucket. VPC \u00b6 To get around public IP quotas I created a VPC in the europe-west1 region that has Private Google Access turned to ON . Command \u00b6 Tip We need to choose a worker_machine_type with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and DataFlow autoscales on CPU and not memory usage, we need a machine with more ram than usual to ensure sufficient memory when running on one worker. For pp-2020.csv the type n1-highmem-2 with 2vCPU and 13GB of ram was chosen and completed successfully in ~10 minutes using only 1 worker. Assuming the pp-2020.csv file has been placed in the ./input directory in the bucket you can run a command similar to: python -m analyse_properties.main \\ --runner DataflowRunner \\ --project street-group \\ --region europe-west1 \\ --input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \\ --output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \\ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \\ --subnetwork = https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \\ --no_use_public_ips \\ --worker_machine_type = n1-highmem-2 The output file from this pipeline is publically available and can be downloaded here . 
The job graph for this pipeline is displayed below:","title":"Running on DataFlow"},{"location":"dataflow/index.html#running-on-dataflow","text":"The pipeline runs as is on GCP DataFlow. The following documents how I deployed to my personal GCP account but the approach may vary depending on project/account in GCP.","title":"Running on DataFlow"},{"location":"dataflow/index.html#prerequisites","text":"","title":"Prerequisites"},{"location":"dataflow/index.html#cloud-storage","text":"A Cloud Storage bucket with the following structure: ./input ./output ./tmp Place the input files into the ./input directory in the bucket.","title":"Cloud Storage"},{"location":"dataflow/index.html#vpc","text":"To get around public IP quotas I created a VPC in the europe-west1 region that has Private Google Access turned to ON .","title":"VPC"},{"location":"dataflow/index.html#command","text":"Tip We need to choose a worker_machine_type with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and DataFlow autoscales on CPU and not memory usage, we need a machine with more ram than usual to ensure sufficient memory when running on one worker. For pp-2020.csv the type n1-highmem-2 with 2vCPU and 13GB of ram was chosen and completed successfully in ~10 minutes using only 1 worker. Assuming the pp-2020.csv file has been placed in the ./input directory in the bucket you can run a command similar to: python -m analyse_properties.main \\ --runner DataflowRunner \\ --project street-group \\ --region europe-west1 \\ --input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \\ --output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \\ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \\ --subnetwork = https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \\ --no_use_public_ips \\ --worker_machine_type = n1-highmem-2 The output file from this pipeline is publically available and can be downloaded here . The job graph for this pipeline is displayed below:","title":"Command"},{"location":"dataflow/scaling.html","text":"Scaling to the full DataSet \u00b6 As is the pipeline will not run against the full dataset. But with a little work done to the existing pipeline I believe it is possible to work against the full dataset of ~27 million rows. Mapping table \u00b6 Using a mapping table as a side-input means that for the full dataset this table is going to be huge. Side inputs are stored in memory on the workers, with such a huge table the machines are going to quickly run out of available memory when autoscaling is applied. Running the pipeline against the full dataset resulted in the following error: \"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900\" with the pipeline job failing to process anything and the rows being processed per/sec gradually falling to zero as the workers killed the Python process to try free up more memory. This resulted in autoscaling down (as the CPU decreased) and the entire pipeline stagnated. Using a higher tiered worker_machine_type , disabling autoscaling, and fixing the workers to the maximum number of vCPUs available to the quota results in pipeline options: --worker_machine_type = n1-highmem-8 \\ --num_workers = 3 \\ --autoscaling_algorithm = NONE with 156GB of RAM available to the pipeline with 52GB on each worker. 
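Before looking at the failure that follows, note where the hard ceiling comes from: when the condensed mapping table is serialised and handed to the worker processes, its byte length is packed as a signed 32-bit integer (visible in the traceback below). A quick, Beam-free illustration in plain Python:

import struct

struct.pack("!i", 2_147_483_647)  # fine: the largest signed 32-bit value
struct.pack("!i", 2_147_483_648)  # struct.error: 'i' format requires -2147483648 <= number <= 2147483647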
The pipeline was able to progress further until Python threw an error and the pipeline failed and shut down: \"Error message from worker: Traceback (most recent call last): File \"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py\", line 651, in do_work work_executor.execute() ... File \"/usr/local/lib/python3.7/multiprocessing/connection.py\", line 393, in _send_bytes header = struct.pack(\"!i\", n) struct.error: 'i' format requires -2147483648 <= number <= 2147483647 The number 2147483647 being the maximum value for a 32bit integer. As the side-input needs to be pickled (or serialised), this tells us that the table is far too large to be pickled and passed to the other workers. No amount of CPU/Memory can fix the problem. Patterns \u00b6 Google have several patterns for large side-inputs which are documented here: Part 1 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1 Part 2 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2 Solution \u00b6 A possible solution would be to leverage BigQuery to store the results of the mapping table in as the pipeline progresses. We can make use of BigQueries array type to literally store the raw array as we process each row. In addition to creating the mapping table (key, value) pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written to allow the subsequent stages to make use of this data. Remove the condense mapping table stage as it is no longer needed (which also saves a bit of time). Instead of using: beam . FlatMap ( insert_data_for_id , beam . pvalue . AsSingleton ( mapping_table_condensed ) ) to insert the results of the mapping table we write a new DoFn that takes the element, and for each id_all_columns in the array we make a call to BigQuery to get the array for this ID and insert it at this stage. Because each id_all_columns and its corresponding data is only used once, there would be no need to cache the results from BigQuery, however some work could be done to see if we could pull back more than one row at a time and cache these, saving time/costs in calls to BigQuery.","title":"Scaling to the Full DataSet"},{"location":"dataflow/scaling.html#scaling-to-the-full-dataset","text":"As is the pipeline will not run against the full dataset. But with a little work done to the existing pipeline I believe it is possible to work against the full dataset of ~27 million rows.","title":"Scaling to the full DataSet"},{"location":"dataflow/scaling.html#mapping-table","text":"Using a mapping table as a side-input means that for the full dataset this table is going to be huge. Side inputs are stored in memory on the workers, with such a huge table the machines are going to quickly run out of available memory when autoscaling is applied. Running the pipeline against the full dataset resulted in the following error: \"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900\" with the pipeline job failing to process anything and the rows being processed per/sec gradually falling to zero as the workers killed the Python process to try free up more memory. This resulted in autoscaling down (as the CPU decreased) and the entire pipeline stagnated. 
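To make the Solution above concrete, below is a rough sketch of such a DoFn. The table and column names (street_group.mapping_table with id_all_columns and raw_row columns) are assumptions for illustration, as is the element shape (id_without_price_date, [id_all_columns, ...]) coming out of the prepare stage; this stage would replace the beam.pvalue.AsSingleton side input shown earlier.

import apache_beam as beam
from google.cloud import bigquery


class InsertDataFromBigQuery(beam.DoFn):
    """Look each id_all_columns up in BigQuery instead of holding the table in memory."""

    def setup(self):
        # One client per worker process, created when the DoFn is initialised.
        self._client = bigquery.Client()

    def process(self, element):
        key, ids = element
        rows = []
        for id_all_columns in ids:
            job = self._client.query(
                "SELECT raw_row FROM `street_group.mapping_table` WHERE id_all_columns = @id",
                job_config=bigquery.QueryJobConfig(
                    query_parameters=[
                        bigquery.ScalarQueryParameter("id", "STRING", id_all_columns)
                    ]
                ),
            )
            for result in job.result():
                rows.append(list(result.raw_row))
        yield key, rows

A query per id is the simplest form of the lookup; as noted above, batching several ids into one query and caching the responses would reduce the number of BigQuery calls and their cost.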
Using a higher tiered worker_machine_type , disabling autoscaling, and fixing the workers to the maximum number of vCPUs available to the quota results in pipeline options: --worker_machine_type = n1-highmem-8 \\ --num_workers = 3 \\ --autoscaling_algorithm = NONE with 156GB of RAM available to the pipeline with 52GB on each worker. The pipeline was able to progress further until Python threw an error and the pipeline failed and shut down: \"Error message from worker: Traceback (most recent call last): File \"/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py\", line 651, in do_work work_executor.execute() ... File \"/usr/local/lib/python3.7/multiprocessing/connection.py\", line 393, in _send_bytes header = struct.pack(\"!i\", n) struct.error: 'i' format requires -2147483648 <= number <= 2147483647 The number 2147483647 being the maximum value for a 32bit integer. As the side-input needs to be pickled (or serialised), this tells us that the table is far too large to be pickled and passed to the other workers. No amount of CPU/Memory can fix the problem.","title":"Mapping table"},{"location":"dataflow/scaling.html#patterns","text":"Google have several patterns for large side-inputs which are documented here: Part 1 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1 Part 2 https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2","title":"Patterns"},{"location":"dataflow/scaling.html#solution","text":"A possible solution would be to leverage BigQuery to store the results of the mapping table in as the pipeline progresses. We can make use of BigQueries array type to literally store the raw array as we process each row. In addition to creating the mapping table (key, value) pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written to allow the subsequent stages to make use of this data. Remove the condense mapping table stage as it is no longer needed (which also saves a bit of time). Instead of using: beam . FlatMap ( insert_data_for_id , beam . pvalue . AsSingleton ( mapping_table_condensed ) ) to insert the results of the mapping table we write a new DoFn that takes the element, and for each id_all_columns in the array we make a call to BigQuery to get the array for this ID and insert it at this stage. Because each id_all_columns and its corresponding data is only used once, there would be no need to cache the results from BigQuery, however some work could be done to see if we could pull back more than one row at a time and cache these, saving time/costs in calls to BigQuery.","title":"Solution"},{"location":"discussion/approach.html","text":"Approach \u00b6 The general approach to the pipeline is: Loading stage \u00b6 Load using beam . io . ReadFromText () Split the string loaded by , as it's a comma delimited .csv . Strip the leading/trailing \" marks. The result is an array with each element representing a single column in that row. Cleaning stage \u00b6 Already discussed. Create a mapping table \u00b6 The mapping table takes each row and creates a (key,value) pair with: The key being the id across all columns ( id_all_columns ). The value being the raw data as an array. The mapping table is then condensed to a single dictionary with these key, value pairs (automatically deduplicating repeated rows) and is used as a side input further down the pipeline. 
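A minimal sketch of this stage, assuming the id is an md5 over the joined columns (the condensing step is expressed here with beam.combiners.ToDict, which keeps one entry per key and is where the deduplication of identical rows falls out):

import hashlib

import apache_beam as beam


def to_mapping_pair(row):
    # row is the cleaned list of columns for one transaction.
    id_all_columns = hashlib.md5(",".join(row).encode("utf-8")).hexdigest()
    return id_all_columns, row


with beam.Pipeline() as p:
    cleaned_rows = p | beam.Create([
        ["317500", "2020-11-13 00:00", "B90 3LA", "1", "", "VERSTONE ROAD",
         "SHIRLEY", "SOLIHULL", "SOLIHULL", "WEST MIDLANDS", ""],
    ])
    mapping_table = cleaned_rows | "Key rows" >> beam.Map(to_mapping_pair)
    mapping_table_condensed = mapping_table | "Condense" >> beam.combiners.ToDict()
    # mapping_table_condensed is later consumed as
    # beam.pvalue.AsSingleton(mapping_table_condensed).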
This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey , the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much quicker than if we included the raw data in this operation. Prepare stage \u00b6 Take the mapping table data (before it is condensed) and create a unique id ignoring the price and date ( id_without_price_date ). This id will not be unique: for properties with more than one transaction they will share this id. Create a (key, value) pair with: The key being id_without_price_date . The value being id_all_columns . Group by id_without_price_date . This results in a PCollection that looks like: (id_without_price_date, [id_all_columns,...]) Deduplicate the id_all_columns inside this array to eliminate repeated rows that are exactly the same. Use the mapping table as a side input to reinsert the raw data using the id_all_columns . Example for No.1 B90 3LA Mapping table (pre condensed): (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) Mapping table (condensed): { ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' : [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ]} Prepared (key, value): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') Prepared (GroupByKey): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Prepared (Deduplicated): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Use mapping table as side input: (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) Format stage \u00b6 This stage takes the result and constructs a json object out of the grouped data. The schema for this output is discussed in the following page. Save stage \u00b6 The PCollection is combined with beam . combiners . ToList () Apply json.dumps() for proper quotation marks for strings. Write to text with beam . io . WriteToText .","title":"Approach"},{"location":"discussion/approach.html#approach","text":"The general approach to the pipeline is:","title":"Approach"},{"location":"discussion/approach.html#loading-stage","text":"Load using beam . io . ReadFromText () Split the string loaded by , as it's a comma delimited .csv . Strip the leading/trailing \" marks. 
The result is an array with each element representing a single column in that row.","title":"Loading stage"},{"location":"discussion/approach.html#cleaning-stage","text":"Already discussed.","title":"Cleaning stage"},{"location":"discussion/approach.html#create-a-mapping-table","text":"The mapping table takes each row and creates a (key,value) pair with: The key being the id across all columns ( id_all_columns ). The value being the raw data as an array. The mapping table is then condensed to a single dictionary with these key, value pairs (automatically deduplicating repeated rows) and is used as a side input further down the pipeline. This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey , the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much quicker than if we included the raw data in this operation.","title":"Create a mapping table"},{"location":"discussion/approach.html#prepare-stage","text":"Take the mapping table data (before it is condensed) and create a unique id ignoring the price and date ( id_without_price_date ). This id will not be unique: for properties with more than one transaction they will share this id. Create a (key, value) pair with: The key being id_without_price_date . The value being id_all_columns . Group by id_without_price_date . This results in a PCollection that looks like: (id_without_price_date, [id_all_columns,...]) Deduplicate the id_all_columns inside this array to eliminate repeated rows that are exactly the same. Use the mapping table as a side input to reinsert the raw data using the id_all_columns . Example for No.1 B90 3LA Mapping table (pre condensed): (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) (' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] ) Mapping table (condensed): { ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' : [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ]} Prepared (key, value): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ') Prepared (GroupByKey): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' , ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Prepared (Deduplicated): (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' f d 4634 fae c 47 c 29 de 40 bb f 7840723 b 41 ' ] ) Use mapping table as side input: (' fe 205 b fe 66 bc 7 f 18 c 50 c 8 f 3 d 77e c 3e30 ' , [ ' 317500 ' , ' 2020-11-13 00 : 00 ' , 'B 90 3 LA' , ' 1 ' , '' , 'VERSTONE ROAD' , 'SHIRLEY' , 'SOLIHULL' , 'SOLIHULL' , 'WEST MIDLANDS' , '' ] )","title":"Prepare stage"},{"location":"discussion/approach.html#format-stage","text":"This stage takes the result and constructs a json object out of the grouped data. 
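As a rough illustration of that construction (column positions are inferred from the worked example above and may not match the actual implementation; the output field names, including flat_appartment and builing, are copied verbatim from the example output shown on the cleaning and results pages):

def format_property(element):
    # element is assumed to be (property_id, [cleaned_row, ...]), one row per transaction.
    property_id, rows = element
    first = rows[0]  # the address columns are identical for every row of a property
    transactions = [
        {
            "price": int(row[0]),
            "transaction_date": row[1].split(" ")[0],  # drop the fixed 00:00 time
            "year": int(row[1][:4]),
        }
        for row in rows
    ]
    return {
        "property_id": property_id,
        # "readable_address" is built by the pairwise comparison described on the
        # results page and is omitted here.
        "flat_appartment": first[4],
        "builing": first[10],
        "number": first[3],
        "street": first[5],
        "locality": first[6],
        "town": first[7],
        "district": first[8],
        "county": first[9],
        "postcode": first[2],
        "property_transactions": transactions,
        "latest_transaction_year": max(t["year"] for t in transactions),
    }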
The schema for this output is discussed in the following page.","title":"Format stage"},{"location":"discussion/approach.html#save-stage","text":"The PCollection is combined with beam . combiners . ToList () Apply json.dumps() for proper quotation marks for strings. Write to text with beam . io . WriteToText .","title":"Save stage"},{"location":"discussion/cleaning.html","text":"Cleaning \u00b6 In this page we discuss the cleaning stages and how best to prepare the data. Uniquely identify a property. \u00b6 To uniquely identify a property with the data we have it is enough to have a Postcode and the PAON (or SAON or combination of both). Postcode \u00b6 Because so few properties are missing a postcode (0.2% of all records) we will drop all rows that do not have one. We will drop some properties that could be identified uniquely with some more work, but the properties that are missing a postcode tend to be unusual/commercial/industrial (e.g a powerplant). PAON/SAON \u00b6 The PAON has 3 possible formats: The street number. The building name. The building name and street number (comma delimited). The SAON: Identifies the appartment/flat number for the building. If the SAON is present (only 11.7% of values) then the PAON will either be The building name. The building name and street number. Because of the way the PAON and SOAN are defined, if any row is missing both of these columns we will drop it. As only having the postcode is not enough (generally speaking) to uniquely identify a property. Tip In a production environment we could send these rows to a sink table (in BigQuery for example), rather than drop them outright. Collecting these rows over time might show some patterns on how we can uniquely identify properties that are missing these fields. We split the PAON as part of the cleaning stage. If the PAON contains a comma then it contains the building name and street number. We keep the street number in the same position as the PAON and insert the building name as a new column at the end of the row. If the PAON does not contain a comma we insert a blank column at the end to keep the number of columns in the PCollection consistent. Unneeded columns \u00b6 To try keep computation costs/time down, I decided to drop the categorical columns provided. These include: Property Type. Old/New. Duration. PPD Category Type. Record Status - monthly file only. Initially I was attempting to work against the full dataset so dropping these columns would make a difference in the amount of data that needs processing. These columns are also not consistent. E.g the property 63 B16, 0AE has three transactions. Two of these transactions have a property type of Other and one transaction has a property type of Terraced . These columns do provide some relevant information (old/new, duration, property type) and these could be included back into the pipeline fairly easily. Due to time constraints I was unable to make this change. In addition, I also dropped the transaction unique identifier column. I wanted the IDs calculated in the pipeline to be consistent in format, and hashing a string (md5) isn't that expensive to calculate with complexity \\(\\mathcal{O}(n)\\) . General cleaning \u00b6 Upper case \u00b6 As all strings in the dataset are upper case, we convert everything in the row to upper case to enforce consistency across the dataset. Strip leading/trailing whitespace \u00b6 We strip all leading/trailing whitespace from each column to enforce consistency. 
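Pulling those rules together, a hedged sketch of the per-row cleaning (the column positions follow the raw layout after the dropped columns, i.e. price, date, postcode, PAON, SAON, street, locality, town, district, county, and the building-name-before-the-comma ordering of the PAON is an assumption):

def clean_row(row):
    # Returning None marks the row as dropped; in the pipeline this sits behind a filter.
    row = [column.strip().upper() for column in row]
    postcode, paon, saon = row[2], row[3], row[4]
    if not postcode:
        return None  # ~0.2% of rows: no postcode
    if not paon and not saon:
        return None  # neither PAON nor SAON: cannot identify the property
    if "," in paon:
        building, number = (part.strip() for part in paon.split(",", 1))
        row[3] = number          # keep the street number in the PAON position
        row.append(building)     # the building name becomes a new trailing column
    else:
        row.append("")           # keep the column count consistent
    return row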
Repeated rows \u00b6 Some of the data is repeated: Some rows are repeated, with the same date + price + address information but with a unique transaction id. Example (PCollection) [ { \"fd4634faec47c29de40bbf7840723b41\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] }, { \"gd4634faec47c29de40bbf7840723b42\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] } ] These rows will be deduplicated as part of the pipeline. Some rows have the same date + address information, but different prices. It would be very unusual to see multiple transactions on the same date for the same property. One reason could be that there was a data entry error, resulting in two different transactions with only one being the real price. As the date column does not contain the time (it is fixed at 00:00 ) it is impossible to tell. Another reason could be missing building/flat/appartment information in this entry. We keep these in the data, resulting in some properties having multiple transactions with different prices on the same date. Without a time or more information to go on, it is difficult to see how these could be filtered out. Example (Output) [ { \"property_id\" : \"20d5c335c8d822a40baab0ecd57e92a4\" , \"readable_address\" : \"53 PAVENHAM DRIVE\\nBIRMINGHAM\\nWEST MIDLANDS\\nB5 7TN\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"53\" , \"street\" : \"PAVENHAM DRIVE\" , \"locality\" : \"\" , \"town\" : \"BIRMINGHAM\" , \"district\" : \"BIRMINGHAM\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B5 7TN\" , \"property_transactions\" : [ { \"price\" : 270000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 }, { \"price\" : 364000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ]","title":"Cleaning"},{"location":"discussion/cleaning.html#cleaning","text":"In this page we discuss the cleaning stages and how best to prepare the data.","title":"Cleaning"},{"location":"discussion/cleaning.html#uniquely-identify-a-property","text":"To uniquely identify a property with the data we have it is enough to have a Postcode and the PAON (or SAON or combination of both).","title":"Uniquely identify a property."},{"location":"discussion/cleaning.html#postcode","text":"Because so few properties are missing a postcode (0.2% of all records) we will drop all rows that do not have one. We will drop some properties that could be identified uniquely with some more work, but the properties that are missing a postcode tend to be unusual/commercial/industrial (e.g a powerplant).","title":"Postcode"},{"location":"discussion/cleaning.html#paonsaon","text":"The PAON has 3 possible formats: The street number. The building name. The building name and street number (comma delimited). The SAON: Identifies the appartment/flat number for the building. If the SAON is present (only 11.7% of values) then the PAON will either be The building name. The building name and street number. Because of the way the PAON and SOAN are defined, if any row is missing both of these columns we will drop it. As only having the postcode is not enough (generally speaking) to uniquely identify a property. Tip In a production environment we could send these rows to a sink table (in BigQuery for example), rather than drop them outright. 
Collecting these rows over time might show some patterns on how we can uniquely identify properties that are missing these fields. We split the PAON as part of the cleaning stage. If the PAON contains a comma then it contains the building name and street number. We keep the street number in the same position as the PAON and insert the building name as a new column at the end of the row. If the PAON does not contain a comma we insert a blank column at the end to keep the number of columns in the PCollection consistent.","title":"PAON/SAON"},{"location":"discussion/cleaning.html#unneeded-columns","text":"To try keep computation costs/time down, I decided to drop the categorical columns provided. These include: Property Type. Old/New. Duration. PPD Category Type. Record Status - monthly file only. Initially I was attempting to work against the full dataset so dropping these columns would make a difference in the amount of data that needs processing. These columns are also not consistent. E.g the property 63 B16, 0AE has three transactions. Two of these transactions have a property type of Other and one transaction has a property type of Terraced . These columns do provide some relevant information (old/new, duration, property type) and these could be included back into the pipeline fairly easily. Due to time constraints I was unable to make this change. In addition, I also dropped the transaction unique identifier column. I wanted the IDs calculated in the pipeline to be consistent in format, and hashing a string (md5) isn't that expensive to calculate with complexity \\(\\mathcal{O}(n)\\) .","title":"Unneeded columns"},{"location":"discussion/cleaning.html#general-cleaning","text":"","title":"General cleaning"},{"location":"discussion/cleaning.html#upper-case","text":"As all strings in the dataset are upper case, we convert everything in the row to upper case to enforce consistency across the dataset.","title":"Upper case"},{"location":"discussion/cleaning.html#strip-leadingtrailing-whitespace","text":"We strip all leading/trailing whitespace from each column to enforce consistency.","title":"Strip leading/trailing whitespace"},{"location":"discussion/cleaning.html#repeated-rows","text":"Some of the data is repeated: Some rows are repeated, with the same date + price + address information but with a unique transaction id. Example (PCollection) [ { \"fd4634faec47c29de40bbf7840723b41\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] }, { \"gd4634faec47c29de40bbf7840723b42\" : [ \"317500\" , \"2020-11-13 00:00\" , \"B90 3LA\" , \"1\" , \"\" , \"VERSTONE ROAD\" , \"SHIRLEY\" , \"SOLIHULL\" , \"SOLIHULL\" , \"WEST MIDLANDS\" , \"\" ] } ] These rows will be deduplicated as part of the pipeline. Some rows have the same date + address information, but different prices. It would be very unusual to see multiple transactions on the same date for the same property. One reason could be that there was a data entry error, resulting in two different transactions with only one being the real price. As the date column does not contain the time (it is fixed at 00:00 ) it is impossible to tell. Another reason could be missing building/flat/appartment information in this entry. We keep these in the data, resulting in some properties having multiple transactions with different prices on the same date. Without a time or more information to go on, it is difficult to see how these could be filtered out. 
Example (Output) [ { \"property_id\" : \"20d5c335c8d822a40baab0ecd57e92a4\" , \"readable_address\" : \"53 PAVENHAM DRIVE\\nBIRMINGHAM\\nWEST MIDLANDS\\nB5 7TN\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"53\" , \"street\" : \"PAVENHAM DRIVE\" , \"locality\" : \"\" , \"town\" : \"BIRMINGHAM\" , \"district\" : \"BIRMINGHAM\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B5 7TN\" , \"property_transactions\" : [ { \"price\" : 270000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 }, { \"price\" : 364000 , \"transaction_date\" : \"2020-04-23\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ]","title":"Repeated rows"},{"location":"discussion/exploration.html","text":"Data Exploration Report \u00b6 A brief exploration was done on the full dataset using the module pandas-profiling . The module uses pandas to load a dataset and automatically produce quantile/descriptive statistics, common values, extreme values, skew, kurtosis etc. and produces a report .html file that can be viewed interatively in your browser. The script used to generate this report is located in ./exploration/report.py and can be viewed below. report.py import pathlib import pandas as pd from pandas_profiling import ProfileReport def main (): input_file = ( pathlib . Path ( __file__ ) . parents [ 1 ] / \"data\" / \"input\" / \"pp-complete.csv\" ) with input_file . open () as csv : df_report = pd . read_csv ( csv , names = [ \"transaction_id\" , \"price\" , \"date_of_transfer\" , \"postcode\" , \"property_type\" , \"old_new\" , \"duration\" , \"paon\" , \"saon\" , \"street\" , \"locality\" , \"town_city\" , \"district\" , \"county\" , \"ppd_category\" , \"record_status\" , ], ) profile = ProfileReport ( df_report , title = \"Price Paid Data\" , minimal = True ) profile . to_file ( \"price_paid_data_report.html\" ) if __name__ == \"__main__\" : main () The report can be viewed by clicking the Data Exploration Report tab at the top of the page. Interesting observations \u00b6 When looking at the report we are looking for data quality and missing observations. The statistics are interesting to see but are largely irrelevant for this task. The data overall looks very good for a dataset of its size (~27 million records). For important fields there are no missing values: Every row has a price. Every row has a unique transaction ID. Every row has a transaction date. Some fields that we will need are missing data: ~42,000 (0.2%) are missing a Postcode. ~4,000 (<0.1%) are missing a PAON (primary addressable object name). ~412,000 (1.6%) are missing a Street Name.","title":"Data Exploration Report"},{"location":"discussion/exploration.html#data-exploration-report","text":"A brief exploration was done on the full dataset using the module pandas-profiling . The module uses pandas to load a dataset and automatically produce quantile/descriptive statistics, common values, extreme values, skew, kurtosis etc. and produces a report .html file that can be viewed interatively in your browser. The script used to generate this report is located in ./exploration/report.py and can be viewed below. report.py import pathlib import pandas as pd from pandas_profiling import ProfileReport def main (): input_file = ( pathlib . Path ( __file__ ) . parents [ 1 ] / \"data\" / \"input\" / \"pp-complete.csv\" ) with input_file . open () as csv : df_report = pd . 
read_csv ( csv , names = [ \"transaction_id\" , \"price\" , \"date_of_transfer\" , \"postcode\" , \"property_type\" , \"old_new\" , \"duration\" , \"paon\" , \"saon\" , \"street\" , \"locality\" , \"town_city\" , \"district\" , \"county\" , \"ppd_category\" , \"record_status\" , ], ) profile = ProfileReport ( df_report , title = \"Price Paid Data\" , minimal = True ) profile . to_file ( \"price_paid_data_report.html\" ) if __name__ == \"__main__\" : main () The report can be viewed by clicking the Data Exploration Report tab at the top of the page.","title":"Data Exploration Report"},{"location":"discussion/exploration.html#interesting-observations","text":"When looking at the report we are looking for data quality and missing observations. The statistics are interesting to see but are largely irrelevant for this task. The data overall looks very good for a dataset of its size (~27 million records). For important fields there are no missing values: Every row has a price. Every row has a unique transaction ID. Every row has a transaction date. Some fields that we will need are missing data: ~42,000 (0.2%) are missing a Postcode. ~4,000 (<0.1%) are missing a PAON (primary addressable object name). ~412,000 (1.6%) are missing a Street Name.","title":"Interesting observations"},{"location":"discussion/introduction.html","text":"Introduction \u00b6 This section will go through some discussion of the test including: Data exploration Cleaning the data Interpreting the results","title":"Introduction"},{"location":"discussion/introduction.html#introduction","text":"This section will go through some discussion of the test including: Data exploration Cleaning the data Interpreting the results","title":"Introduction"},{"location":"discussion/results.html","text":"Results \u00b6 The resulting output .json looks like (for the previous example using No. 1 B90 3LA ): [ { \"property_id\" : \"fe205bfe66bc7f18c50c8f3d77ec3e30\" , \"readable_address\" : \"1 VERSTONE ROAD\\nSHIRLEY\\nSOLIHULL\\nWEST MIDLANDS\\nB90 3LA\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"1\" , \"street\" : \"VERSTONE ROAD\" , \"locality\" : \"SHIRLEY\" , \"town\" : \"SOLIHULL\" , \"district\" : \"SOLIHULL\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B90 3LA\" , \"property_transactions\" : [ { \"price\" : 317500 , \"transaction_date\" : \"2020-11-13\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ] The standard property information is included, we will briefly discuss the additional fields included in this output file. readable_address \u00b6 The components that make up the address in the dataset are often repetitive, with the locality, town/city, district and county often sharing the same result. This can result in hard to read addresses if we just stacked all the components sequentially. The readable_address provides an easy to read address that strips this repetiveness out, by doing pairwise comparisons to each of the four components and applying a mask. The result is an address that could be served to the end user, or easily displayed on a page. This saves any user having to apply the same logic to simply display the address somewhere, the full address of a property should be easy to read and easily accessible. property_transactions \u00b6 This array contains an object for each transaction for that property that has the price and year as an int , with the date having the 00:00 time stripped out. 
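To make the readable_address behaviour described above concrete, here is a small approximation (the pipeline's own mask-based pairwise comparison may differ in detail, but this reproduces the results shown in the examples on these pages):

def readable_address(number, street, locality, town, district, county, postcode,
                     flat="", building=""):
    first_line = " ".join(part for part in (flat, building, number, street) if part)
    middle = []
    for component in (locality, town, district, county):
        if component and component not in middle:
            middle.append(component)  # skip repeated components
    return "\n".join([first_line] + middle + [postcode])


readable_address("1", "VERSTONE ROAD", "SHIRLEY", "SOLIHULL", "SOLIHULL",
                 "WEST MIDLANDS", "B90 3LA")
# -> "1 VERSTONE ROAD\nSHIRLEY\nSOLIHULL\nWEST MIDLANDS\nB90 3LA"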
latest_transaction_year \u00b6 The date of the latest transaction is extracted from the array of property_transactions and placed in the top level of the json object. This allows any end user to easily search for properties that haven't been sold in a period of time, without having to write this logic themselves. A consumer should be able to use this data to answer questions like: Give me all properties in the town of Solihull that haven't been sold in the past 10 years.","title":"Results"},{"location":"discussion/results.html#results","text":"The resulting output .json looks like (for the previous example using No. 1 B90 3LA ): [ { \"property_id\" : \"fe205bfe66bc7f18c50c8f3d77ec3e30\" , \"readable_address\" : \"1 VERSTONE ROAD\\nSHIRLEY\\nSOLIHULL\\nWEST MIDLANDS\\nB90 3LA\" , \"flat_appartment\" : \"\" , \"builing\" : \"\" , \"number\" : \"1\" , \"street\" : \"VERSTONE ROAD\" , \"locality\" : \"SHIRLEY\" , \"town\" : \"SOLIHULL\" , \"district\" : \"SOLIHULL\" , \"county\" : \"WEST MIDLANDS\" , \"postcode\" : \"B90 3LA\" , \"property_transactions\" : [ { \"price\" : 317500 , \"transaction_date\" : \"2020-11-13\" , \"year\" : 2020 } ], \"latest_transaction_year\" : 2020 } ] The standard property information is included, we will briefly discuss the additional fields included in this output file.","title":"Results"},{"location":"discussion/results.html#readable_address","text":"The components that make up the address in the dataset are often repetitive, with the locality, town/city, district and county often sharing the same result. This can result in hard to read addresses if we just stacked all the components sequentially. The readable_address provides an easy to read address that strips this repetiveness out, by doing pairwise comparisons to each of the four components and applying a mask. The result is an address that could be served to the end user, or easily displayed on a page. This saves any user having to apply the same logic to simply display the address somewhere, the full address of a property should be easy to read and easily accessible.","title":"readable_address"},{"location":"discussion/results.html#property_transactions","text":"This array contains an object for each transaction for that property that has the price and year as an int , with the date having the 00:00 time stripped out.","title":"property_transactions"},{"location":"discussion/results.html#latest_transaction_year","text":"The date of the latest transaction is extracted from the array of property_transactions and placed in the top level of the json object. This allows any end user to easily search for properties that haven't been sold in a period of time, without having to write this logic themselves. A consumer should be able to use this data to answer questions like: Give me all properties in the town of Solihull that haven't been sold in the past 10 years.","title":"latest_transaction_year"},{"location":"documentation/installation.html","text":"Installation \u00b6 The task is written in Python 3.7.9 using Apache Beam 2.32.0. Python versions 3.6.14 and 3.8.11 should also be compatible but have not been tested. The task has been tested on MacOS Big Sur and WSL2. The task should run on Windows but this wasn't tested. For Beam 2.32.0 the supported versions of the Python SDK can be found here . 
Pip \u00b6 In a virtual environment run from the root of the repo: pip install -r requirements.txt Poetry (Alternative) \u00b6 Install Poetry globally From the root of the repo install the dependencies with: poetry install --no-dev Activate the shell with: poetry shell","title":"Installation"},{"location":"documentation/installation.html#installation","text":"The task is written in Python 3.7.9 using Apache Beam 2.32.0. Python versions 3.6.14 and 3.8.11 should also be compatible but have not been tested. The task has been tested on MacOS Big Sur and WSL2. The task should run on Windows but this wasn't tested. For Beam 2.32.0 the supported versions of the Python SDK can be found here .","title":"Installation"},{"location":"documentation/installation.html#pip","text":"In a virtual environment run from the root of the repo: pip install -r requirements.txt","title":"Pip"},{"location":"documentation/installation.html#poetry-alternative","text":"Install Poetry globally From the root of the repo install the dependencies with: poetry install --no-dev Activate the shell with: poetry shell","title":"Poetry (Alternative)"},{"location":"documentation/usage.html","text":"Usage \u00b6 This page documents how to run the pipeline locally to complete the task for the dataset for 2020 . The pipeline also runs in GCP using DataFlow and is discussed further on but can be viewed here . We also discuss how to adapt the pipeline so it can run against the full dataset . Download dataset \u00b6 The input data by default should go in ./data/input . For convenience the data is available publicly in a GCP Cloud Storage bucket. Run: wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input to download the data for 2020 and place in the input directory above. Entrypoint \u00b6 The entrypoint to the pipeline is analyse_properties.main . Available options \u00b6 Running python -m analyse_properties.main --help gives the following output: usage: analyse_properties.main [ -h ] [ --input INPUT ] [ --output OUTPUT ] optional arguments: -h, --help show this help message and exit --input INPUT Full path to the input file. --output OUTPUT Full path to the output file without extension. The default value for input is ./data/input/pp-2020.csv and the default value for output is ./data/output/pp-2020 . Run the pipeline \u00b6 To run the pipeline and complete the task run: python -m analyse_properties.main \\ --runner DirectRunner \\ --input ./data/input/pp-2020.csv \\ --output ./data/output/pp-2020 from the root of the repo. The pipeline will use the 2020 dataset located in ./data/input and output the resulting .json to ./data/output .","title":"Usage"},{"location":"documentation/usage.html#usage","text":"This page documents how to run the pipeline locally to complete the task for the dataset for 2020 . The pipeline also runs in GCP using DataFlow and is discussed further on but can be viewed here . We also discuss how to adapt the pipeline so it can run against the full dataset .","title":"Usage"},{"location":"documentation/usage.html#download-dataset","text":"The input data by default should go in ./data/input . For convenience the data is available publicly in a GCP Cloud Storage bucket. 
Run: wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input to download the data for 2020 and place in the input directory above.","title":"Download dataset"},{"location":"documentation/usage.html#entrypoint","text":"The entrypoint to the pipeline is analyse_properties.main .","title":"Entrypoint"},{"location":"documentation/usage.html#available-options","text":"Running python -m analyse_properties.main --help gives the following output: usage: analyse_properties.main [ -h ] [ --input INPUT ] [ --output OUTPUT ] optional arguments: -h, --help show this help message and exit --input INPUT Full path to the input file. --output OUTPUT Full path to the output file without extension. The default value for input is ./data/input/pp-2020.csv and the default value for output is ./data/output/pp-2020 .","title":"Available options"},{"location":"documentation/usage.html#run-the-pipeline","text":"To run the pipeline and complete the task run: python -m analyse_properties.main \\ --runner DirectRunner \\ --input ./data/input/pp-2020.csv \\ --output ./data/output/pp-2020 from the root of the repo. The pipeline will use the 2020 dataset located in ./data/input and output the resulting .json to ./data/output .","title":"Run the pipeline"}]}
\ No newline at end of file
diff --git a/sitemap.xml.gz b/sitemap.xml.gz
index 9da2801..f96b2a7 100644
Binary files a/sitemap.xml.gz and b/sitemap.xml.gz differ