diff --git a/docs/dataflow/img/screencapture-console-cloud-google-dataflow-jobs-europe-west1-2021-09-27-08-08-57-10745415117326954715-step-mainTab-JOB-GRAPH-2021-09-27-22_29_32.png:Zone.Identifier b/docs/dataflow/img/screencapture-console-cloud-google-dataflow-jobs-europe-west1-2021-09-27-08-08-57-10745415117326954715-step-mainTab-JOB-GRAPH-2021-09-27-22_29_32.png:Zone.Identifier
new file mode 100644
index 0000000..053d112
--- /dev/null
+++ b/docs/dataflow/img/screencapture-console-cloud-google-dataflow-jobs-europe-west1-2021-09-27-08-08-57-10745415117326954715-step-mainTab-JOB-GRAPH-2021-09-27-22_29_32.png:Zone.Identifier
@@ -0,0 +1,3 @@
+[ZoneTransfer]
+ZoneId=3
+HostUrl=about:internet
diff --git a/docs/dataflow/img/successful_dataflow_job.png b/docs/dataflow/img/successful_dataflow_job.png
new file mode 100644
index 0000000..a0da489
Binary files /dev/null and b/docs/dataflow/img/successful_dataflow_job.png differ
diff --git a/docs/dataflow/index.md b/docs/dataflow/index.md
new file mode 100644
index 0000000..a21fc9a
--- /dev/null
+++ b/docs/dataflow/index.md
@@ -0,0 +1,50 @@
+# Running on DataFlow
+
+The pipeline runs as-is on GCP DataFlow. The following documents how I deployed it to my personal GCP account, but the approach may vary depending on the GCP project/account.
+
+## Prerequisites
+
+### Cloud Storage
+
+- A Cloud Storage bucket with the following structure:
+
+```
+./input
+./output
+./tmp
+```
+
+- Place the input files into the `./input` directory in the bucket.
+
+### VPC
+
+To get around public IP quotas, I created a VPC in the `europe-west1` region that has `Private Google Access` set to `ON`.
+
+## Command
+
+!!! tip
+    We need to choose a `worker_machine_type` with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and DataFlow autoscales on CPU rather than memory usage, we need a machine with more RAM than usual to ensure sufficient memory when running on one worker. For `pp-2020.csv` the `n1-highmem-2` type, with 2 vCPUs and 13 GB of RAM, was chosen, and the pipeline completed successfully in ~10 minutes using only 1 worker.
+
+Assuming the `pp-2020.csv` file has been placed in the `./input` directory in the bucket, you can run a command similar to:
+
+!!! caution
+    Use `python -m analyse_properties.main` as the entrypoint to the pipeline, not `analyse-properties`, as the package isn't installed with Poetry on the workers with the configuration below.
+
+```bash
+python -m analyse_properties.main \
+    --runner DataflowRunner \
+    --project street-group \
+    --region europe-west1 \
+    --input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \
+    --output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \
+    --temp_location gs://street-group-technical-test-dmot-euw1/tmp \
+    --subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
+    --no_use_public_ips \
+    --worker_machine_type=n1-highmem-2
+```
+
+The output file from this pipeline is publicly available and can be downloaded [here](https://storage.googleapis.com/street-group-technical-test-dmot-euw1/output/pp-2020-00000-of-00001.json).
+
+The job graph for this pipeline is displayed below:
+
+![JobGraph](img/successful_dataflow_job.png)
diff --git a/docs/dataflow/scaling.md b/docs/dataflow/scaling.md
new file mode 100644
index 0000000..6cd10a3
--- /dev/null
+++ b/docs/dataflow/scaling.md
@@ -0,0 +1,70 @@
+# Scaling to the Full Dataset
+
+As is, the pipeline will not run against the full dataset.
+With a little work on the existing pipeline, however, I believe it is possible to process the full dataset of ~27 million rows.
+
+## Mapping table
+
+Using a mapping table as a side input means that, for the full dataset, this table is going to be huge.
+
+Side inputs are stored in memory on the workers; with such a huge table, the machines quickly run out of available memory when autoscaling is applied.
+
+Running the pipeline against the full dataset resulted in the following error:
+
+```text
+"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"
+```
+
+The pipeline job failed to process anything, and the rows processed per second gradually fell to zero as the workers killed the Python process to try to free up memory. This caused autoscaling to scale down (as CPU usage decreased) and the entire pipeline stagnated.
+
+Using a higher-tier `worker_machine_type`, disabling autoscaling, and fixing the number of workers to the maximum number of vCPUs available under the quota results in the following pipeline options:
+
+```bash
+--worker_machine_type=n1-highmem-8 \
+--num_workers=3 \
+--autoscaling_algorithm=NONE
+```
+
+This gives 156 GB of RAM available to the pipeline, with 52 GB on each worker.
+
+The pipeline was able to progress further, until Python threw an error and the pipeline failed and shut down:
+
+```text
+"Error message from worker: Traceback (most recent call last):
+  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
+    work_executor.execute()
+  ...
+  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
+    header = struct.pack("!i", n)
+struct.error: 'i' format requires -2147483648 <= number <= 2147483647
+```
+
+The number 2147483647 is the maximum value of a signed 32-bit integer.
+
+As the side input needs to be pickled (serialised) and passed to the workers, this tells us that the table is far too large to be pickled and distributed. No amount of CPU/memory can fix the problem.
+
+## Patterns
+
+Google has several patterns for large side inputs, which are documented here:
+
+- Part 1
+- Part 2
+
+## Solution
+
+A possible solution would be to leverage BigQuery to store the mapping table as the pipeline progresses. We can make use of BigQuery's array type to store the raw array as we process each row.
+
+In addition to creating the mapping table `(key, value)` pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written, so the subsequent stages can still make use of this data.
+
+The condense-mapping-table stage is removed, as it is no longer needed.
+
+Instead of using:
+
+```python
+beam.FlatMap(
+    insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
+)
+```
+
+to insert the results of the mapping table, we write a new `DoFn` that takes the element and, for each `id_all_columns` in the array, makes a call to BigQuery to get the array for that ID and inserts it at this stage.
+
+Because each `id_all_columns` and its corresponding data is only used once, there would be no need to cache the results from BigQuery. However, some work could be done to see whether we could pull back more than one row at a time and cache those, saving time/cost on calls to BigQuery.
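+
+A rough sketch of what such a lookup `DoFn` might look like is shown below. This is illustrative only: the table name, column names and query are assumptions, not the pipeline's real implementation.
+
+```python
+import apache_beam as beam
+from google.cloud import bigquery
+
+
+class InsertDataFromBigQuery(beam.DoFn):
+    """Replace each id_all_columns with the raw row data stored in BigQuery.
+
+    Assumes a hypothetical table `street-group.mapping.mapping_table` with
+    columns `id_all_columns STRING` and `raw_row ARRAY<STRING>`.
+    """
+
+    def setup(self):
+        # One BigQuery client per worker.
+        self.client = bigquery.Client()
+
+    def process(self, element):
+        id_without_price_date, id_all_columns_list = element
+        rows = []
+        for id_all_columns in id_all_columns_list:
+            # Fetch the raw row for this id from the hypothetical mapping table.
+            job = self.client.query(
+                "SELECT raw_row FROM `street-group.mapping.mapping_table` "
+                "WHERE id_all_columns = @id",
+                job_config=bigquery.QueryJobConfig(
+                    query_parameters=[
+                        bigquery.ScalarQueryParameter("id", "STRING", id_all_columns)
+                    ]
+                ),
+            )
+            for result in job.result():
+                rows.append(list(result["raw_row"]))
+        yield (id_without_price_date, rows)
+```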
diff --git a/docs/discussion/approach.md b/docs/discussion/approach.md
index c95f645..252e0e5 100644
--- a/docs/discussion/approach.md
+++ b/docs/discussion/approach.md
@@ -1 +1,95 @@
 # Approach
+
+The general approach to the pipeline is:
+
+## Loading stage
+
+- Load using `#!python beam.io.ReadFromText()`.
+- Split the loaded string on `,` as it is a comma-delimited `.csv`.
+- Strip the leading/trailing `"` marks.
+
+The result is an array with each element representing a single column in that row.
+
+## Cleaning stage
+
+Already discussed on the Cleaning page.
+
+## Create a mapping table
+
+The mapping table takes each row and creates a `(key, value)` pair with:
+
+- The key being the id across all columns (`id_all_columns`).
+- The value being the raw data as an array.
+
+The mapping table is then condensed into a single dictionary of these `(key, value)` pairs and is used as a side input further down the pipeline.
+
+This mapping table is created to ensure the `GroupByKey` operation is as quick as possible. The more data you have to process in a `GroupByKey`, the longer the operation takes. By doing the `GroupByKey` using just the ids, the pipeline can process the files much more quickly than if we included the raw data in this operation.
+
+## Prepare stage
+
+- Take the mapping table data (before it is condensed) and create a unique id ignoring the price and date (`id_without_price_date`).
+
+This id will not be unique: properties with more than one transaction will share it.
+
+- Create a `(key, value)` pair with:
+    - The key being `id_without_price_date`.
+    - The value being `id_all_columns`.
+- Group by `id_without_price_date`.
+
+This results in a PCollection that looks like: `(id_without_price_date, [id_all_columns, ...])`
+
+- Deduplicate the `id_all_columns` inside this array to eliminate repeated rows that are exactly the same.
+- Use the mapping table as a side input to reinsert the raw data using the `id_all_columns` (a sketch of these stages is shown below).
+
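+The sketch below illustrates the mapping-table and prepare stages end-to-end on a single hard-coded row. It is illustrative only: the helper functions and the exact way the ids are hashed are assumptions, not the pipeline's real code.
+
+```python
+import hashlib
+
+import apache_beam as beam
+
+
+def md5_id(columns):
+    # Hypothetical helper: join the columns and hash them; the real pipeline
+    # may use a different separator or column subset.
+    return hashlib.md5("|".join(columns).encode("utf-8")).hexdigest()
+
+
+def reinsert_raw_data(element, mapping):
+    # Deduplicate the grouped ids, then look each one up in the mapping table
+    # side input to recover the raw row data.
+    id_without_price_date, id_all_columns_list = element
+    return [(id_without_price_date, mapping[i]) for i in set(id_all_columns_list)]
+
+
+with beam.Pipeline() as pipeline:
+    rows = pipeline | "Load" >> beam.Create(
+        [["317500", "2020-11-13 00:00", "B90 3LA", "1", "", "VERSTONE ROAD",
+          "SHIRLEY", "SOLIHULL", "SOLIHULL", "WEST MIDLANDS", ""]]
+    )
+
+    # Mapping table: (id_all_columns, raw row), condensed into a dict side input.
+    mapping_table = rows | "MappingTable" >> beam.Map(lambda row: (md5_id(row), row))
+    mapping_table_condensed = mapping_table | "Condense" >> beam.combiners.ToDict()
+
+    # Prepare stage: key by id_without_price_date (price and date are the first
+    # two columns), group, deduplicate and reinsert the raw data.
+    prepared = (
+        rows
+        | "IdPairs" >> beam.Map(lambda row: (md5_id(row[2:]), md5_id(row)))
+        | "GroupByKey" >> beam.GroupByKey()
+        | "Reinsert" >> beam.FlatMap(
+            reinsert_raw_data, beam.pvalue.AsSingleton(mapping_table_condensed)
+        )
+    )
+    prepared | "Print" >> beam.Map(print)
+```
+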
+<details>
+<summary>Example for No.1 B90 3LA</summary>
+
+Mapping table (pre condensed):
+
+```python
+('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
+('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
+```
+
+Mapping table (condensed):
+
+```python
+{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
+```
+
+Prepared (key, value):
+
+```python
+('fe205bfe66bc7f18c50c8f3d77ec3e30', 'fd4634faec47c29de40bbf7840723b41')
+('fe205bfe66bc7f18c50c8f3d77ec3e30', 'fd4634faec47c29de40bbf7840723b41')
+```
+
+Prepared (GroupByKey):
+
+```python
+('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
+```
+
+Prepared (Deduplicated):
+
+```python
+('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])
+```
+
+Use mapping table as side input:
+
+```python
+('fe205bfe66bc7f18c50c8f3d77ec3e30', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
+```
+
+</details>
+
+## Format stage
+
+This stage takes the result and constructs a `json` object out of the grouped data. The schema for this output is discussed on the following page.
+
+## Save stage
+
+- The PCollection is combined with `#!python beam.combiners.ToList()`.
+- Apply `json.dumps()` for proper quotation marks for strings.
+- Write to text with `#!python beam.io.WriteToText` (a sketch of these two stages is shown below).
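+
+A minimal, self-contained sketch of the format and save stages is shown below. It is illustrative only: the real format step builds the full schema shown on the Results page, whereas the hypothetical `format_property` helper here constructs only a couple of fields.
+
+```python
+import json
+
+import apache_beam as beam
+
+
+def format_property(element):
+    # Hypothetical formatter: build a dict in roughly the shape of the output
+    # schema from a (property_id, [raw rows]) element.
+    property_id, raw_rows = element
+    return {
+        "property_id": property_id,
+        "property_transactions": [
+            {
+                "price": int(row[0]),
+                "transaction_date": row[1].split(" ")[0],
+                "year": int(row[1][:4]),
+            }
+            for row in raw_rows
+        ],
+    }
+
+
+with beam.Pipeline() as pipeline:
+    grouped = pipeline | "Create" >> beam.Create(
+        [("fe205bfe66bc7f18c50c8f3d77ec3e30",
+          [["317500", "2020-11-13 00:00", "B90 3LA", "1", "", "VERSTONE ROAD",
+            "SHIRLEY", "SOLIHULL", "SOLIHULL", "WEST MIDLANDS", ""]])]
+    )
+    (
+        grouped
+        | "Format" >> beam.Map(format_property)
+        | "Combine" >> beam.combiners.ToList()
+        | "ToJson" >> beam.Map(json.dumps)
+        | "Save" >> beam.io.WriteToText("pp-2020", file_name_suffix=".json")
+    )
+```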
diff --git a/docs/discussion/cleaning.md b/docs/discussion/cleaning.md
index a2659ba..314a829 100644
--- a/docs/discussion/cleaning.md
+++ b/docs/discussion/cleaning.md
@@ -44,6 +44,8 @@ To try keep computation costs/time down, I decided to drop the categorical colum
 
 Initially I was attempting to work against the full dataset so dropping these columns would make a difference in the amount of data that needs processing.
 
+These columns are also not consistent. For example, the property `63` `B16 0AE` has three transactions: two have a property type of `Other` and one has a property type of `Terraced`.
+
 These columns do provide some relevant information (old/new, duration, property type) and these could be included back into the pipeline fairly easily. Due to time constraints I was unable to make this change.
 
 In addition, I also dropped the transaction unique identifier column. I wanted the IDs calculated in the pipeline to be consistent in format, and hashing a string (md5) isn't that expensive to calculate with complexity $\mathcal{O}(n)$.
@@ -113,3 +115,40 @@ It would be very unusual to see multiple transactions on the same date for the s
 Another reason could be missing building/flat/appartment information in this entry.
 We **keep** these in the data, resulting in some properties having multiple transactions with different prices on the same date.
 Without a time or more information to go on, it is difficult to see how these could be filtered out.
+
+<details>
+<summary>Example (Output)</summary>
+
+```json
+[
+    {
+        "property_id": "20d5c335c8d822a40baab0ecd57e92a4",
+        "readable_address": "53 PAVENHAM DRIVE\nBIRMINGHAM\nWEST MIDLANDS\nB5 7TN",
+        "flat_appartment": "",
+        "builing": "",
+        "number": "53",
+        "street": "PAVENHAM DRIVE",
+        "locality": "",
+        "town": "BIRMINGHAM",
+        "district": "BIRMINGHAM",
+        "county": "WEST MIDLANDS",
+        "postcode": "B5 7TN",
+        "property_transactions": [
+            {
+                "price": 270000,
+                "transaction_date": "2020-04-23",
+                "year": 2020
+            },
+            {
+                "price": 364000,
+                "transaction_date": "2020-04-23",
+                "year": 2020
+            }
+        ],
+        "latest_transaction_year": 2020
+    }
+]
+```
+
+</details>
diff --git a/docs/discussion/introduction.md b/docs/discussion/introduction.md
index 8315849..33b5e1f 100644
--- a/docs/discussion/introduction.md
+++ b/docs/discussion/introduction.md
@@ -5,5 +5,3 @@ This section will go through some discussion of the test including:
 - Data exploration
 - Cleaning the data
 - Interpreting the results
-- Deploying on GCP DataFlow
-- Improvements
diff --git a/docs/discussion/results.md b/docs/discussion/results.md
new file mode 100644
index 0000000..747841d
--- /dev/null
+++ b/docs/discussion/results.md
@@ -0,0 +1,51 @@
+# Results
+
+The resulting output `.json` looks like (for the previous example using No. 1 `B90 3LA`):
+
+```json
+[
+    {
+        "property_id": "fe205bfe66bc7f18c50c8f3d77ec3e30",
+        "readable_address": "1 VERSTONE ROAD\nSHIRLEY\nSOLIHULL\nWEST MIDLANDS\nB90 3LA",
+        "flat_appartment": "",
+        "builing": "",
+        "number": "1",
+        "street": "VERSTONE ROAD",
+        "locality": "SHIRLEY",
+        "town": "SOLIHULL",
+        "district": "SOLIHULL",
+        "county": "WEST MIDLANDS",
+        "postcode": "B90 3LA",
+        "property_transactions": [
+            {
+                "price": 317500,
+                "transaction_date": "2020-11-13",
+                "year": 2020
+            }
+        ],
+        "latest_transaction_year": 2020
+    }
+]
+```
+
+The standard property information is included; below we briefly discuss the additional fields included in this output file.
+
+## readable_address
+
+The components that make up the address in the dataset are often repetitive, with the locality, town/city, district and county often sharing the same value. This can result in hard-to-read addresses if we simply stacked all the components sequentially.
+
+The `readable_address` provides an easy-to-read address that strips this repetition out by doing pairwise comparisons across the four components and applying a mask. The result is an address that could be served to the end user, or easily displayed on a page.
+
+This saves any user having to apply the same logic just to display the address somewhere; the full address of a property should be easy to read and easily accessible.
+
+## property_transactions
+
+This array contains an object for each transaction for that property, with the price and year as an `int` and the `00:00` time stripped from the date.
+
+## latest_transaction_year
+
+The year of the latest transaction is extracted from the array of `property_transactions` and placed at the top level of the `json` object. This allows any end user to easily search for properties that haven't been sold in a given period of time, without having to write this logic themselves.
+
+A consumer should be able to use this data to answer questions like:
+
+- Give me all properties in the town of Solihull that haven't been sold in the past 10 years.
diff --git a/mkdocs.yaml b/mkdocs.yaml
index 424a20f..d732f6c 100644
--- a/mkdocs.yaml
+++ b/mkdocs.yaml
@@ -11,6 +11,10 @@ nav:
   - Data Exploration Report: discussion/exploration.md
   - Cleaning: discussion/cleaning.md
   - Approach: discussion/approach.md
+  - Results: discussion/results.md
+  - DataFlow:
+    - Running on DataFlow: dataflow/index.md
+    - Scaling to the Full Dataset: dataflow/scaling.md
   - Data Exploration Report: pandas-profiling/report.html
 theme:
   name: material
diff --git a/notes/documentation/dataflow.md b/notes/documentation/dataflow.md
index b5228e6..ddfd1ff 100644
--- a/notes/documentation/dataflow.md
+++ b/notes/documentation/dataflow.md
@@ -91,3 +91,5 @@ Common use cases:
 
 - Part 1
 - Part 2
+
+Side inputs: