adding latest beam pipeline code

2021-09-25 18:25:08 +01:00
parent 44f346deff
commit 24420c8935


@@ -65,16 +65,60 @@ class SplitColumn(beam.DoFn):
class GenerateUniqueID(beam.DoFn):
    def __init__(self, all_columns=False):
        self.all_columns = all_columns

    def process(self, element):
        unique_string = (
            ",".join(element[2:]) if not self.all_columns else ",".join(element)
        )
        hashed_string = hashlib.md5(unique_string.encode())
        element.append(hashed_string.hexdigest())
        yield element


class DeduplicateByGroup(beam.DoFn):
    def process(self, element):
        if len(element[1]) > 0:
            deduplicated_element = (element[0], [element[1][0]])
            yield deduplicated_element
        else:
            yield element


class RemoveUniqueID(beam.DoFn):
    def process(self, element):
        element_no_id = element[-1][0]
        element_no_id.pop(-1)
        yield element_no_id


class ConvertDataToDict(beam.DoFn):
    @property
    def dict_keys(self):
        return [
            "price",
            "transaction_date",
            "postcode",
            "number",
            "flat_appartment",
            "street",
            "locality",
            "town_city",
            "district",
            "county",
            "building",
            "property_id",
        ]

    def process(self, element):
        pass


def main():
    csv_data = resources.path(
        "analyse_properties.data",
        "pp-monthly-update-new-version.csv"
        # "analyse_properties.data", "pp-complete.csv"
    )
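Not part of the commit, just an illustration of the idea behind GenerateUniqueID and the deduplication stages further down: joining a row's columns and hashing the result gives a stable key, so exact duplicates land on the same MD5 digest and can be grouped. The row below is made up; the postcode is one of the debug values used in the diff.

    import hashlib

    # a made-up cleaned row: price, date, postcode, PAON, SAON, street
    row = ["100000", "2021-09-01", "AL1 4SZ", "12", "", "HIGH STREET"]

    # all_columns=True joins every column; the default joins element[2:],
    # i.e. everything except price and transaction date
    unique_string = ",".join(row)
    row_id = hashlib.md5(unique_string.encode()).hexdigest()

    print(row_id)  # identical rows always hash to the same digest, so grouping
                   # by this value collects exact duplicates under one key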
@@ -90,7 +134,7 @@ def main():
    )

    # Clean the data
    clean_drop = (
        load
        | "Drop unneeded columns"
        >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
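slice_by_range itself is not shown in this diff. Judging only from the call above, a plausible stand-in (an assumption, not the project's actual helper) keeps the items in each half-open (start, stop) index range:

    from itertools import chain


    def slice_by_range(element, *ranges):
        # keep the items falling inside each half-open (start, stop) index range
        return chain.from_iterable(element[start:stop] for start, stop in ranges)


    row = list("ABCDEFGHIJKLMNOP")
    print(list(slice_by_range(row, (1, 4), (7, 14))))
    # -> ['B', 'C', 'D', 'H', 'I', 'J', 'K', 'L', 'M', 'N']
    # ten columns survive, matching the price/date/postcode and address fields
    # that the dict_keys list above expects before SplitColumn and property_id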
@@ -98,23 +142,44 @@ def main():
        >> beam.Map(lambda element: [e.upper() for e in element])
        | "Strip leading/trailing whitespace"
        >> beam.Map(lambda element: [e.strip() for e in element])
        | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
        | "Drop empty PAON if missing SAON"
        >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
        # | beam.ParDo(DebugShowColumnWithValueIn(3, ","))
        # | beam.ParDo(DebugShowColumnWithValueIn(2, "AL1 4SZ"))
        # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
        | "Split PAON into two columns if separated by comma"
        >> beam.ParDo(SplitColumn(3, ","))
    )

    clean_deduplicate = (
        clean_drop
        | "Generate unique ID for all columns"
        >> beam.ParDo(GenerateUniqueID(all_columns=True))
        | "Group by the ID for all columns"
        >> beam.GroupBy(lambda element: element[-1])
        | "Deduplicate by the ID for all columns"
        >> beam.ParDo(DeduplicateByGroup())
        # | beam.Map(print)
    )

    # Prepare the data
    prepare = (
        clean_deduplicate
        | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
        | "Generate unique ID ignoring price & date"
        >> beam.ParDo(GenerateUniqueID())
        | "Group by the ID ignoring price & date"
        >> beam.GroupBy(lambda element: element[-1])
        | beam.Map(print)
    )

    # Format the data
    formatted = (
        prepare
        # | "Convert list to dict object" >>
    )


if __name__ == "__main__":
    main()
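ConvertDataToDict.process is still a stub and the "Convert list to dict object" step is commented out. One way the conversion could look (an assumption, not what this commit implements) is zipping dict_keys against the cleaned row:

    import apache_beam as beam


    class ConvertDataToDict(beam.DoFn):
        @property
        def dict_keys(self):
            return [
                "price", "transaction_date", "postcode", "number",
                "flat_appartment", "street", "locality", "town_city",
                "district", "county", "building", "property_id",
            ]

        def process(self, element):
            # pair each key with the matching column; assumes the cleaned row has
            # exactly len(dict_keys) entries once SplitColumn has run and the
            # generated property_id sits in the final position
            yield dict(zip(self.dict_keys, element))


    # hypothetical wiring inside the formatted stage:
    #     | "Convert list to dict object" >> beam.ParDo(ConvertDataToDict())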