# Approach
The general approach to the pipeline is:
## Loading stage
- Load using `#!python beam.io.ReadFromText()`.
- Split the loaded string on `,`, as the file is a comma-delimited `.csv`.
- Strip the leading/trailing `"` marks.

The result is an array with each element representing a single column in that row.
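A minimal sketch of this stage (the input path is illustrative):
```python
import apache_beam as beam

def parse_line(line: str) -> list:
    # Split on ',' and strip the surrounding '"' marks, as described above.
    # A naive split assumes no embedded commas inside quoted fields.
    return [field.strip('"') for field in line.split(',')]

with beam.Pipeline() as pipeline:
    rows = (
        pipeline
        | 'Read' >> beam.io.ReadFromText('data/input/pp-2020.csv')  # illustrative path
        | 'Parse' >> beam.Map(parse_line)
    )
```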
## Cleaning stage
This stage applies the cleaning steps already discussed.
## Create a mapping table
The mapping table takes each row and creates a `(key, value)` pair with:
- The key being an id computed across all columns (`id_all_columns`).
- The value being the raw data as an array.

The mapping table is then condensed into a single dictionary of these key/value pairs (automatically deduplicating repeated rows) and is used as a side input further down the pipeline.

The mapping table exists to keep the `GroupByKey` operation as fast as possible: the more data a `GroupByKey` has to shuffle, the longer it takes. By grouping on just the ids, the pipeline processes the files much more quickly than if the raw data were included in the operation.
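A sketch of how the mapping table could be built, continuing from the `rows` PCollection above. The md5 hash is an assumption based on the 32-character ids in the example further down:
```python
import hashlib

import apache_beam as beam

def key_by_all_columns(row):
    # Hash every column to form id_all_columns; identical rows produce the
    # same key, so condensing into a dict deduplicates them automatically.
    key = hashlib.md5(','.join(row).encode('utf-8')).hexdigest()
    return key, row

mapping = rows | 'KeyByAllColumns' >> beam.Map(key_by_all_columns)

# Condensed into a single dict and passed as a side input later on.
mapping_side_input = beam.pvalue.AsDict(mapping)
```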
## Prepare stage
- Take the mapping table data (before it is condensed) and create an id that ignores the price and date (`id_without_price_date`).
This id is not unique: properties with more than one transaction share it.
- Create a `(key, value)` pair with:
    - The key being `id_without_price_date`.
    - The value being `id_all_columns`.
- Group by `id_without_price_date`.
This results in a PCollection that looks like: `(id_without_price_date, [id_all_columns, ...])`
- Deduplicate the `id_all_columns` inside this array to eliminate repeated rows that are exactly the same.
- Use the mapping table as a side input to reinsert the raw data using the `id_all_columns` (a code sketch follows the example below).
<details>
<summary>Example for No.1 B90 3LA</summary>
Mapping table (pre-condensed):
```python
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
```
Mapping table (condensed):
```python
{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
```
Prepared (key, value):
```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', 'fd4634faec47c29de40bbf7840723b41')
('fe205bfe66bc7f18c50c8f3d77ec3e30', 'fd4634faec47c29de40bbf7840723b41')
```
Prepared (GroupByKey):
```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
```
Prepared (deduplicated):
```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])
```
Use mapping table as side input:
```python
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
```
</details>
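A sketch of the prepare stage under the same assumptions, continuing from the `mapping` PCollection and `mapping_side_input` above. The column positions for price and date are inferred from the example rows:
```python
import hashlib

import apache_beam as beam

PRICE, DATE = 0, 1  # assumed column positions, inferred from the example rows

def key_without_price_date(element):
    id_all_columns, row = element
    # Hash every column except price and date; transactions for the same
    # property share this id.
    rest = [col for i, col in enumerate(row) if i not in (PRICE, DATE)]
    id_without_price_date = hashlib.md5(','.join(rest).encode('utf-8')).hexdigest()
    return id_without_price_date, id_all_columns

def reinsert_raw_data(element, mapping):
    key, ids = element
    # Deduplicate identical rows (preserving order), then look the raw data
    # back up in the mapping table side input.
    unique_ids = list(dict.fromkeys(ids))
    return key, [mapping[i] for i in unique_ids]

grouped = (
    mapping
    | 'KeyWithoutPriceDate' >> beam.Map(key_without_price_date)
    | 'GroupByKey' >> beam.GroupByKey()
    | 'Reinsert' >> beam.Map(reinsert_raw_data, mapping=mapping_side_input)
)
```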
## Format stage
This stage takes the result and constructs a `json` object from the grouped data. The schema for this output is discussed on the following page.
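A sketch of the formatting step, continuing from the `grouped` PCollection above. The field names here are purely illustrative; the actual schema is defined on the following page:
```python
def format_record(element):
    id_without_price_date, rows = element
    # Illustrative shape only: the real schema is documented on the
    # following page. Column positions follow the example rows above.
    return {
        'id': id_without_price_date,
        'postcode': rows[0][2],
        'transactions': [{'price': row[0], 'date': row[1]} for row in rows],
    }

records = grouped | 'Format' >> beam.Map(format_record)
```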
## Save stage
- The PCollection is combined into a single element with `#!python beam.combiners.ToList()`.
- Apply `json.dumps()` so strings are written with proper (double) quotation marks.
- Write to text with `#!python beam.io.WriteToText()`.
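A sketch of the save stage, continuing from the `records` PCollection above (the output path is illustrative; `shard_name_template=''` writes a single file without shard suffixes):
```python
import json

import apache_beam as beam

output = (
    records
    | 'Combine' >> beam.combiners.ToList()  # one element: the list of all records
    | 'Serialise' >> beam.Map(json.dumps)   # double quotes for JSON strings
    | 'Write' >> beam.io.WriteToText(
        'data/output/result.json',  # illustrative path
        shard_name_template='',     # single output file, no shard suffix
    )
)
```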