Add latest Beam pipeline code for Dataflow with grouping optimisation

This commit is contained in:
2021-09-26 23:28:58 +01:00
parent eaa36877f6
commit a8fc06c764

View File

@@ -85,7 +85,7 @@ class SplitColumn(beam.DoFn):
yield element
class GenerateUniqueID(beam.DoFn):
class CreateMappingTable(beam.DoFn):
"""
Generate a unique ID for the PCollection, either for all the columns or for the
uniquely identifying data only.
@@ -104,6 +104,49 @@ class GenerateUniqueID(beam.DoFn):
yield new_element
# class CreateMappingTable(beam.DoFn):
# def process(self, element):
# unique_string = ",".join(element)
# hashed_string = hashlib.md5(unique_string.encode())
# new_element = {hashed_string.hexdigest(): list(element)}
# yield new_element
class CreateUniquePropertyID(beam.DoFn):
    """
    Re-key an element by an MD5 digest built from part of its last entry.

    The digest covers every item of ``element[-1]`` except the first two.
    NOTE(review): presumably the two skipped items are the price and date
    columns, so the digest identifies a property rather than a single
    transaction -- confirm against the step that produces these elements.
    """

    def process(self, element):
        """
        Yield a ``(digest, element[0])`` pair for one input element.

        ``element[-1]`` must be a sequence of strings; its items from index 2
        onwards are joined with commas and hashed with MD5.
        """
        identifying_fields = element[-1][2:]
        joined_fields = ",".join(identifying_fields)
        digest = hashlib.md5(joined_fields.encode()).hexdigest()
        yield digest, element[0]
class DeduplicateIDs(beam.DoFn):
    """
    Drop repeated values from the last entry of each element.

    Each element is expected to be a pair-like sequence: ``element[0]`` is
    passed through unchanged and ``element[-1]`` is an iterable of hashable
    values.
    """

    def process(self, element):
        """
        Yield ``(element[0], unique_values)`` for one input element.

        ``unique_values`` is a list holding each distinct value of
        ``element[-1]`` exactly once, in first-seen order.
        """
        # dict.fromkeys keeps insertion order, so the result is reproducible
        # from run to run; list(set(...)) returned the values in an arbitrary
        # order that changes with string hash randomisation.
        unique_values = list(dict.fromkeys(element[-1]))
        yield element[0], unique_values
# class InsertDataForID(beam.DoFn):
# def __init__(self, mapping_table):
# self.mapping_table = mapping_table
# def process(self, element):
# replaced_list = [self.mapping_table[x] for x in element[-1]]
# new_element = (element[0], replaced_list)
# yield new_element
def insert_data_for_id(element, mapping_table):
    """
    Replace every ID in the last entry of ``element`` with its mapped data.

    Args:
        element: A sequence whose first item is passed through unchanged and
            whose last item is an iterable of keys of ``mapping_table``.
        mapping_table: A mapping from ID to the data stored for that ID.

    Yields:
        A single ``(element[0], replaced_list)`` tuple, where
        ``replaced_list`` holds ``mapping_table[data_id]`` for every
        ``data_id`` in ``element[-1]``, in the same order.

    Raises:
        KeyError: If an ID is missing from ``mapping_table``. Because this is
            a generator, the error surfaces when the result is iterated.
    """
    replaced_list = [mapping_table[data_id] for data_id in element[-1]]
    yield element[0], replaced_list
# old
class DeduplicateByID(beam.DoFn):
"""
If the PCollection has multiple entries after being grouped by ID for all columns,
@@ -280,37 +323,66 @@ def run(argv=None, save_main_session=True):
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B90 3LA"))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
# Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
clean_deduplicate = (
clean_drop
| "Generate unique ID for all columns"
>> beam.ParDo(GenerateUniqueID(all_columns=True))
| "Group by the ID for all columns"
>> beam.GroupByKey()
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
)
# # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
# clean_deduplicate = (
# clean_drop
# | "Generate ID using all columns"
# >> beam.ParDo(GenerateUniqueID(all_columns=True))
# | "Group by the ID for all columns" >> beam.GroupByKey()
# | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
# )
# Prepare the data by generating an ID using the uniquely identifying
# information only and grouping them by this ID.
prepare = (
clean_deduplicate
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
| "Generate unique ID ignoring price & date"
>> beam.ParDo(GenerateUniqueID())
| "Group by the ID ignoring price & date"
>> beam.GroupByKey()
# Create a mapping table
mapping_table_raw = (
clean_drop
| "Create a mapping table with key of id_all_columns and value of cleaned data."
>> beam.ParDo(CreateMappingTable(all_columns=True))
# | beam.Map(print)
)
mapping_table_condensed = (
mapping_table_raw
| "Condense mapping table into single dict" >> beam.combiners.ToDict()
# | beam.Map(print)
)
prepared = (
mapping_table_raw
| "Create unique ID ignoring price & date"
>> beam.ParDo(CreateUniquePropertyID())
| "Group IDs using all columns by IDs ignoring price & date"
>> beam.GroupByKey()
| "Deduplicate to eliminate repeated transactions"
>> beam.ParDo(DeduplicateIDs())
| "Insert the raw data using the mapping table"
>> beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
)
# | beam.Map(print)
)
# # Prepare the data by generating an ID using the uniquely identifying
# # information only and grouping them by this ID.
# prepare = (
# clean_deduplicate
# | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
# | "Generate unique ID ignoring price & date old"
# >> beam.ParDo(GenerateUniqueID())
# | "Group by the ID ignoring price & date" >> beam.GroupByKey()
# # | beam.Map(print)
# )
# Format the data into a dict.
formatted = (
prepare
prepared
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
# | beam.Map(print)
)
# Save the data to a .json file.