From a37e7817c3d502f4ffb47782f812d82a3e957ac2 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sat, 25 Sep 2021 16:47:17 +0100 Subject: [PATCH] adding latest beam pipeline code --- analyse_properties/__init__.py | 72 ++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 11 deletions(-) diff --git a/analyse_properties/__init__.py b/analyse_properties/__init__.py index 3bce574..1e91284 100644 --- a/analyse_properties/__init__.py +++ b/analyse_properties/__init__.py @@ -5,6 +5,7 @@ Technical test for Street Group. """ import csv +import hashlib import io from importlib import resources from itertools import chain, islice @@ -12,6 +13,8 @@ from itertools import chain, islice import apache_beam as beam from apache_beam.io import fileio +from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn + def csv_reader(csv_file): return csv.reader(io.TextIOWrapper(csv_file.open())) @@ -21,27 +24,58 @@ def slice_by_range(element, *ranges): return chain(*(islice(element, *r) for r in ranges)) -class SplitPAON(beam.DoFn): - def process(self, element): - paon_split = element[3].split(",") - - if len(paon_split) == 0: - return element - elif len(paon_split) > 0: - pass - - -class DropEmptyColumn(bean.doFn): +class DropRecordsSingleEmptyColumn(beam.DoFn): def __init__(self, index): self.index = index def process(self, element): column = element[self.index] + if len(column) == 0: + return None + yield element + + +class DropRecordsTwoEmptyColumn(beam.DoFn): + def __init__(self, index_0, index_1): + self.index_0 = index_0 + self.index_1 = index_1 + + def process(self, element): + column_0 = element[self.index_0] + column_1 = element[self.index_1] + if len(column_0) == 0 and len(column_1) == 0: + return None + yield element + + +class SplitColumn(beam.DoFn): + def __init__(self, index, split_char): + self.index = index + self.split_char = split_char + + def process(self, element): + try: + part_0, part_1 = element[self.index].split(self.split_char) + element[self.index] = part_1.strip() + element.append(part_0.strip()) + yield element + except ValueError: + element.append("") + yield element + + +class GenerateUniqueID(beam.DoFn): + def process(self, element): + unique_string = ",".join(element[2:]) + hashed_string = hashlib.md5(unique_string.encode()) + element.append(hashed_string.hexdigest()) + yield element def main(): csv_data = resources.path( "analyse_properties.data", "pp-monthly-update-new-version.csv" + # "analyse_properties.data", "pp-complete.csv" ) with beam.Pipeline() as pipeline: @@ -62,6 +96,22 @@ def main(): >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14)))) | "Convert to Upper Case" >> beam.Map(lambda element: [e.upper() for e in element]) + | "Strip leading/trailing whitespace" + >> beam.Map(lambda element: [e.strip() for e in element]) + | "Drop Empty Postcodes" + >> beam.ParDo(DropRecordsSingleEmptyColumn(2)) + | "Drop empty PAON if missing SAON" + >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4)) + # | beam.ParDo(DebugShowColumnWithValueIn(3, ",")) + | beam.ParDo(DebugShowColumnWithValueIn(2, "AL1 4SZ")) + | beam.ParDo(SplitColumn(3, ",")) + # | beam.Map(print) + ) + + # Prepare the data + prepare = ( + clean + | beam.ParDo(GenerateUniqueID()) | beam.Map(print) )