diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 241f3f7..d2bce05 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -83,7 +83,6 @@ class SplitColumn(beam.DoFn): Args: index : The index of the column in the list. split_char: The character to split the column by. - """ def __init__(self, index, split_char): @@ -107,30 +106,40 @@ class CreateMappingTable(beam.DoFn): """ Create a mapping table to be used as a side-input. - This mapping table has a key of an id generated across all columns and a value of + This mapping table has a key of an ID generated across all columns and a value of the raw property data. - The table is used to populate the raw property data after a GroupByKey which is done on ids only - in order to reduce the amount of data processed in the GroupByKey operation. + The table is used to populate the raw property data after a GroupByKey using + only the IDs in order to reduce the amount of data processed in the GroupByKey operation. Args: - all_columns + all_columns(bool): If True will use all fields to calculate the ID. If false + will exclude the first two. """ def __init__(self, all_columns=False): self.all_columns = all_columns def process(self, element): + # Join the row into a string. unique_string = ( ",".join(element[2:]) if not self.all_columns else ",".join(element) ) + # Hash the string. hashed_string = hashlib.md5(unique_string.encode()) - # add the hash as a key to the data. + # Format the resulting PCollection with the key of id and value of raw data. new_element = (hashed_string.hexdigest(), list(element)) yield new_element class CreateUniquePropertyID(beam.DoFn): + """ + Create a unique property ID which does not include the price and date of sale. + + Uses each row of the mapping table to create a PCollection with a key of the + unique property ID and a value of the ID generated across all columns. + """ + def process(self, element): unique_string = ",".join(element[-1][2:]) hashed_string = hashlib.md5(unique_string.encode())