From 8e8469579e7b9166505ca4aec66b423f9ad4be8f Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 20:29:11 +0100 Subject: [PATCH] updating beam pipeline to use GroupByKey --- analyse_properties/main.py | 73 +++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 6c86ddb..6a190de 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -2,6 +2,7 @@ import argparse from datetime import datetime import hashlib import itertools +import json import logging import pathlib @@ -12,17 +13,37 @@ from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions def slice_by_range(element, *ranges): - """Slice a list with multiple ranges.""" + """ + Slice a list with multiple ranges. + + Args: + element : The element. + *ranges (tuple): Tuples containing a start,end index to slice the element. + E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else. + + Returns: + list: The list sliced by the ranges + """ return itertools.chain(*(itertools.islice(element, *r) for r in ranges)) class DropRecordsSingleEmptyColumn(beam.DoFn): - """If a given item in a list is empty, drop this entry from the PCollection.""" - def __init__(self, index): self.index = index def process(self, element): + """ + Drop the entire row if a given column is empty. + + Args: + element : The element + + Returns: + None: If the length of the column is 0, drop the element. + + Yields: + element: If the length of the column isn't 0, keep the element. + """ column = element[self.index] if len(column) == 0: return None @@ -78,9 +99,9 @@ class GenerateUniqueID(beam.DoFn): ",".join(element[2:]) if not self.all_columns else ",".join(element) ) hashed_string = hashlib.md5(unique_string.encode()) - # append the hash to the end - element.append(hashed_string.hexdigest()) - yield element + # add the hash as a key to the data. + new_element = (hashed_string.hexdigest(), list(element)) + yield new_element class DeduplicateByID(beam.DoFn): @@ -91,7 +112,7 @@ class DeduplicateByID(beam.DoFn): def process(self, element): if len(list(element[1])) > 0: - deduplicated_element = (list(element[0]), [list(element[1])[0]]) + deduplicated_element = (element[0], [list(element[1])[0]]) yield deduplicated_element else: yield element @@ -102,7 +123,6 @@ class RemoveUniqueID(beam.DoFn): def process(self, element): element_no_id = element[-1][0] - element_no_id.pop(-1) yield element_no_id @@ -169,7 +189,7 @@ class ConvertDataToDict(beam.DoFn): # Create the dict to hold all the information about the property. json_object = { "property_id": element[0], - "readable_address": None, + # "readable_address": None, "flat_appartment": list(element[-1])[0][4], "builing": list(element[-1])[0][10], "number": list(element[-1])[0][3], @@ -189,20 +209,20 @@ class ConvertDataToDict(beam.DoFn): } # Create a human readable address to go in the dict. - json_object["readable_address"] = self.get_readable_address( - [ - json_object["flat_appartment"], - json_object["builing"], - f'{json_object["number"]} {json_object["street"]}', - json_object["postcode"], - ], - [ - json_object["locality"], - json_object["town"], - json_object["district"], - json_object["county"], - ], - ) + # json_object["readable_address"] = self.get_readable_address( + # [ + # json_object["flat_appartment"], + # json_object["builing"], + # f'{json_object["number"]} {json_object["street"]}', + # json_object["postcode"], + # ], + # [ + # json_object["locality"], + # json_object["town"], + # json_object["district"], + # json_object["county"], + # ], + # ) yield json_object @@ -259,6 +279,7 @@ def run(argv=None, save_main_session=True): | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) | "Drop empty PAON if missing SAON" >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) + # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE")) | "Split PAON into two columns if separated by comma" >> beam.ParDo(SplitColumn(3, ",")) ) @@ -269,7 +290,7 @@ def run(argv=None, save_main_session=True): | "Generate unique ID for all columns" >> beam.ParDo(GenerateUniqueID(all_columns=True)) | "Group by the ID for all columns" - >> beam.GroupBy(lambda element: element[-1]) + >> beam.GroupByKey() | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) ) @@ -281,7 +302,8 @@ def run(argv=None, save_main_session=True): | "Generate unique ID ignoring price & date" >> beam.ParDo(GenerateUniqueID()) | "Group by the ID ignoring price & date" - >> beam.GroupBy(lambda element: element[-1]) + >> beam.GroupByKey() + # | beam.Map(print) ) # Format the data into a dict. @@ -295,6 +317,7 @@ def run(argv=None, save_main_session=True): ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() + | "Format output" >> beam.Map(json.dumps) | "Save to .json file" >> beam.io.WriteToText( file_path_prefix=known_args.output,