From 9f53c669752f9687114296dd37570cbccde0cfd0 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 15:57:00 +0100 Subject: [PATCH 01/23] adding latest beam pipeline code for dataflow --- analyse_properties/main.py | 49 ++++++++++++++++++++++++++++---------- notes/dataflow.md | 17 +++++++++++++ 2 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 notes/dataflow.md diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 0053233..c4f9df8 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -1,9 +1,12 @@ +import argparse from datetime import datetime import hashlib import itertools +import logging import pathlib import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn @@ -87,7 +90,7 @@ class DeduplicateByID(beam.DoFn): """ def process(self, element): - if len(element[1]) > 0: + if len(list(element[1])) > 0: deduplicated_element = (element[0], [element[1][0]]) yield deduplicated_element else: @@ -203,20 +206,42 @@ class ConvertDataToDict(beam.DoFn): yield json_object -def main(): - # Load in the data from a csv file. +def run(argv=None, save_main_session=True): + """Entrypoint and definition of the pipeline.""" + # Default input/output files input_file = ( pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-monthly-update-new-version.csv" ) + output_file = ( + pathlib.Path(__file__).parents[1] + / "data" + / "output" + / "pp-monthly-update-new-version" + ) - with beam.Pipeline() as pipeline: + # Arguments + parser = argparse.ArgumentParser() + parser.add_argument( + "--input", dest="input", default=str(input_file), help="Input file." + ) + parser.add_argument( + "--output", dest="output", default=str(output_file), help="Output file." + ) + known_args, pipeline_args = parser.parse_known_args(argv) + + # Pipeline options. save_main_session needed for DataFlow for global imports. + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # Load in the data from a csv file. + with beam.Pipeline(options=pipeline_options) as pipeline: # Load the data load = ( pipeline - | "Read input data" >> beam.io.ReadFromText(str(input_file)) + | "Read input data" >> beam.io.ReadFromText(known_args.input) | "Split by ','" >> beam.Map(lambda element: element.split(",")) | "Remove leading and trailing quotes" >> beam.Map(lambda element: [el.strip('"') for el in element]) @@ -248,8 +273,8 @@ def main(): | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) ) - # Prepare the data by generating an ID using the uniquely identifying information only - # and grouping them by this ID. + # Prepare the data by generating an ID using the uniquely identifying + # information only and grouping them by this ID. prepare = ( clean_deduplicate | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) @@ -267,19 +292,17 @@ def main(): ) # Save the data to a .json file. 
- output_file = ( - pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete" - ) - output = ( + ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() | "Save to .json file" >> beam.io.WriteToText( - file_path_prefix=str(output_file), + file_path_prefix=known_args.output, file_name_suffix=".json", ) ) if __name__ == "__main__": - main() + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/notes/dataflow.md b/notes/dataflow.md new file mode 100644 index 0000000..54f16df --- /dev/null +++ b/notes/dataflow.md @@ -0,0 +1,17 @@ +# DataFlow + + + +Export env variable: + +`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"` + +Run the pipeline: + +python -m analyse_properties.main \ + --region europe-west2 \ + --input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \ + --output gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version \ + --runner DataflowRunner \ + --project street-group \ + --temp_location gs://street-group-technical-test-dmot/tmp From 8047b5ced44c3bd0904a006275899694877901d0 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 16:16:58 +0100 Subject: [PATCH 02/23] adding latest beam pipeline code for dataflow --- analyse_properties/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index c4f9df8..f3c27f9 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -91,7 +91,7 @@ class DeduplicateByID(beam.DoFn): def process(self, element): if len(list(element[1])) > 0: - deduplicated_element = (element[0], [element[1][0]]) + deduplicated_element = list(list(element[0]), list([element[1][0]])) yield deduplicated_element else: yield element From bb71d55f8ca4d9339e50e3f4f1e6ef6fdd073adb Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 16:23:19 +0100 Subject: [PATCH 03/23] adding latest beam pipeline code for dataflow --- analyse_properties/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index f3c27f9..ac1c899 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -91,7 +91,7 @@ class DeduplicateByID(beam.DoFn): def process(self, element): if len(list(element[1])) > 0: - deduplicated_element = list(list(element[0]), list([element[1][0]])) + deduplicated_element = (list(element[0]), [list(element[1])[0]]) yield deduplicated_element else: yield element From fded8589327ea1d0dca9d3e389efbb383fc98ef7 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 17:15:17 +0100 Subject: [PATCH 04/23] moving dataflow notes --- notes/{ => documentation}/dataflow.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) rename notes/{ => documentation}/dataflow.md (69%) diff --git a/notes/dataflow.md b/notes/documentation/dataflow.md similarity index 69% rename from notes/dataflow.md rename to notes/documentation/dataflow.md index 54f16df..d0a82ea 100644 --- a/notes/dataflow.md +++ b/notes/documentation/dataflow.md @@ -11,7 +11,14 @@ Run the pipeline: python -m analyse_properties.main \ --region europe-west2 \ --input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \ - --output gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version \ + --output gs://street-group-technical-test-dmot/output/pp-monthly-update-new-version \ 
--runner DataflowRunner \ --project street-group \ --temp_location gs://street-group-technical-test-dmot/tmp + + +## Errors + +Unsubscriptable error on window: + + From 8856a9763f8ce39b0294291263c119b89f1288d7 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 17:15:24 +0100 Subject: [PATCH 05/23] updating prospector.yaml --- prospector.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/prospector.yaml b/prospector.yaml index 86f9e76..190f1a2 100644 --- a/prospector.yaml +++ b/prospector.yaml @@ -20,6 +20,7 @@ pylint: - super-init-not-called - arguments-differ - inconsistent-return-statements + - expression-not-assigned enable: options: From 4e3771c72832068ada1a163acecffeecd84b904d Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 17:15:35 +0100 Subject: [PATCH 06/23] adding latest beam pipeline code for dataflow --- analyse_properties/main.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index ac1c899..6c86ddb 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -170,15 +170,15 @@ class ConvertDataToDict(beam.DoFn): json_object = { "property_id": element[0], "readable_address": None, - "flat_appartment": element[-1][0][4], - "builing": element[-1][0][10], - "number": element[-1][0][3], - "street": element[-1][0][5], - "locality": element[-1][0][6], - "town": element[-1][0][7], - "district": element[-1][0][8], - "county": element[-1][0][9], - "postcode": element[-1][0][2], + "flat_appartment": list(element[-1])[0][4], + "builing": list(element[-1])[0][10], + "number": list(element[-1])[0][3], + "street": list(element[-1])[0][5], + "locality": list(element[-1])[0][6], + "town": list(element[-1])[0][7], + "district": list(element[-1])[0][8], + "county": list(element[-1])[0][9], + "postcode": list(element[-1])[0][2], "property_transactions": property_transactions, "latest_transaction_year": self.get_latest_transaction( [ From 8e8469579e7b9166505ca4aec66b423f9ad4be8f Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 20:29:11 +0100 Subject: [PATCH 07/23] updating beam pipeline to use GroupByKey --- analyse_properties/main.py | 73 +++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 6c86ddb..6a190de 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -2,6 +2,7 @@ import argparse from datetime import datetime import hashlib import itertools +import json import logging import pathlib @@ -12,17 +13,37 @@ from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions def slice_by_range(element, *ranges): - """Slice a list with multiple ranges.""" + """ + Slice a list with multiple ranges. + + Args: + element : The element. + *ranges (tuple): Tuples containing a start,end index to slice the element. + E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else. + + Returns: + list: The list sliced by the ranges + """ return itertools.chain(*(itertools.islice(element, *r) for r in ranges)) class DropRecordsSingleEmptyColumn(beam.DoFn): - """If a given item in a list is empty, drop this entry from the PCollection.""" - def __init__(self, index): self.index = index def process(self, element): + """ + Drop the entire row if a given column is empty. + + Args: + element : The element + + Returns: + None: If the length of the column is 0, drop the element. 
+ + Yields: + element: If the length of the column isn't 0, keep the element. + """ column = element[self.index] if len(column) == 0: return None @@ -78,9 +99,9 @@ class GenerateUniqueID(beam.DoFn): ",".join(element[2:]) if not self.all_columns else ",".join(element) ) hashed_string = hashlib.md5(unique_string.encode()) - # append the hash to the end - element.append(hashed_string.hexdigest()) - yield element + # add the hash as a key to the data. + new_element = (hashed_string.hexdigest(), list(element)) + yield new_element class DeduplicateByID(beam.DoFn): @@ -91,7 +112,7 @@ class DeduplicateByID(beam.DoFn): def process(self, element): if len(list(element[1])) > 0: - deduplicated_element = (list(element[0]), [list(element[1])[0]]) + deduplicated_element = (element[0], [list(element[1])[0]]) yield deduplicated_element else: yield element @@ -102,7 +123,6 @@ class RemoveUniqueID(beam.DoFn): def process(self, element): element_no_id = element[-1][0] - element_no_id.pop(-1) yield element_no_id @@ -169,7 +189,7 @@ class ConvertDataToDict(beam.DoFn): # Create the dict to hold all the information about the property. json_object = { "property_id": element[0], - "readable_address": None, + # "readable_address": None, "flat_appartment": list(element[-1])[0][4], "builing": list(element[-1])[0][10], "number": list(element[-1])[0][3], @@ -189,20 +209,20 @@ class ConvertDataToDict(beam.DoFn): } # Create a human readable address to go in the dict. - json_object["readable_address"] = self.get_readable_address( - [ - json_object["flat_appartment"], - json_object["builing"], - f'{json_object["number"]} {json_object["street"]}', - json_object["postcode"], - ], - [ - json_object["locality"], - json_object["town"], - json_object["district"], - json_object["county"], - ], - ) + # json_object["readable_address"] = self.get_readable_address( + # [ + # json_object["flat_appartment"], + # json_object["builing"], + # f'{json_object["number"]} {json_object["street"]}', + # json_object["postcode"], + # ], + # [ + # json_object["locality"], + # json_object["town"], + # json_object["district"], + # json_object["county"], + # ], + # ) yield json_object @@ -259,6 +279,7 @@ def run(argv=None, save_main_session=True): | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) | "Drop empty PAON if missing SAON" >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) + # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE")) | "Split PAON into two columns if separated by comma" >> beam.ParDo(SplitColumn(3, ",")) ) @@ -269,7 +290,7 @@ def run(argv=None, save_main_session=True): | "Generate unique ID for all columns" >> beam.ParDo(GenerateUniqueID(all_columns=True)) | "Group by the ID for all columns" - >> beam.GroupBy(lambda element: element[-1]) + >> beam.GroupByKey() | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) ) @@ -281,7 +302,8 @@ def run(argv=None, save_main_session=True): | "Generate unique ID ignoring price & date" >> beam.ParDo(GenerateUniqueID()) | "Group by the ID ignoring price & date" - >> beam.GroupBy(lambda element: element[-1]) + >> beam.GroupByKey() + # | beam.Map(print) ) # Format the data into a dict. 
@@ -295,6 +317,7 @@ def run(argv=None, save_main_session=True): ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() + | "Format output" >> beam.Map(json.dumps) | "Save to .json file" >> beam.io.WriteToText( file_path_prefix=known_args.output, From 99e67c2840d4a3382c1912d82ce539ba1f35c4ed Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 23:28:12 +0100 Subject: [PATCH 08/23] updating download_data script with new bucket --- download_data.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/download_data.sh b/download_data.sh index eb647da..d179f9b 100755 --- a/download_data.sh +++ b/download_data.sh @@ -1,5 +1,5 @@ # Full data set -# wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-complete.csv -P data/input +# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-complete.csv -P data/input # Monthly update data set -wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv -P data/input +wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input From 1941fcb7bf9eded77bab1700e1e863465ba534bd Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 23:28:21 +0100 Subject: [PATCH 09/23] update prospector.yaml --- prospector.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/prospector.yaml b/prospector.yaml index 190f1a2..ead732e 100644 --- a/prospector.yaml +++ b/prospector.yaml @@ -21,6 +21,7 @@ pylint: - arguments-differ - inconsistent-return-statements - expression-not-assigned + - line-too-long enable: options: From eaa36877f69e560c8a25a77a13bc6faeb25a9f86 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 23:28:35 +0100 Subject: [PATCH 10/23] update dataflow documentation with new commands for vpc --- notes/documentation/dataflow.md | 52 +++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/notes/documentation/dataflow.md b/notes/documentation/dataflow.md index d0a82ea..fd483e4 100644 --- a/notes/documentation/dataflow.md +++ b/notes/documentation/dataflow.md @@ -2,23 +2,65 @@ +## Examples + +Full example of beam pipeline on dataflow: + + + +## Setup + Export env variable: `export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"` -Run the pipeline: +## Run pipeline +### Dataflow + +#### Monthly dataset + +```bash python -m analyse_properties.main \ - --region europe-west2 \ - --input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \ - --output gs://street-group-technical-test-dmot/output/pp-monthly-update-new-version \ + --region europe-west1 \ + --input gs://street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv \ + --output gs://street-group-technical-test-dmot-euw1/output/pp-monthly-update-new-version \ --runner DataflowRunner \ --project street-group \ - --temp_location gs://street-group-technical-test-dmot/tmp + --temp_location gs://street-group-technical-test-dmot-euw1/tmp \ + --subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \ + --no_use_public_ips +``` +#### Full dataset + +```bash +python -m analyse_properties.main \ + --region europe-west1 \ + --input gs://street-group-technical-test-dmot-euw1/input/pp-complete.csv \ + --output 
gs://street-group-technical-test-dmot-euw1/output/pp-complete \ + --runner DataflowRunner \ + --project street-group \ + --temp_location gs://street-group-technical-test-dmot-euw1/tmp \ + --subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \ + --no_use_public_ips +``` + +### Locally + +Run the pipeline locally: + +`python -m analyse_properties.main --runner DirectRunner` ## Errors Unsubscriptable error on window: + +## Documentation + +Running in its own private VPC without public IPs + +- +- From a8fc06c7643056d578af67cf376a76a05485b48f Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 23:28:58 +0100 Subject: [PATCH 11/23] adding latest beam pipeline code for dataflow with group optimisation --- analyse_properties/main.py | 112 ++++++++++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 20 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 6a190de..702c977 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -85,7 +85,7 @@ class SplitColumn(beam.DoFn): yield element -class GenerateUniqueID(beam.DoFn): +class CreateMappingTable(beam.DoFn): """ Generate a unique ID for the PCollection, either for all the columns or for the uniquely identifying data only. @@ -104,6 +104,49 @@ class GenerateUniqueID(beam.DoFn): yield new_element +# class CreateMappingTable(beam.DoFn): +# def process(self, element): +# unique_string = ",".join(element) +# hashed_string = hashlib.md5(unique_string.encode()) +# new_element = {hashed_string.hexdigest(): list(element)} +# yield new_element + + +class CreateUniquePropertyID(beam.DoFn): + def process(self, element): + unique_string = ",".join(element[-1][2:]) + hashed_string = hashlib.md5(unique_string.encode()) + new_element = (hashed_string.hexdigest(), element[0]) + yield new_element + + +class DeduplicateIDs(beam.DoFn): + def process(self, element): + deduplicated_list = list(set(element[-1])) + new_element = (element[0], deduplicated_list) + yield new_element + + +# class InsertDataForID(beam.DoFn): +# def __init__(self, mapping_table): +# self.mapping_table = mapping_table + +# def process(self, element): +# replaced_list = [self.mapping_table[x] for x in element[-1]] +# new_element = (element[0], replaced_list) +# yield new_element + + +def insert_data_for_id(element, mapping_table): + replaced_list = [] + for data_id in element[-1]: + replaced_list.append(mapping_table[data_id]) + # replaced_list = [mapping_table[x] for x in element[-1]] + new_element = (element[0], replaced_list) + yield new_element + + +# old class DeduplicateByID(beam.DoFn): """ If the PCollection has multiple entries after being grouped by ID for all columns, @@ -280,37 +323,66 @@ def run(argv=None, save_main_session=True): | "Drop empty PAON if missing SAON" >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE")) + # | beam.ParDo(DebugShowColumnWithValueIn(2, "B90 3LA")) | "Split PAON into two columns if separated by comma" >> beam.ParDo(SplitColumn(3, ",")) ) - # Clean the data by creating an ID, and deduplicating to eliminate repeated rows. 
- clean_deduplicate = ( - clean_drop - | "Generate unique ID for all columns" - >> beam.ParDo(GenerateUniqueID(all_columns=True)) - | "Group by the ID for all columns" - >> beam.GroupByKey() - | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) - ) + # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows. + # clean_deduplicate = ( + # clean_drop + # | "Generate ID using all columns" + # >> beam.ParDo(GenerateUniqueID(all_columns=True)) + # | "Group by the ID for all columns" >> beam.GroupByKey() + # | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) + # ) - # Prepare the data by generating an ID using the uniquely identifying - # information only and grouping them by this ID. - prepare = ( - clean_deduplicate - | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) - | "Generate unique ID ignoring price & date" - >> beam.ParDo(GenerateUniqueID()) - | "Group by the ID ignoring price & date" - >> beam.GroupByKey() + # Create a mapping table + mapping_table_raw = ( + clean_drop + | "Create a mapping table with key of id_all_columns and value of cleaned data." + >> beam.ParDo(CreateMappingTable(all_columns=True)) # | beam.Map(print) ) + mapping_table_condensed = ( + mapping_table_raw + | "Condense mapping table into single dict" >> beam.combiners.ToDict() + # | beam.Map(print) + ) + + prepared = ( + mapping_table_raw + | "Create unique ID ignoring price & date" + >> beam.ParDo(CreateUniquePropertyID()) + | "Group IDs using all columns by IDs ignoring price & date" + >> beam.GroupByKey() + | "Deduplicate to eliminate repeated transactions" + >> beam.ParDo(DeduplicateIDs()) + | "Insert the raw data using the mapping table" + >> beam.FlatMap( + insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed) + ) + # | beam.Map(print) + ) + + # # Prepare the data by generating an ID using the uniquely identifying + # # information only and grouping them by this ID. + # prepare = ( + # clean_deduplicate + # | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) + # | "Generate unique ID ignoring price & date old" + # >> beam.ParDo(GenerateUniqueID()) + # | "Group by the ID ignoring price & date" >> beam.GroupByKey() + # # | beam.Map(print) + # ) + # Format the data into a dict. formatted = ( - prepare + prepared | "Convert the prepared data into a dict object" >> beam.ParDo(ConvertDataToDict()) + # | beam.Map(print) ) # Save the data to a .json file. 
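The shape of the optimisation above (PATCH 11) in brief: every row is keyed by a hash of all of its columns, the expensive GroupByKey then shuffles only small (property_id, row_id) pairs, and the raw rows are re-attached afterwards from a dict side input. A minimal, self-contained sketch of the same pattern — the toy rows, column layout and helper names below are invented for illustration rather than taken from the pipeline:

```python
import hashlib

import apache_beam as beam

# Toy rows shaped like (price, date, postcode, number, street). Rows 0 and 1
# are exact duplicates; row 2 is a second sale of the same property.
ROWS = [
    ["100000", "2020-01-01 00:00", "AB1 2CD", "1", "HIGH STREET"],
    ["100000", "2020-01-01 00:00", "AB1 2CD", "1", "HIGH STREET"],
    ["250000", "2021-06-30 00:00", "AB1 2CD", "1", "HIGH STREET"],
]


def row_id(row):
    """ID over every column: identical rows collapse onto the same key."""
    return hashlib.md5(",".join(row).encode()).hexdigest()


def property_id(row):
    """ID over the address columns only, excluding price and date."""
    return hashlib.md5(",".join(row[2:]).encode()).hexdigest()


def rehydrate(element, table):
    """Swap each row ID back for its raw row via the side-input dict."""
    prop_id, row_ids = element
    return (prop_id, [table[i] for i in row_ids])


with beam.Pipeline() as pipeline:
    keyed = (
        pipeline
        | "Create" >> beam.Create(ROWS)
        | "Key by row ID" >> beam.Map(lambda row: (row_id(row), row))
    )
    # The mapping table, condensed to a single dict and passed as a side input.
    table = keyed | "To dict" >> beam.combiners.ToDict()
    (
        keyed
        | "Key by property ID" >> beam.Map(lambda kv: (property_id(kv[1]), kv[0]))
        | "Group by property" >> beam.GroupByKey()  # shuffles only the small IDs
        | "Deduplicate transactions" >> beam.Map(lambda kv: (kv[0], sorted(set(kv[1]))))
        | "Rehydrate" >> beam.Map(rehydrate, beam.pvalue.AsSingleton(table))
        | "Show" >> beam.Map(print)
    )
```

Run with the DirectRunner this prints a single element: the exact duplicate collapses to one transaction, while the two distinct sales of the same address survive under one property ID.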
From 377e3c703f214d2cafb443b38f07ae0d6c1f9b22 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 01:35:48 +0100 Subject: [PATCH 12/23] updating dataflow documentation --- notes/documentation/dataflow.md | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/notes/documentation/dataflow.md b/notes/documentation/dataflow.md index fd483e4..63a6fa1 100644 --- a/notes/documentation/dataflow.md +++ b/notes/documentation/dataflow.md @@ -43,9 +43,14 @@ python -m analyse_properties.main \ --project street-group \ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \ --subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \ - --no_use_public_ips + --no_use_public_ips \ + --worker_machine_type=n1-highmem-8 ``` + —-disk_size_gb=50 \ + + + ### Locally Run the pipeline locally: @@ -64,3 +69,17 @@ Running in its own private VPC without public IPs - - + +Error help + +- +- + +Scaling + +Using DataFlowPrime: +Use `--experiments=enable_prime` + +Deploying a pipeline (with scaling options): + +Available VM types (with pricing): From 3a745794402a6fc963dc8fdc2f672e1608685898 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:18:17 +0100 Subject: [PATCH 13/23] adding latest beam pipeline code for dataflow with group optimisation --- analyse_properties/main.py | 172 ++++++++++++++----------------------- 1 file changed, 65 insertions(+), 107 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 702c977..241f3f7 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -9,8 +9,6 @@ import pathlib import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions -# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn - def slice_by_range(element, *ranges): """ @@ -28,22 +26,23 @@ def slice_by_range(element, *ranges): class DropRecordsSingleEmptyColumn(beam.DoFn): + """ + Drop the entire row if a given column is empty. + + Args: + index : The index of the column in the list. + + Returns: + None: If the length of the column is 0, drop the element. + + Yields: + element: If the length of the column isn't 0, keep the element. + """ + def __init__(self, index): self.index = index def process(self, element): - """ - Drop the entire row if a given column is empty. - - Args: - element : The element - - Returns: - None: If the length of the column is 0, drop the element. - - Yields: - element: If the length of the column isn't 0, keep the element. - """ column = element[self.index] if len(column) == 0: return None @@ -51,7 +50,19 @@ class DropRecordsSingleEmptyColumn(beam.DoFn): class DropRecordsTwoEmptyColumn(beam.DoFn): - """If two given items in a list are both empty, drop this entry from the PCollection.""" + """ + Drop the entire row if both of two given columns are empty. + + Args: + index_0 : The index of the first column in the list. + index_1 : The index of the second column in the list. + + Returns: + None: If the length of both columns is 0, drop the element. + + Yields: + element: If the length of both columns isn't 0, keep the element. + """ def __init__(self, index_0, index_1): self.index_0 = index_0 @@ -66,7 +77,14 @@ class DropRecordsTwoEmptyColumn(beam.DoFn): class SplitColumn(beam.DoFn): - """Split an item in a list into two separate items in the PCollection.""" + """ + Split one column into two columns by a character. 
+ + Args: + index : The index of the column in the list. + split_char: The character to split the column by. + + """ def __init__(self, index, split_char): self.index = index @@ -74,7 +92,7 @@ class SplitColumn(beam.DoFn): def process(self, element): # If there is a split based on the split_char, then keep the first result in - # place and append the second. + # place and append the second column at the end. try: part_0, part_1 = element[self.index].split(self.split_char) element[self.index] = part_1.strip() @@ -87,8 +105,16 @@ class SplitColumn(beam.DoFn): class CreateMappingTable(beam.DoFn): """ - Generate a unique ID for the PCollection, either for all the columns or for the - uniquely identifying data only. + Create a mapping table to be used as a side-input. + + This mapping table has a key of an id generated across all columns and a value of + the raw property data. + + The table is used to populate the raw property data after a GroupByKey which is done on ids only + in order to reduce the amount of data processed in the GroupByKey operation. + + Args: + all_columns """ def __init__(self, all_columns=False): @@ -104,14 +130,6 @@ class CreateMappingTable(beam.DoFn): yield new_element -# class CreateMappingTable(beam.DoFn): -# def process(self, element): -# unique_string = ",".join(element) -# hashed_string = hashlib.md5(unique_string.encode()) -# new_element = {hashed_string.hexdigest(): list(element)} -# yield new_element - - class CreateUniquePropertyID(beam.DoFn): def process(self, element): unique_string = ",".join(element[-1][2:]) @@ -127,48 +145,12 @@ class DeduplicateIDs(beam.DoFn): yield new_element -# class InsertDataForID(beam.DoFn): -# def __init__(self, mapping_table): -# self.mapping_table = mapping_table - -# def process(self, element): -# replaced_list = [self.mapping_table[x] for x in element[-1]] -# new_element = (element[0], replaced_list) -# yield new_element - - def insert_data_for_id(element, mapping_table): - replaced_list = [] - for data_id in element[-1]: - replaced_list.append(mapping_table[data_id]) - # replaced_list = [mapping_table[x] for x in element[-1]] + replaced_list = [mapping_table[data_id] for data_id in element[-1]] new_element = (element[0], replaced_list) yield new_element -# old -class DeduplicateByID(beam.DoFn): - """ - If the PCollection has multiple entries after being grouped by ID for all columns, - deduplicate the list to keep only one. - """ - - def process(self, element): - if len(list(element[1])) > 0: - deduplicated_element = (element[0], [list(element[1])[0]]) - yield deduplicated_element - else: - yield element - - -class RemoveUniqueID(beam.DoFn): - """Remove the unique ID from the PCollection, transforming it back into a list.""" - - def process(self, element): - element_no_id = element[-1][0] - yield element_no_id - - class ConvertDataToDict(beam.DoFn): """Convert the processed data into a dict to be exported as a JSON object.""" @@ -232,7 +214,7 @@ class ConvertDataToDict(beam.DoFn): # Create the dict to hold all the information about the property. json_object = { "property_id": element[0], - # "readable_address": None, + "readable_address": None, "flat_appartment": list(element[-1])[0][4], "builing": list(element[-1])[0][10], "number": list(element[-1])[0][3], @@ -252,20 +234,20 @@ class ConvertDataToDict(beam.DoFn): } # Create a human readable address to go in the dict. 
- # json_object["readable_address"] = self.get_readable_address( - # [ - # json_object["flat_appartment"], - # json_object["builing"], - # f'{json_object["number"]} {json_object["street"]}', - # json_object["postcode"], - # ], - # [ - # json_object["locality"], - # json_object["town"], - # json_object["district"], - # json_object["county"], - # ], - # ) + json_object["readable_address"] = self.get_readable_address( + [ + json_object["flat_appartment"], + json_object["builing"], + f'{json_object["number"]} {json_object["street"]}', + json_object["postcode"], + ], + [ + json_object["locality"], + json_object["town"], + json_object["district"], + json_object["county"], + ], + ) yield json_object @@ -276,13 +258,15 @@ def run(argv=None, save_main_session=True): pathlib.Path(__file__).parents[1] / "data" / "input" - / "pp-monthly-update-new-version.csv" + / "pp-2020.csv" + # / "pp-complete.csv" ) output_file = ( pathlib.Path(__file__).parents[1] / "data" / "output" - / "pp-monthly-update-new-version" + / "pp-2020" + # / "pp-complete" ) # Arguments @@ -322,33 +306,20 @@ def run(argv=None, save_main_session=True): | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) | "Drop empty PAON if missing SAON" >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) - # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE")) - # | beam.ParDo(DebugShowColumnWithValueIn(2, "B90 3LA")) | "Split PAON into two columns if separated by comma" >> beam.ParDo(SplitColumn(3, ",")) ) - # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows. - # clean_deduplicate = ( - # clean_drop - # | "Generate ID using all columns" - # >> beam.ParDo(GenerateUniqueID(all_columns=True)) - # | "Group by the ID for all columns" >> beam.GroupByKey() - # | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) - # ) - # Create a mapping table mapping_table_raw = ( clean_drop | "Create a mapping table with key of id_all_columns and value of cleaned data." >> beam.ParDo(CreateMappingTable(all_columns=True)) - # | beam.Map(print) ) mapping_table_condensed = ( mapping_table_raw | "Condense mapping table into single dict" >> beam.combiners.ToDict() - # | beam.Map(print) ) prepared = ( @@ -363,26 +334,13 @@ def run(argv=None, save_main_session=True): >> beam.FlatMap( insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed) ) - # | beam.Map(print) ) - # # Prepare the data by generating an ID using the uniquely identifying - # # information only and grouping them by this ID. - # prepare = ( - # clean_deduplicate - # | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) - # | "Generate unique ID ignoring price & date old" - # >> beam.ParDo(GenerateUniqueID()) - # | "Group by the ID ignoring price & date" >> beam.GroupByKey() - # # | beam.Map(print) - # ) - # Format the data into a dict. formatted = ( prepared | "Convert the prepared data into a dict object" >> beam.ParDo(ConvertDataToDict()) - # | beam.Map(print) ) # Save the data to a .json file. 
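A worked illustration of the address logic the patch above re-enables: get_readable_address pairwise-compares the locality/town/district/county values and masks out consecutive repeats before assembling the address. The diffs elide the body of the pairwise comparison, so the zip-based construction below is an assumed reconstruction, not the pipeline's exact code; the sample values match the CH64 6QJ record that appears later in notes/tmp/exampledata:

```python
from datetime import datetime
import itertools


def get_latest_transaction(transaction_dates):
    # As in ConvertDataToDict: parse "%Y-%m-%d" strings, return the year of the max.
    parsed = [datetime.strptime(date, "%Y-%m-%d") for date in transaction_dates]
    return max(parsed).strftime("%Y")


# locality, town, district, county for the example property.
comparisons = [
    "PARKGATE",
    "NESTON",
    "CHESHIRE WEST AND CHESTER",
    "CHESHIRE WEST AND CHESTER",
]

# Compare each value with its neighbour, then keep only the first of any
# equal pair (assumed reconstruction of the elided mask construction).
pairwise = [a == b for a, b in zip(comparisons, comparisons[1:])]
mask = [True] + [not equal for equal in pairwise]
print(list(itertools.compress(comparisons, mask)))
# -> ['PARKGATE', 'NESTON', 'CHESHIRE WEST AND CHESTER']

print(get_latest_transaction(["2020-11-30", "2020-05-29"]))
# -> 2020
```

This masking is why that record renders as "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ": the identical district and county collapse to a single line.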
From 7db1edb90c3c447218d5d25479064dbb006935d2 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:18:33 +0100 Subject: [PATCH 14/23] updating download_data script with pp-2020 dataset --- download_data.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/download_data.sh b/download_data.sh index d179f9b..01d789a 100755 --- a/download_data.sh +++ b/download_data.sh @@ -2,4 +2,7 @@ # wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-complete.csv -P data/input # Monthly update data set -wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input +# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input + +# 2020 data set +wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input From f2ed60426dba030bfbbb3e1a9ed14a0a0ed8cace Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:18:42 +0100 Subject: [PATCH 15/23] adding temporary notes --- notes/tmp/errordata | 27 ++++++++++++++++++++++++++ notes/tmp/exampledata | 44 +++++++++++++++++++++++++++++++++++++++++++ notes/tmp/runningdata | 16 ++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 notes/tmp/errordata create mode 100644 notes/tmp/exampledata create mode 100644 notes/tmp/runningdata diff --git a/notes/tmp/errordata b/notes/tmp/errordata new file mode 100644 index 0000000..2df356d --- /dev/null +++ b/notes/tmp/errordata @@ -0,0 +1,27 @@ +"Error message from worker: Traceback (most recent call last): + File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work + work_executor.execute() + File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 181, in execute + op.finish() + File "dataflow_worker/native_operations.py", line 93, in dataflow_worker.native_operations.NativeWriteOperation.finish + File "dataflow_worker/native_operations.py", line 94, in dataflow_worker.native_operations.NativeWriteOperation.finish + File "dataflow_worker/native_operations.py", line 95, in dataflow_worker.native_operations.NativeWriteOperation.finish + File "/usr/local/lib/python3.7/site-packages/dataflow_worker/nativeavroio.py", line 308, in __exit__ + self._data_file_writer.flush() + File "fastavro/_write.pyx", line 664, in fastavro._write.Writer.flush + File "fastavro/_write.pyx", line 639, in fastavro._write.Writer.dump + File "fastavro/_write.pyx", line 451, in fastavro._write.snappy_write_block + File "fastavro/_write.pyx", line 458, in fastavro._write.snappy_write_block + File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystemio.py", line 200, in write + self._uploader.put(b) + File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py", line 720, in put + self._conn.send_bytes(data.tobytes()) + File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes + self._send_bytes(m[offset:offset + size]) + File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes + header = struct.pack("!i", n) +struct.error: 'i' format requires -2147483648 <= number <= 2147483647 +" + + +"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900" diff --git a/notes/tmp/exampledata b/notes/tmp/exampledata new file mode 100644 index 
0000000..97c9248 --- /dev/null +++ b/notes/tmp/exampledata @@ -0,0 +1,44 @@ +[{ + "property_id": "3cf3c06632c46754696f2017933702f3", + "flat_appartment": "", + "builing": "", + "number": "63", + "street": "ROTTON PARK STREET", + "locality": "", + "town": "BIRMINGHAM", + "district": "BIRMINGHAM", + "county": "WEST MIDLANDS", + "postcode": "B16 0AE", + "property_transactions": [ + { "price": "385000", "transaction_date": "2021-01-08", "year": "2021" }, + { "price": "701985", "transaction_date": "2019-03-28", "year": "2019" }, + { "price": "1748761", "transaction_date": "2020-05-27", "year": "2020" } + ], + "latest_transaction_year": "2021" +}, +{ + "property_id": "c650d5d7bb0daf0a19bb2cacabbee74e", + "readable_address": "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ", + "flat_appartment": "", + "builing": "", + "number": "16", + "street": "STATION ROAD", + "locality": "PARKGATE", + "town": "NESTON", + "district": "CHESHIRE WEST AND CHESTER", + "county": "CHESHIRE WEST AND CHESTER", + "postcode": "CH64 6QJ", + "property_transactions": [ + { + "price": "280000", + "transaction_date": "2020-11-30", + "year": "2020" + }, + { + "price": "265000", + "transaction_date": "2020-05-29", + "year": "2020" + } + ], + "latest_transaction_year": "2020" +}] diff --git a/notes/tmp/runningdata b/notes/tmp/runningdata new file mode 100644 index 0000000..0929edb --- /dev/null +++ b/notes/tmp/runningdata @@ -0,0 +1,16 @@ + + +Create Mapping table +('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']) +('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']) + +Condensing +{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']} + + +Prepared +GroupByKey +('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41']) + +deduplicated +('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41']) From f60beb456514cdadfaf5b68d3472f8c9af7a849d Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:18:49 +0100 Subject: [PATCH 16/23] updating dataflow notes --- notes/documentation/dataflow.md | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/notes/documentation/dataflow.md b/notes/documentation/dataflow.md index 63a6fa1..f263c04 100644 --- a/notes/documentation/dataflow.md +++ b/notes/documentation/dataflow.md @@ -18,18 +18,19 @@ Export env variable: ### Dataflow -#### Monthly dataset +#### Yearly dataset ```bash python -m analyse_properties.main \ --region europe-west1 \ - --input gs://street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv \ - --output gs://street-group-technical-test-dmot-euw1/output/pp-monthly-update-new-version \ + --input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \ + --output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \ --runner DataflowRunner \ --project street-group \ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \ --subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \ - --no_use_public_ips + --no_use_public_ips \ + --worker_machine_type=n1-highmem-2 ``` #### Full dataset @@ -44,13 +45,11 @@ 
python -m analyse_properties.main \ --temp_location gs://street-group-technical-test-dmot-euw1/tmp \ --subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \ --no_use_public_ips \ - --worker_machine_type=n1-highmem-8 + --worker_machine_type=n1-highmem-8 \ + --num_workers=3 \ + --autoscaling_algorithm=NONE ``` - —-disk_size_gb=50 \ - - - ### Locally Run the pipeline locally: @@ -83,3 +82,7 @@ Use `--experiments=enable_prime` Deploying a pipeline (with scaling options): Available VM types (with pricing): + +Performance + +Sideinput performance: From 391861d80cabdd644829eebde80d745191dcba2a Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:39:30 +0100 Subject: [PATCH 17/23] adding latest beam pipeline code --- analyse_properties/main.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 241f3f7..d2bce05 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -83,7 +83,6 @@ class SplitColumn(beam.DoFn): Args: index : The index of the column in the list. split_char: The character to split the column by. - """ def __init__(self, index, split_char): @@ -107,30 +106,40 @@ class CreateMappingTable(beam.DoFn): """ Create a mapping table to be used as a side-input. - This mapping table has a key of an id generated across all columns and a value of + This mapping table has a key of an ID generated across all columns and a value of the raw property data. - The table is used to populate the raw property data after a GroupByKey which is done on ids only - in order to reduce the amount of data processed in the GroupByKey operation. + The table is used to populate the raw property data after a GroupByKey using + only the IDs in order to reduce the amount of data processed in the GroupByKey operation. Args: - all_columns + all_columns(bool): If True will use all fields to calculate the ID. If false + will exclude the first two. """ def __init__(self, all_columns=False): self.all_columns = all_columns def process(self, element): + # Join the row into a string. unique_string = ( ",".join(element[2:]) if not self.all_columns else ",".join(element) ) + # Hash the string. hashed_string = hashlib.md5(unique_string.encode()) - # add the hash as a key to the data. + # Format the resulting PCollection with the key of id and value of raw data. new_element = (hashed_string.hexdigest(), list(element)) yield new_element class CreateUniquePropertyID(beam.DoFn): + """ + Create a unique property ID which does not include the price and date of sale. + + Uses each row of the mapping table to create a PCollection with a key of the + unique property ID and a value of the ID generated across all columns. 
+ """ + def process(self, element): unique_string = ",".join(element[-1][2:]) hashed_string = hashlib.md5(unique_string.encode()) From cad6612ebee81858485a605651db30263671c33a Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:39:40 +0100 Subject: [PATCH 18/23] updating dataflow notes --- notes/documentation/dataflow.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/notes/documentation/dataflow.md b/notes/documentation/dataflow.md index f263c04..83c98f8 100644 --- a/notes/documentation/dataflow.md +++ b/notes/documentation/dataflow.md @@ -86,3 +86,8 @@ Available VM types (with pricing): + +Common use cases: + +- Part 1 +- Part 2 From f9eeb8bfadb69068b204472be37d52398cf82744 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 21:17:39 +0100 Subject: [PATCH 19/23] adding latest beam pipeline code for dataflow --- analyse_properties/main.py | 88 ++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 28 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index d2bce05..969a09d 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -9,6 +9,7 @@ import pathlib import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from analyse_properties.debug import * # noqa def slice_by_range(element, *ranges): """ @@ -36,7 +37,7 @@ class DropRecordsSingleEmptyColumn(beam.DoFn): None: If the length of the column is 0, drop the element. Yields: - element: If the length of the column isn't 0, keep the element. + element: If the length of the column is >0, keep the element. """ def __init__(self, index): @@ -61,7 +62,7 @@ class DropRecordsTwoEmptyColumn(beam.DoFn): None: If the length of both columns is 0, drop the element. Yields: - element: If the length of both columns isn't 0, keep the element. + element: If the length of both columns is >0, keep the element. """ def __init__(self, index_0, index_1): @@ -90,14 +91,15 @@ class SplitColumn(beam.DoFn): self.split_char = split_char def process(self, element): - # If there is a split based on the split_char, then keep the first result in - # place and append the second column at the end. + # If there is a split based on the split_char, then keep the second result in + # place (street number) and append the first result (building) at the end. try: part_0, part_1 = element[self.index].split(self.split_char) element[self.index] = part_1.strip() element.append(part_0.strip()) yield element except ValueError: + # append a blank column to keep column numbers consistent. element.append("") yield element @@ -111,20 +113,11 @@ class CreateMappingTable(beam.DoFn): The table is used to populate the raw property data after a GroupByKey using only the IDs in order to reduce the amount of data processed in the GroupByKey operation. - - Args: - all_columns(bool): If True will use all fields to calculate the ID. If false - will exclude the first two. """ - def __init__(self, all_columns=False): - self.all_columns = all_columns - def process(self, element): # Join the row into a string. - unique_string = ( - ",".join(element[2:]) if not self.all_columns else ",".join(element) - ) + unique_string = ",".join(element) # Hash the string. hashed_string = hashlib.md5(unique_string.encode()) # Format the resulting PCollection with the key of id and value of raw data. 
@@ -148,6 +141,8 @@ class CreateUniquePropertyID(beam.DoFn):
 
 
 class DeduplicateIDs(beam.DoFn):
+    """Deduplicate a list of IDs."""
+
     def process(self, element):
         deduplicated_list = list(set(element[-1]))
         new_element = (element[0], deduplicated_list)
@@ -155,6 +150,16 @@ class DeduplicateIDs(beam.DoFn):
 
 
 def insert_data_for_id(element, mapping_table):
+    """
+    Replace each ID with the raw data from the mapping table.
+
+    Args:
+        element: The element.
+        mapping_table (dict): The mapping table.
+
+    Yields:
+        The element with IDs replaced with raw data.
+    """
     replaced_list = [mapping_table[data_id] for data_id in element[-1]]
     new_element = (element[0], replaced_list)
     yield new_element
@@ -165,7 +170,15 @@ class ConvertDataToDict(beam.DoFn):
 
     @staticmethod
     def get_latest_transaction(transaction_dates):
-        """Get the date of the latest transaction."""
+        """
+        Get the date of the latest transaction for a list of dates.
+
+        Args:
+            transaction_dates (list): Dates in the form "%Y-%m-%d".
+
+        Returns:
+            str: The year in the form "%Y" of the latest transaction date.
+        """
         transaction_dates = [
             datetime.strptime(individual_transaction, "%Y-%m-%d")
             for individual_transaction in transaction_dates
@@ -173,7 +186,17 @@ class ConvertDataToDict(beam.DoFn):
         return max(transaction_dates).strftime("%Y")
 
     @staticmethod
-    def get_readable_address(address_components: list, address_comparisons: list):
+    def get_readable_address(address_components, address_comparisons):
+        """
+        Create a human readable address from the locality/town/district/county columns.
+
+        Args:
+            address_components (list): The preceding parts of the address (street, postcode etc.)
+            address_comparisons (list): The locality/town/district/county.
+
+        Returns:
+            str: The complete address deduplicated & cleaned.
+        """
         # Get pairwise comparison to see if two locality/town/district/counties
         # are equivalent
         pairwise_comparison = [
@@ -194,7 +217,6 @@ class ConvertDataToDict(beam.DoFn):
         applied_mask = list(itertools.compress(address_comparisons, mask))
         # Filter out empty items in list
         deduplicated_address_part = list(filter(None, applied_mask))
-
         # Filter out any missing parts of the address components
         cleaned_address_components = list(filter(None, address_components))
 
@@ -213,9 +235,9 @@ class ConvertDataToDict(beam.DoFn):
         # Group together all the transactions for the property.
         property_transactions = [
             {
-                "price": entry[0],
+                "price": int(entry[0]),
                 "transaction_date": entry[1].replace(" 00:00", ""),
-                "year": entry[1][0:4],
+                "year": int(entry[1][0:4]),
             }
             for entry in element[-1]
         ]
@@ -234,12 +256,12 @@ class ConvertDataToDict(beam.DoFn):
             "county": list(element[-1])[0][9],
             "postcode": list(element[-1])[0][2],
             "property_transactions": property_transactions,
-            "latest_transaction_year": self.get_latest_transaction(
+            "latest_transaction_year": int(self.get_latest_transaction(
                 [
                     transaction["transaction_date"]
                     for transaction in property_transactions
                 ]
-            ),
+            )),
         }
 
         # Create a human readable address to go in the dict.
@@ -284,6 +306,8 @@ class ConvertDataToDict(beam.DoFn):
 
 def run(argv=None, save_main_session=True):
     """Entrypoint and definition of the pipeline."""
+    logging.getLogger().setLevel(logging.INFO)
+
     # Default input/output files
     input_file = (
         pathlib.Path(__file__).parents[1]
@@ -305,10 +329,16 @@ def run(argv=None, save_main_session=True):
     # Arguments
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        "--input", dest="input", default=str(input_file), help="Input file."
+ "--input", + dest="input", + default=str(input_file), + help="Full path to the input file.", ) parser.add_argument( - "--output", dest="output", default=str(output_file), help="Output file." + "--output", + dest="output", + default=str(output_file), + help="Full path to the output file without extension.", ) known_args, pipeline_args = parser.parse_known_args(argv) @@ -292,7 +322,6 @@ def run(argv=None, save_main_session=True): pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = save_main_session - # Load in the data from a csv file. with beam.Pipeline(options=pipeline_options) as pipeline: # Load the data load = ( @@ -303,7 +332,7 @@ def run(argv=None, save_main_session=True): >> beam.Map(lambda element: [el.strip('"') for el in element]) ) - # Clean the data by dropping unneeded rows. + # Clean the data. clean_drop = ( load | "Drop unneeded columns" @@ -323,19 +352,22 @@ def run(argv=None, save_main_session=True): mapping_table_raw = ( clean_drop | "Create a mapping table with key of id_all_columns and value of cleaned data." - >> beam.ParDo(CreateMappingTable(all_columns=True)) + >> beam.ParDo(CreateMappingTable()) ) + # Condense mapping table into a single dict. mapping_table_condensed = ( mapping_table_raw | "Condense mapping table into single dict" >> beam.combiners.ToDict() ) + # Prepare the data by creating IDs, grouping together and using mapping table + # to reinsert raw data. prepared = ( mapping_table_raw | "Create unique ID ignoring price & date" >> beam.ParDo(CreateUniquePropertyID()) - | "Group IDs using all columns by IDs ignoring price & date" + | "Group by ID" >> beam.GroupByKey() | "Deduplicate to eliminate repeated transactions" >> beam.ParDo(DeduplicateIDs()) @@ -356,7 +388,7 @@ def run(argv=None, save_main_session=True): ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() - | "Format output" >> beam.Map(json.dumps) + | "Format output" >> beam.Map(json.dumps, indent=2) | "Save to .json file" >> beam.io.WriteToText( file_path_prefix=known_args.output, From dffc6aa553815aa3eb5f0e09b5fef8a7ec514837 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 21:17:49 +0100 Subject: [PATCH 20/23] adding debug print --- analyse_properties/debug.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/analyse_properties/debug.py b/analyse_properties/debug.py index ab44457..ae59b79 100644 --- a/analyse_properties/debug.py +++ b/analyse_properties/debug.py @@ -22,3 +22,9 @@ class DebugShowColumnWithValueIn(beam.DoFn): if self.value in column: yield element return None + + +class DebugPrint(beam.DoFn): + def process(self, element): + print(element) + yield element From 3263b3dd8b724e4475857fcbfb52f89a3477eab6 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 21:18:06 +0100 Subject: [PATCH 21/23] moving panda-profiling report into docs --- .../pandas-profiling/report.html | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename exploration/price_paid_data_report.html => docs/pandas-profiling/report.html (100%) diff --git a/exploration/price_paid_data_report.html b/docs/pandas-profiling/report.html similarity index 100% rename from exploration/price_paid_data_report.html rename to docs/pandas-profiling/report.html From 886a37ca94eae3c8ac904d1a6b736c36c2564207 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 21:18:14 +0100 Subject: [PATCH 22/23] updating report.py --- exploration/report.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 
deletions(-) diff --git a/exploration/report.py b/exploration/report.py index ed81b47..954609c 100644 --- a/exploration/report.py +++ b/exploration/report.py @@ -1,13 +1,16 @@ -from importlib import resources +import pathlib import pandas as pd from pandas_profiling import ProfileReport def main(): - with resources.path("analyse_properties.data", "pp-complete.csv") as csv_file: + input_file = ( + pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-complete.csv" + ) + with input_file.open() as csv: df_report = pd.read_csv( - csv_file, + csv, names=[ "transaction_id", "price", From 76434fae5b47c79fbcabc13b1c9009f498523274 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 21:18:28 +0100 Subject: [PATCH 23/23] adding entrypoint command --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 70e34d7..0f73a0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,3 +18,6 @@ pandas-profiling = "^3.0.0" [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +"analyse-properties" = "analyse_properties.main:run"
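With PATCH 23 applied the pipeline is also exposed as a console script. A usage sketch, assuming a Poetry-managed environment: `poetry install` registers the entry point, after which `poetry run analyse-properties --runner DirectRunner` behaves the same as `python -m analyse_properties.main`, since both call `analyse_properties.main:run`; `--input`, `--output` and any runner options are still picked up via `parse_known_args`, exactly as in the Dataflow commands documented above.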