From 3a745794402a6fc963dc8fdc2f672e1608685898 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Mon, 27 Sep 2021 03:18:17 +0100 Subject: [PATCH] adding latest beam pipeline code for dataflow with group optimisation --- analyse_properties/main.py | 172 ++++++++++++++----------------------- 1 file changed, 65 insertions(+), 107 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 702c977..241f3f7 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -9,8 +9,6 @@ import pathlib import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions -# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn - def slice_by_range(element, *ranges): """ @@ -28,22 +26,23 @@ def slice_by_range(element, *ranges): class DropRecordsSingleEmptyColumn(beam.DoFn): + """ + Drop the entire row if a given column is empty. + + Args: + index : The index of the column in the list. + + Returns: + None: If the length of the column is 0, drop the element. + + Yields: + element: If the length of the column isn't 0, keep the element. + """ + def __init__(self, index): self.index = index def process(self, element): - """ - Drop the entire row if a given column is empty. - - Args: - element : The element - - Returns: - None: If the length of the column is 0, drop the element. - - Yields: - element: If the length of the column isn't 0, keep the element. - """ column = element[self.index] if len(column) == 0: return None @@ -51,7 +50,19 @@ class DropRecordsSingleEmptyColumn(beam.DoFn): class DropRecordsTwoEmptyColumn(beam.DoFn): - """If two given items in a list are both empty, drop this entry from the PCollection.""" + """ + Drop the entire row if both of two given columns are empty. + + Args: + index_0 : The index of the first column in the list. + index_1 : The index of the second column in the list. + + Returns: + None: If the length of both columns is 0, drop the element. + + Yields: + element: If the length of both columns isn't 0, keep the element. + """ def __init__(self, index_0, index_1): self.index_0 = index_0 @@ -66,7 +77,14 @@ class DropRecordsTwoEmptyColumn(beam.DoFn): class SplitColumn(beam.DoFn): - """Split an item in a list into two separate items in the PCollection.""" + """ + Split one column into two columns by a character. + + Args: + index : The index of the column in the list. + split_char: The character to split the column by. + + """ def __init__(self, index, split_char): self.index = index @@ -74,7 +92,7 @@ class SplitColumn(beam.DoFn): def process(self, element): # If there is a split based on the split_char, then keep the first result in - # place and append the second. + # place and append the second column at the end. try: part_0, part_1 = element[self.index].split(self.split_char) element[self.index] = part_1.strip() @@ -87,8 +105,16 @@ class SplitColumn(beam.DoFn): class CreateMappingTable(beam.DoFn): """ - Generate a unique ID for the PCollection, either for all the columns or for the - uniquely identifying data only. + Create a mapping table to be used as a side-input. + + This mapping table has a key of an id generated across all columns and a value of + the raw property data. + + The table is used to populate the raw property data after a GroupByKey which is done on ids only + in order to reduce the amount of data processed in the GroupByKey operation. + + Args: + all_columns """ def __init__(self, all_columns=False): @@ -104,14 +130,6 @@ class CreateMappingTable(beam.DoFn): yield new_element -# class CreateMappingTable(beam.DoFn): -# def process(self, element): -# unique_string = ",".join(element) -# hashed_string = hashlib.md5(unique_string.encode()) -# new_element = {hashed_string.hexdigest(): list(element)} -# yield new_element - - class CreateUniquePropertyID(beam.DoFn): def process(self, element): unique_string = ",".join(element[-1][2:]) @@ -127,48 +145,12 @@ class DeduplicateIDs(beam.DoFn): yield new_element -# class InsertDataForID(beam.DoFn): -# def __init__(self, mapping_table): -# self.mapping_table = mapping_table - -# def process(self, element): -# replaced_list = [self.mapping_table[x] for x in element[-1]] -# new_element = (element[0], replaced_list) -# yield new_element - - def insert_data_for_id(element, mapping_table): - replaced_list = [] - for data_id in element[-1]: - replaced_list.append(mapping_table[data_id]) - # replaced_list = [mapping_table[x] for x in element[-1]] + replaced_list = [mapping_table[data_id] for data_id in element[-1]] new_element = (element[0], replaced_list) yield new_element -# old -class DeduplicateByID(beam.DoFn): - """ - If the PCollection has multiple entries after being grouped by ID for all columns, - deduplicate the list to keep only one. - """ - - def process(self, element): - if len(list(element[1])) > 0: - deduplicated_element = (element[0], [list(element[1])[0]]) - yield deduplicated_element - else: - yield element - - -class RemoveUniqueID(beam.DoFn): - """Remove the unique ID from the PCollection, transforming it back into a list.""" - - def process(self, element): - element_no_id = element[-1][0] - yield element_no_id - - class ConvertDataToDict(beam.DoFn): """Convert the processed data into a dict to be exported as a JSON object.""" @@ -232,7 +214,7 @@ class ConvertDataToDict(beam.DoFn): # Create the dict to hold all the information about the property. json_object = { "property_id": element[0], - # "readable_address": None, + "readable_address": None, "flat_appartment": list(element[-1])[0][4], "builing": list(element[-1])[0][10], "number": list(element[-1])[0][3], @@ -252,20 +234,20 @@ class ConvertDataToDict(beam.DoFn): } # Create a human readable address to go in the dict. - # json_object["readable_address"] = self.get_readable_address( - # [ - # json_object["flat_appartment"], - # json_object["builing"], - # f'{json_object["number"]} {json_object["street"]}', - # json_object["postcode"], - # ], - # [ - # json_object["locality"], - # json_object["town"], - # json_object["district"], - # json_object["county"], - # ], - # ) + json_object["readable_address"] = self.get_readable_address( + [ + json_object["flat_appartment"], + json_object["builing"], + f'{json_object["number"]} {json_object["street"]}', + json_object["postcode"], + ], + [ + json_object["locality"], + json_object["town"], + json_object["district"], + json_object["county"], + ], + ) yield json_object @@ -276,13 +258,15 @@ def run(argv=None, save_main_session=True): pathlib.Path(__file__).parents[1] / "data" / "input" - / "pp-monthly-update-new-version.csv" + / "pp-2020.csv" + # / "pp-complete.csv" ) output_file = ( pathlib.Path(__file__).parents[1] / "data" / "output" - / "pp-monthly-update-new-version" + / "pp-2020" + # / "pp-complete" ) # Arguments @@ -322,33 +306,20 @@ def run(argv=None, save_main_session=True): | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) | "Drop empty PAON if missing SAON" >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) - # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE")) - # | beam.ParDo(DebugShowColumnWithValueIn(2, "B90 3LA")) | "Split PAON into two columns if separated by comma" >> beam.ParDo(SplitColumn(3, ",")) ) - # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows. - # clean_deduplicate = ( - # clean_drop - # | "Generate ID using all columns" - # >> beam.ParDo(GenerateUniqueID(all_columns=True)) - # | "Group by the ID for all columns" >> beam.GroupByKey() - # | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) - # ) - # Create a mapping table mapping_table_raw = ( clean_drop | "Create a mapping table with key of id_all_columns and value of cleaned data." >> beam.ParDo(CreateMappingTable(all_columns=True)) - # | beam.Map(print) ) mapping_table_condensed = ( mapping_table_raw | "Condense mapping table into single dict" >> beam.combiners.ToDict() - # | beam.Map(print) ) prepared = ( @@ -363,26 +334,13 @@ def run(argv=None, save_main_session=True): >> beam.FlatMap( insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed) ) - # | beam.Map(print) ) - # # Prepare the data by generating an ID using the uniquely identifying - # # information only and grouping them by this ID. - # prepare = ( - # clean_deduplicate - # | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) - # | "Generate unique ID ignoring price & date old" - # >> beam.ParDo(GenerateUniqueID()) - # | "Group by the ID ignoring price & date" >> beam.GroupByKey() - # # | beam.Map(print) - # ) - # Format the data into a dict. formatted = ( prepared | "Convert the prepared data into a dict object" >> beam.ParDo(ConvertDataToDict()) - # | beam.Map(print) ) # Save the data to a .json file.