adding latest beam pipeline code

This commit is contained in:
2021-09-27 03:39:30 +01:00
parent f60beb4565
commit 391861d80c

View File

@@ -83,7 +83,6 @@ class SplitColumn(beam.DoFn):
Args: Args:
index : The index of the column in the list. index : The index of the column in the list.
split_char: The character to split the column by. split_char: The character to split the column by.
""" """
def __init__(self, index, split_char): def __init__(self, index, split_char):
@@ -107,30 +106,40 @@ class CreateMappingTable(beam.DoFn):
""" """
Create a mapping table to be used as a side-input. Create a mapping table to be used as a side-input.
This mapping table has a key of an id generated across all columns and a value of This mapping table has a key of an ID generated across all columns and a value of
the raw property data. the raw property data.
The table is used to populate the raw property data after a GroupByKey which is done on ids only The table is used to populate the raw property data after a GroupByKey using
in order to reduce the amount of data processed in the GroupByKey operation. only the IDs in order to reduce the amount of data processed in the GroupByKey operation.
Args: Args:
all_columns all_columns(bool): If True will use all fields to calculate the ID. If false
will exclude the first two.
""" """
def __init__(self, all_columns=False): def __init__(self, all_columns=False):
self.all_columns = all_columns self.all_columns = all_columns
def process(self, element): def process(self, element):
# Join the row into a string.
unique_string = ( unique_string = (
",".join(element[2:]) if not self.all_columns else ",".join(element) ",".join(element[2:]) if not self.all_columns else ",".join(element)
) )
# Hash the string.
hashed_string = hashlib.md5(unique_string.encode()) hashed_string = hashlib.md5(unique_string.encode())
# add the hash as a key to the data. # Format the resulting PCollection with the key of id and value of raw data.
new_element = (hashed_string.hexdigest(), list(element)) new_element = (hashed_string.hexdigest(), list(element))
yield new_element yield new_element
class CreateUniquePropertyID(beam.DoFn): class CreateUniquePropertyID(beam.DoFn):
"""
Create a unique property ID which does not include the price and date of sale.
Uses each row of the mapping table to create a PCollection with a key of the
unique property ID and a value of the ID generated across all columns.
"""
def process(self, element): def process(self, element):
unique_string = ",".join(element[-1][2:]) unique_string = ",".join(element[-1][2:])
hashed_string = hashlib.md5(unique_string.encode()) hashed_string = hashlib.md5(unique_string.encode())