Merge final release (#1)

* adding initial skeleton

* updating .gitignore

* updating dev dependencies

* adding report.py

* updating notes

* adding prospector.yaml

* updating beam to install gcp extras

* adding documentation

* adding data exploration report + code

* adding latest beam pipeline code

* adding latest beam pipeline code

* adding debug.py

* adding latesty beam pipeline code

* adding latest beam pipeline code

* adding latest beam pipeline code

* updating .gitignore

* updating folder structure for data input/output

* updating prospector.yaml

* adding latest beam pipeline code

* updating prospector.yaml

* migrate beam pipeline to main.py

* updating .gitignore

* updating .gitignore

* adding download script for data set

* adding initial docs

* moving inputs/outputs to use pathlib

* removing shard_name_template from output file

* adding pyenv 3.7.9

* removing requirements.txt for documentation

* updating README.md

* updating download data script for new location in GCS

* adding latest beam pipeline code for dataflow

* adding latest beam pipeline code for dataflow

* adding latest beam pipeline code for dataflow

* moving dataflow notes

* updating prospector.yaml

* adding latest beam pipeline code for dataflow

* updating beam pipeline to use GroupByKey

* updating download_data script with new bucket

* update prospector.yaml

* update dataflow documentation with new commands for vpc

* adding latest beam pipeline code for dataflow with group optimisation

* updating dataflow documentation

* adding latest beam pipeline code for dataflow with group optimisation

* updating download_data script with pp-2020 dataset

* adding temporary notes

* updating dataflow notes

* adding latest beam pipeline code

* updating dataflow notes

* adding latest beam pipeline code for dataflow

* adding debug print

* moving panda-profiling report into docs

* updating report.py

* adding entrypoint command

* adding initial docs

* adding commands.md to notes

* commenting out debug imports

* updating documentation

* updating latest beam pipeline with default inputs

* updating poetry

* adding requirements.txt

* updating documentation
This commit is contained in:
dtomlinson91
2021-09-28 00:31:09 +01:00
committed by GitHub
parent 8a22bfebe1
commit 80376a662e
34 changed files with 5667 additions and 1 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 374 KiB

47
docs/dataflow/index.md Normal file
View File

@@ -0,0 +1,47 @@
# Running on DataFlow
The pipeline runs as is on GCP DataFlow. The following documents how I deployed to my personal GCP account but the approach may vary depending on project/account in GCP.
## Prerequisites
### Cloud Storage
- A Cloud Storage bucket with the following structure:
```
./input
./output
./tmp
```
- Place the input files into the `./input` directory in the bucket.
### VPC
To get around public IP quotas I created a VPC in the `europe-west1` region that has `Private Google Access` turned to `ON`.
## Command
!!! tip
We need to choose a `worker_machine_type` with sufficient memory to run the pipeline. As the pipeline uses a mapping table, and DataFlow autoscales on CPU and not memory usage, we need a machine with more ram than usual to ensure sufficient memory when running on one worker. For `pp-2020.csv` the type `n1-highmem-2` with 2vCPU and 13GB of ram was chosen and completed successfully in ~10 minutes using only 1 worker.
Assuming the `pp-2020.csv` file has been placed in the `./input` directory in the bucket you can run a command similar to:
```bash
python -m analyse_properties.main \
--runner DataflowRunner \
--project street-group \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-2
```
The output file from this pipeline is publically available and can be downloaded [here](https://storage.googleapis.com/street-group-technical-test-dmot-euw1/output/pp-2020-00000-of-00001.json).
The job graph for this pipeline is displayed below:
![JobGraph](img/successful_dataflow_job.png)

70
docs/dataflow/scaling.md Normal file
View File

@@ -0,0 +1,70 @@
# Scaling to the full DataSet
As is the pipeline will not run against the full dataset. But with a little work done to the existing pipeline I believe it is possible to work against the full dataset of ~27 million rows.
## Mapping table
Using a mapping table as a side-input means that for the full dataset this table is going to be huge.
Side inputs are stored in memory on the workers, with such a huge table the machines are going to quickly run out of available memory when autoscaling is applied.
Running the pipeline against the full dataset resulted in the following error:
```text
"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"
```
with the pipeline job failing to process anything and the rows being processed per/sec gradually falling to zero as the workers killed the Python process to try free up more memory. This resulted in autoscaling down (as the CPU decreased) and the entire pipeline stagnated.
Using a higher tiered `worker_machine_type`, disabling autoscaling, and fixing the workers to the maximum number of vCPUs available to the quota results in pipeline options:
```bash
--worker_machine_type=n1-highmem-8 \
--num_workers=3 \
--autoscaling_algorithm=NONE
```
with 156GB of RAM available to the pipeline with 52GB on each worker.
The pipeline was able to progress further until Python threw an error and the pipeline failed and shut down:
```text
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
...
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
```
The number 2147483647 being the maximum value for a 32bit integer.
As the side-input needs to be pickled (or serialised), this tells us that the table is far too large to be pickled and passed to the other workers. No amount of CPU/Memory can fix the problem.
## Patterns
Google have several patterns for large side-inputs which are documented here:
- Part 1 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1>
- Part 2 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2>
## Solution
A possible solution would be to leverage BigQuery to store the results of the mapping table in as the pipeline progresses. We can make use of BigQueries array type to literally store the raw array as we process each row.
In addition to creating the mapping table `(key, value)` pairs, we also save these pairs to BigQuery at this stage. We then yield the element as it is currently written to allow the subsequent stages to make use of this data.
Remove the condense mapping table stage as it is no longer needed (which also saves a bit of time).
Instead of using:
```python
beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
)
```
to insert the results of the mapping table we write a new `DoFn` that takes the element, and for each `id_all_columns` in the array we make a call to BigQuery to get the array for this ID and insert it at this stage.
Because each `id_all_columns` and its corresponding data is only used once, there would be no need to cache the results from BigQuery, however some work could be done to see if we could pull back more than one row at a time and cache these, saving time/costs in calls to BigQuery.