## Approach
The general approach to the pipeline is:
### Loading stage

- Load using `#!python beam.io.ReadFromText()`.
- Split the loaded string on `,` as it's a comma-delimited `.csv`.
- Strip the leading/trailing `"` marks.

The result is an array with each element representing a single column in that row.
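A minimal sketch of this stage (the input path is illustrative, not the pipeline's actual location):

```python
import apache_beam as beam

def parse_line(line):
    """Split a comma-delimited line into columns and strip the quote marks.

    Note: a plain split breaks if a quoted field itself contains a comma;
    csv.reader would handle that case.
    """
    return [column.strip('"') for column in line.split(",")]

with beam.Pipeline() as pipeline:
    rows = (
        pipeline
        | "Read" >> beam.io.ReadFromText("data/input/pp-2020.csv")  # assumed path
        | "Parse" >> beam.Map(parse_line)
    )
```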
### Cleaning stage
Already discussed.
### Create a mapping table

The mapping table takes each row and creates a `(key, value)` pair with:

- The key being the id across all columns (`id_all_columns`).
- The value being the raw data as an array.
The mapping table is then condensed to a single dictionary of these `(key, value)` pairs (automatically deduplicating repeated rows) and is used as a side input further down the pipeline.
This mapping table is created to ensure the GroupByKey operation is as quick as possible. The more data you have to process in a GroupByKey, the longer the operation takes. By doing the GroupByKey using just the ids, the pipeline can process the files much more quickly than if the raw data were included in this operation.
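A sketch of how the mapping table might be built, continuing from `rows` above. The hashing scheme is an assumption (the ids in the worked example below look like MD5 digests, but the real implementation may differ):

```python
import hashlib

import apache_beam as beam

def id_all_columns(row):
    """Hash every column into a stable id (MD5 here is an assumption)."""
    return hashlib.md5(",".join(row).encode("utf-8")).hexdigest()

# (id_all_columns, raw_row) pairs. Materialising these as a dict side input
# deduplicates identical rows automatically, since they share the same key.
mapping = rows | "KeyByAllColumns" >> beam.Map(lambda row: (id_all_columns(row), row))
mapping_table = beam.pvalue.AsDict(mapping)
```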
### Prepare stage

- Take the mapping table data (before it is condensed) and create an id that ignores the price and date (`id_without_price_date`).
  This id will not be unique: properties with more than one transaction will share it.
- Create a `(key, value)` pair with:
    - The key being `id_without_price_date`.
    - The value being `id_all_columns`.
- Group by `id_without_price_date`.
  This results in a PCollection that looks like `(id_without_price_date, [id_all_columns, ...])`.
- Deduplicate the `id_all_columns` inside this array to eliminate repeated rows that are exactly the same.
- Use the mapping table as a side input to reinsert the raw data using the `id_all_columns`.
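Continuing the sketch above, the prepare stage might look like this. The column positions for price and date are assumptions, and the sketch keeps a list of rows per key where the worked example below shows a single remaining row flattened:

```python
PRICE, DATE = 0, 1  # assumed positions of the price and date columns

def id_without_price_date(row):
    """Hash every column except price and date (same assumed MD5 scheme)."""
    kept = [col for i, col in enumerate(row) if i not in (PRICE, DATE)]
    return hashlib.md5(",".join(kept).encode("utf-8")).hexdigest()

grouped = (
    mapping  # the (id_all_columns, raw_row) pairs from the mapping stage
    | "KeyWithoutPriceDate" >> beam.Map(lambda kv: (id_without_price_date(kv[1]), kv[0]))
    | "GroupByIdWithoutPriceDate" >> beam.GroupByKey()
    # Deduplicate repeated id_all_columns within each group.
    | "Deduplicate" >> beam.Map(lambda kv: (kv[0], sorted(set(kv[1]))))
    # Reinsert the raw data via the mapping table side input.
    | "ReinsertRawData" >> beam.Map(
        lambda kv, table: (kv[0], [table[i] for i in kv[1]]),
        table=mapping_table,
    )
)
```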
### Example for No.1 B90 3LA
Mapping table (pre-condensed):

```python
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
```

Mapping table (condensed):

```python
{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
```

Prepared (key, value):

```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', 'fd4634faec47c29de40bbf7840723b41')
('fe205bfe66bc7f18c50c8f3d77ec3e30', 'fd4634faec47c29de40bbf7840723b41')
```

Prepared (GroupByKey):

```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
```

Prepared (Deduplicated):

```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])
```

Use mapping table as side input:

```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
```
### Format stage
This stage takes the result and constructs a JSON object out of the grouped data. The schema for this output is discussed on the following page.
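The actual schema lives on the following page; purely as an illustration, the formatting step could look like the sketch below, where the field names are made up for the example:

```python
def to_json_object(element):
    """Build a plain dict from a grouped element; field names are illustrative."""
    key, transactions = element
    return {
        "id": key,  # hypothetical field, not the documented schema
        "transactions": [
            {"price": row[0], "date": row[1], "postcode": row[2]}
            for row in transactions
        ],
    }

formatted = grouped | "Format" >> beam.Map(to_json_object)
```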
### Save stage
- The PCollection is combined with `#!python beam.combiners.ToList()`.
- Apply `#!python json.dumps()` so strings get proper quotation marks.
- Write to text with `#!python beam.io.WriteToText`.
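A sketch of the save stage, continuing from `formatted` above (the output prefix and suffix are illustrative):

```python
import json

(
    formatted
    | "CombineToList" >> beam.combiners.ToList()  # single element: a list of dicts
    | "Serialise" >> beam.Map(json.dumps)
    | "Write" >> beam.io.WriteToText("data/output/result", file_name_suffix=".json")
)
```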