diff --git a/analyse_properties/__init__.py b/analyse_properties/__init__.py index 1e91284..88c30c7 100644 --- a/analyse_properties/__init__.py +++ b/analyse_properties/__init__.py @@ -65,16 +65,60 @@ class SplitColumn(beam.DoFn): class GenerateUniqueID(beam.DoFn): + def __init__(self, all_columns=False): + self.all_columns = all_columns + def process(self, element): - unique_string = ",".join(element[2:]) + unique_string = ( + ",".join(element[2:]) if not self.all_columns else ",".join(element) + ) hashed_string = hashlib.md5(unique_string.encode()) element.append(hashed_string.hexdigest()) yield element +class DeduplicateByGroup(beam.DoFn): + def process(self, element): + if len(element[1]) > 0: + deduplicated_element = (element[0], [element[1][0]]) + yield deduplicated_element + else: + yield element + + +class RemoveUniqueID(beam.DoFn): + def process(self, element): + element_no_id = element[-1][0] + element_no_id.pop(-1) + yield element_no_id + + +class ConvertDataToDict(beam.DoFn): + @property + def dict_keys(self): + return [ + "price", + "transaction_date", + "postcode", + "number", + "flat_appartment", + "street", + "locality", + "town_city", + "district", + "county", + "building", + "property_id", + ] + + def process(self, element): + pass + + def main(): csv_data = resources.path( - "analyse_properties.data", "pp-monthly-update-new-version.csv" + "analyse_properties.data", + "pp-monthly-update-new-version.csv" # "analyse_properties.data", "pp-complete.csv" ) @@ -90,7 +134,7 @@ def main(): ) # Clean the data - clean = ( + clean_drop = ( load | "Drop unneeded columns" >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14)))) @@ -98,23 +142,44 @@ def main(): >> beam.Map(lambda element: [e.upper() for e in element]) | "Strip leading/trailing whitespace" >> beam.Map(lambda element: [e.strip() for e in element]) - | "Drop Empty Postcodes" - >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) + | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) | "Drop empty PAON if missing SAON" >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) # | beam.ParDo(DebugShowColumnWithValueIn(3, ",")) - | beam.ParDo(DebugShowColumnWithValueIn(2, "AL1 4SZ")) - | beam.ParDo(SplitColumn(3, ",")) + # | beam.ParDo(DebugShowColumnWithValueIn(2, "AL1 4SZ")) + # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE")) + | "Split PAON into two columns if separated by comma" + >> beam.ParDo(SplitColumn(3, ",")) + ) + + clean_deduplicate = ( + clean_drop + | "Generate unique ID for all columns" + >> beam.ParDo(GenerateUniqueID(all_columns=True)) + | "Group by the ID for all columns" + >> beam.GroupBy(lambda element: element[-1]) + | "Deduplicate by the ID for all columns" + >> beam.ParDo(DeduplicateByGroup()) # | beam.Map(print) ) # Prepare the data prepare = ( - clean - | beam.ParDo(GenerateUniqueID()) + clean_deduplicate + | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) + | "Generate unique ID ignoring price & date" + >> beam.ParDo(GenerateUniqueID()) + | "Group by the ID ignoring price & date" + >> beam.GroupBy(lambda element: element[-1]) | beam.Map(print) ) + # Format the data + formatted = ( + prepare + # | "Convert list to dict object" >> + ) + if __name__ == "__main__": main()