The pipeline runs as-is on GCP Dataflow. The following documents how I deployed it to my personal GCP account, but the approach may vary depending on your GCP project/account.
We need to choose a worker_machine_type with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and Dataflow autoscales on CPU rather than memory usage, we need a machine with more RAM than usual to ensure there is enough memory when running on a single worker. For pp-2020.csv the type n1-highmem-2, with 2 vCPUs and 13 GB of RAM, was chosen; the pipeline completed successfully in ~10 minutes using only 1 worker.
+
+
Assuming the pp-2020.csv file has been placed in the ./input directory of the bucket, you can run a command similar to the following:
+
+
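For example (a sketch only: the project, region, bucket name, and the input/output flag names are placeholders and may not match the pipeline's actual options):

```sh
# Flag names after --temp_location are assumptions; check analyse_properties.main
# for the options the pipeline actually accepts.
python -m analyse_properties.main \
    --runner DataflowRunner \
    --project my-gcp-project \
    --region europe-west2 \
    --temp_location gs://my-bucket/temp \
    --worker_machine_type n1-highmem-2 \
    --input gs://my-bucket/input/pp-2020.csv \
    --output gs://my-bucket/output/results
```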
Caution
+
Use the command python -m analyse_properties.main as the entrypoint to the pipeline, and not analyse-properties, as the module isn't installed with Poetry on the workers with the configuration below.
As-is, the pipeline will not run against the full dataset. But with a little work on the existing pipeline I believe it is possible to run it against the full dataset of ~27 million rows.
Using a mapping table as a side-input means that for the full dataset this table is going to be huge.
+
Side inputs are stored in memory on the workers; with such a huge table the machines quickly run out of available memory when autoscaling is applied.
+
Running the pipeline against the full dataset resulted in the following error:
+
"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"
+
+
with the pipeline job failing to process anything and the rows processed per second gradually falling to zero as the workers killed the Python process to try to free up more memory. This caused autoscaling to scale down (as CPU usage decreased) and the entire pipeline stagnated.
+
Using a higher-tier worker_machine_type, disabling autoscaling, and fixing the number of workers to the maximum number of vCPUs available in the quota results in pipeline options similar to the following:
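Concretely, this amounts to flags along these lines, appended to the run command above (n1-highmem-8 and a fixed count of 3 workers are my inference from the memory figures below, not taken from the repo):

```sh
# n1-highmem-8 (8 vCPUs, 52 GB RAM each) and 3 workers are inferred, not confirmed
--worker_machine_type n1-highmem-8 \
--num_workers 3 \
--autoscaling_algorithm NONE
```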
This gives 156 GB of RAM available to the pipeline, with 52 GB on each worker.
+
The pipeline was able to progress further, until Python threw an error and the job failed and shut down:
+
"Error message from worker: Traceback (most recent call last):
+ File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
+ work_executor.execute()
+ ...
+ File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
+ header = struct.pack("!i", n)
+struct.error: 'i' format requires -2147483648 <= number <= 2147483647
+
+
The number 2147483647 is the maximum value for a signed 32-bit integer.
+
As the side input needs to be pickled (serialised) before it is sent to the workers, this tells us that the table is far too large to be pickled and passed around. No amount of CPU/memory can fix the problem.
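The limit is easy to reproduce: the pickled side input is sent between worker processes with a 4-byte signed length header, which is exactly what overflows in the traceback above.

```python
import struct

# multiprocessing writes the payload length as a signed 32-bit header ("!i"),
# so any pickled payload over ~2 GiB hits the same error the worker did.
try:
    struct.pack("!i", 2_147_483_648)  # one over the signed 32-bit maximum
except struct.error as exc:
    print(exc)  # 'i' format requires -2147483648 <= number <= 2147483647
```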
A possible solution would be to leverage BigQuery to store the results of the mapping table as the pipeline progresses. We can make use of BigQuery's array type to store the raw array as we process each row.
+
In addition to creating the mapping table (key, value) pairs, we also save these pairs to BigQuery at this stage. We then yield the element as currently written, so that the subsequent stages can make use of this data.
+
Remove the condense mapping table stage as it is no longer needed.
To insert the results of the mapping table, we write a new DoFn that takes the element and, for each id_all_columns in the array, makes a call to BigQuery to get the array for this ID and insert it at this stage (a rough sketch of both DoFns follows this list).
+
Because each id_all_columns and its corresponding data is only used once, there would be no need to cache the results from BigQuery; however, some work could be done to see if we could pull back more than one row at a time and cache these, saving time/cost on calls to BigQuery.
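A rough sketch of the two stages described above; the table name, its schema (id_all_columns STRING, raw_row ARRAY<STRING>), and the element shapes are assumptions for illustration, not the pipeline's actual ones:

```python
import apache_beam as beam
from google.cloud import bigquery

# "my-project.analysis.mapping_table" and its schema are assumptions for this sketch.
MAPPING_TABLE = "my-project.analysis.mapping_table"


class SaveMappingPairToBigQuery(beam.DoFn):
    """Write each (id_all_columns, raw_row) pair to BigQuery, then yield it on."""

    def setup(self):
        self.client = bigquery.Client()

    def process(self, element):
        key, raw_row = element
        # BigQuery's ARRAY type lets us store the raw row exactly as we hold it.
        self.client.insert_rows_json(
            MAPPING_TABLE, [{"id_all_columns": key, "raw_row": raw_row}]
        )
        yield element


class AttachRawRowsFromBigQuery(beam.DoFn):
    """Replace each id_all_columns in a grouped element with its stored raw row."""

    def setup(self):
        self.client = bigquery.Client()

    def process(self, element):
        group_key, id_list = element
        raw_rows = []
        for id_all_columns in id_list:
            job = self.client.query(
                f"SELECT raw_row FROM `{MAPPING_TABLE}` WHERE id_all_columns = @id",
                job_config=bigquery.QueryJobConfig(
                    query_parameters=[
                        bigquery.ScalarQueryParameter("id", "STRING", id_all_columns)
                    ]
                ),
            )
            for row in job:
                raw_rows.append(list(row["raw_row"]))
        yield group_key, raw_rows
```

Inserting and querying one row at a time keeps the sketch simple; batching the inserts and pulling back several ids per query (or the caching mentioned above) would cut the number of BigQuery calls considerably.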
The mapping table takes each row and creates a (key, value) pair with:
+
+
The key being the id across all columns (id_all_columns).
+
The value being the raw data as an array.
+
+
The mapping table is then condensed to a single dictionary with these key, value pairs and is used as a side input further down the pipeline.
+
This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey, the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much more quickly than if we included the raw data in this operation.
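A minimal, self-contained sketch of that idea; the row layout and values below are made up for illustration, and grouping by postcode stands in for whatever key the real pipeline groups on:

```python
import hashlib

import apache_beam as beam


def to_mapping_pair(row):
    """Key each raw row by a hash of all of its columns (id_all_columns)."""
    id_all_columns = hashlib.md5("|".join(row).encode("utf-8")).hexdigest()
    return id_all_columns, row


with beam.Pipeline() as pipeline:
    # Illustrative rows only: price, date, postcode, street number, SAON, street.
    rows = pipeline | beam.Create([
        ["100000", "2020-01-01 00:00", "AB1 2CD", "1", "", "HIGH STREET"],
        ["150000", "2020-06-01 00:00", "AB1 2CD", "1", "", "HIGH STREET"],
    ])

    mapping = rows | "ToMappingPair" >> beam.Map(to_mapping_pair)

    # Condensed into a single dict and passed around as a side input.
    mapping_dict = beam.pvalue.AsDict(mapping)

    # GroupByKey shuffles only the small ids; the raw rows are attached
    # afterwards from the side input, keeping the shuffle cheap.
    (
        mapping
        | "KeyByPostcode" >> beam.Map(lambda kv: (kv[1][2], kv[0]))
        | beam.GroupByKey()
        | "AttachRawRows" >> beam.Map(
            lambda kv, table: (kv[0], [table[i] for i in kv[1]]),
            table=mapping_dict,
        )
        | beam.Map(print)
    )
```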
Because so few properties are missing a postcode (0.2% of all records), we will drop all rows that do not have one. This discards some properties that could be identified uniquely with some more work, but the properties that are missing a postcode tend to be unusual/commercial/industrial (e.g. a power plant).
The building name and street number (comma delimited).
+
+
The SAON:
+
+
Identifies the apartment/flat number for the building.
+
If the SAON is present (only 11.7% of values) then the PAON will either be:
+
The building name.
+
The building name and street number.
+
+
+
+
Because of the way the PAON and SAON are defined, if any row is missing both of these columns we will drop it, as having only the postcode is not (generally speaking) enough to uniquely identify a property.
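A small sketch of the combined dropping rules; the column positions are assumed for illustration:

```python
POSTCODE, PAON, SAON = 3, 7, 8  # assumed column positions, not the pipeline's real layout


def keep_row(row):
    """Keep rows that have a postcode and at least one of PAON/SAON."""
    if not row[POSTCODE].strip():
        return False
    return bool(row[PAON].strip() or row[SAON].strip())

# In the pipeline this would back a filter step, e.g.
# rows | "DropUnidentifiable" >> beam.Filter(keep_row)
```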
+
+
Tip
+
In a production environment we could send these rows to a sink table (in BigQuery for example), rather than drop them outright. Collecting these rows over time might reveal patterns that help us uniquely identify properties that are missing these fields.
+
+
We split the PAON as part of the cleaning stage. If the PAON contains a comma then it contains the building name and street number. We keep the street number in the same position as the PAON and insert the building name as a new column at the end of the row. If the PAON does not contain a comma we insert a blank column at the end to keep the number of columns in the PCollection consistent.
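A sketch of that split, taking the "building name, street number" order described above and an assumed PAON column position:

```python
def split_paon(row, paon_index=7):  # the index is an assumption
    """Split a 'BUILDING NAME, 12' style PAON into street number + building name."""
    paon = row[paon_index]
    if "," in paon:
        building_name, street_number = [part.strip() for part in paon.split(",", 1)]
        row[paon_index] = street_number  # street number stays in the PAON position
        row.append(building_name)        # building name becomes a new trailing column
    else:
        row.append("")  # blank column keeps the PCollection width consistent
    return row
```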
To try to keep computation costs/time down, I decided to drop the categorical columns provided. These include:
+
+
Property Type.
+
Old/New.
+
Duration.
+
PPD Category Type.
+
Record Status - monthly file only.
+
+
Initially I was attempting to work against the full dataset, so dropping these columns would make a difference to the amount of data that needs processing.
+
These columns are also not consistent. For example, the property 63B16, 0AE has three transactions: two have a property type of Other and one has a property type of Terraced.
+
These columns do provide some relevant information (old/new, duration, property type) and these could be included back into the pipeline fairly easily. Due to time constraints I was unable to make this change.
+
I also dropped the transaction unique identifier column. I wanted the IDs calculated in the pipeline to be consistent in format, and hashing a string (md5) isn't expensive to calculate, with complexity \(\mathcal{O}(n)\).
These rows will be deduplicated as part of the pipeline.
+
+
Some rows have the same date + address information, but different prices.
+
+
It would be very unusual to see multiple transactions on the same date for the same property. One reason could be that there was a data entry error, resulting in two different transactions with only one being the real price. As the date column does not contain the time (it is fixed at 00:00) it is impossible to tell.
+
Another reason could be missing building/flat/apartment information in this entry.
+
We keep these in the data, resulting in some properties having multiple transactions with different prices on the same date. Without a time or more information to go on, it is difficult to see how these could be filtered out.
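To illustrate with made-up rows: exact duplicates are removed by the deduplication step (beam.Distinct stands in here for whatever the pipeline actually uses), while same-date rows that differ only in price both survive:

```python
import apache_beam as beam

# Illustrative rows only: (date, address, price).
rows = [
    ("2020-01-10 00:00", "AB1 2CD, 1 HIGH STREET", "100000"),
    ("2020-01-10 00:00", "AB1 2CD, 1 HIGH STREET", "100000"),  # exact duplicate: dropped
    ("2020-01-10 00:00", "AB1 2CD, 1 HIGH STREET", "250000"),  # different price: kept
]

with beam.Pipeline() as pipeline:
    pipeline | beam.Create(rows) | beam.Distinct() | beam.Map(print)
```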
A brief exploration was done on the full dataset using the module pandas-profiling. The module uses pandas to load a dataset and automatically produces quantile/descriptive statistics, common values, extreme values, skew, kurtosis, etc., and generates a report .html file that can be viewed interactively in your browser.
+
The script used to generate this report is located in ./exploration/report.py and can be viewed below.
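For reference, a minimal script along those lines; the real ./exploration/report.py may differ, and the file name and column names here are assumptions:

```python
import pandas as pd
from pandas_profiling import ProfileReport

# The Price Paid CSV has no header row; these column names are assumed for the sketch.
COLUMNS = [
    "transaction_id", "price", "date", "postcode", "property_type", "old_new",
    "duration", "paon", "saon", "street", "locality", "town", "district",
    "county", "ppd_category_type", "record_status",
]

df = pd.read_csv("pp-complete.csv", names=COLUMNS, header=None)

# minimal=True skips the most expensive correlations, which matters at ~27 million rows.
profile = ProfileReport(df, title="UK Price Paid Data", minimal=True)
profile.to_file("report.html")
```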
When looking at the report we are mainly interested in data quality and missing observations. The statistics are interesting to see but largely irrelevant for this task.
+
The data overall looks very good for a dataset of its size (~27 million records). For important fields there are no missing values:
+
+
Every row has a price.
+
Every row has a unique transaction ID.
+
Every row has a transaction date.
+
+
Some fields that we will need are missing data:
+
+
~42,000 (0.2%) are missing a Postcode.
+
~4,000 (<0.1%) are missing a PAON (primary addressable object name).