diff --git a/analyse_properties/main.py b/analyse_properties/main.py
index 6a190de..702c977 100644
--- a/analyse_properties/main.py
+++ b/analyse_properties/main.py
@@ -85,7 +85,7 @@ class SplitColumn(beam.DoFn):
         yield element
 
 
-class GenerateUniqueID(beam.DoFn):
+class CreateMappingTable(beam.DoFn):
     """
     Generate a unique ID for the PCollection, either for all the columns or for
     the uniquely identifying data only.
@@ -104,6 +104,49 @@ class GenerateUniqueID(beam.DoFn):
         yield new_element
 
 
+# class CreateMappingTable(beam.DoFn):
+#     def process(self, element):
+#         unique_string = ",".join(element)
+#         hashed_string = hashlib.md5(unique_string.encode())
+#         new_element = {hashed_string.hexdigest(): list(element)}
+#         yield new_element
+
+
+class CreateUniquePropertyID(beam.DoFn):
+    def process(self, element):
+        unique_string = ",".join(element[-1][2:])
+        hashed_string = hashlib.md5(unique_string.encode())
+        new_element = (hashed_string.hexdigest(), element[0])
+        yield new_element
+
+
+class DeduplicateIDs(beam.DoFn):
+    def process(self, element):
+        deduplicated_list = list(set(element[-1]))
+        new_element = (element[0], deduplicated_list)
+        yield new_element
+
+
+# class InsertDataForID(beam.DoFn):
+#     def __init__(self, mapping_table):
+#         self.mapping_table = mapping_table
+
+#     def process(self, element):
+#         replaced_list = [self.mapping_table[x] for x in element[-1]]
+#         new_element = (element[0], replaced_list)
+#         yield new_element
+
+
+def insert_data_for_id(element, mapping_table):
+    replaced_list = []
+    for data_id in element[-1]:
+        replaced_list.append(mapping_table[data_id])
+    # replaced_list = [mapping_table[x] for x in element[-1]]
+    new_element = (element[0], replaced_list)
+    yield new_element
+
+
+# old
 class DeduplicateByID(beam.DoFn):
     """
     If the PCollection has multiple entries after being grouped by ID for all columns,
@@ -280,37 +323,66 @@ def run(argv=None, save_main_session=True):
         | "Drop empty PAON if missing SAON"
         >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
         # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
+        # | beam.ParDo(DebugShowColumnWithValueIn(2, "B90 3LA"))
         | "Split PAON into two columns if separated by comma"
         >> beam.ParDo(SplitColumn(3, ","))
     )
 
-    # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
-    clean_deduplicate = (
-        clean_drop
-        | "Generate unique ID for all columns"
-        >> beam.ParDo(GenerateUniqueID(all_columns=True))
-        | "Group by the ID for all columns"
-        >> beam.GroupByKey()
-        | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
-    )
+    # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
+    # clean_deduplicate = (
+    #     clean_drop
+    #     | "Generate ID using all columns"
+    #     >> beam.ParDo(GenerateUniqueID(all_columns=True))
+    #     | "Group by the ID for all columns" >> beam.GroupByKey()
+    #     | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
+    # )
 
-    # Prepare the data by generating an ID using the uniquely identifying
-    # information only and grouping them by this ID.
-    prepare = (
-        clean_deduplicate
-        | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
-        | "Generate unique ID ignoring price & date"
-        >> beam.ParDo(GenerateUniqueID())
-        | "Group by the ID ignoring price & date"
-        >> beam.GroupByKey()
+    # Create a mapping table
+    mapping_table_raw = (
+        clean_drop
+        | "Create a mapping table with key of id_all_columns and value of cleaned data."
+        >> beam.ParDo(CreateMappingTable(all_columns=True))
         # | beam.Map(print)
     )
 
+    mapping_table_condensed = (
+        mapping_table_raw
+        | "Condense mapping table into single dict" >> beam.combiners.ToDict()
+        # | beam.Map(print)
+    )
+
+    prepared = (
+        mapping_table_raw
+        | "Create unique ID ignoring price & date"
+        >> beam.ParDo(CreateUniquePropertyID())
+        | "Group IDs using all columns by IDs ignoring price & date"
+        >> beam.GroupByKey()
+        | "Deduplicate to eliminate repeated transactions"
+        >> beam.ParDo(DeduplicateIDs())
+        | "Insert the raw data using the mapping table"
+        >> beam.FlatMap(
+            insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
+        )
+        # | beam.Map(print)
+    )
+
+    # # Prepare the data by generating an ID using the uniquely identifying
+    # # information only and grouping them by this ID.
+    # prepare = (
+    #     clean_deduplicate
+    #     | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
+    #     | "Generate unique ID ignoring price & date old"
+    #     >> beam.ParDo(GenerateUniqueID())
+    #     | "Group by the ID ignoring price & date" >> beam.GroupByKey()
+    #     # | beam.Map(print)
+    #     )
+
     # Format the data into a dict.
     formatted = (
-        prepare
+        prepared
         | "Convert the prepared data into a dict object"
         >> beam.ParDo(ConvertDataToDict())
+        # | beam.Map(print)
     )
 
     # Save the data to a .json file.