adding latest beam pipeline code

2021-09-25 16:47:17 +01:00
parent 214ce77d8f
commit 539e4c7786

@@ -5,6 +5,7 @@ Technical test for Street Group.
"""
import csv
import hashlib
import io
from importlib import resources
from itertools import chain, islice
@@ -12,6 +13,8 @@ from itertools import chain, islice
import apache_beam as beam
from apache_beam.io import fileio

from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn


def csv_reader(csv_file):
    return csv.reader(io.TextIOWrapper(csv_file.open()))
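For context, a minimal sketch (not part of this diff) of how csv_reader is typically wired up with the fileio import above; the reading stage of main() falls outside the hunks shown here, so the match pattern below is a placeholder and the wiring is an assumption.

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as pipeline:
    rows = (
        pipeline
        | fileio.MatchFiles("/path/to/pp-monthly-update-new-version.csv")  # placeholder pattern
        | fileio.ReadMatches()   # yields ReadableFile objects with an .open() method
        | beam.FlatMap(csv_reader)  # flattens each csv.reader into one list of strings per row
    )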
@@ -21,27 +24,58 @@ def slice_by_range(element, *ranges):
    return chain(*(islice(element, *r) for r in ranges))
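As a quick illustration of what slice_by_range yields for the (1, 4) and (7, 14) ranges used later in main() (dummy column names, not the real schema):

row = [f"col{i}" for i in range(16)]
kept = list(slice_by_range(row, (1, 4), (7, 14)))
# kept == ["col1", "col2", "col3",
#          "col7", "col8", "col9", "col10", "col11", "col12", "col13"]  -- 10 columns kept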


class DropRecordsSingleEmptyColumn(beam.DoFn):
    """Drop a record when the column at index is empty."""

    def __init__(self, index):
        self.index = index

    def process(self, element):
        column = element[self.index]
        if len(column) == 0:
            return
        yield element


class DropRecordsTwoEmptyColumn(beam.DoFn):
    """Drop a record only when both of the given columns are empty."""

    def __init__(self, index_0, index_1):
        self.index_0 = index_0
        self.index_1 = index_1

    def process(self, element):
        column_0 = element[self.index_0]
        column_1 = element[self.index_1]
        if len(column_0) == 0 and len(column_1) == 0:
            return
        yield element
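A throwaway way to sanity-check the two drop transforms on the local runner; the sample rows and their column layout (postcode at index 2, PAON/SAON at indexes 3 and 4) are assumptions based on the step labels further down, not data from the commit.

import apache_beam as beam

sample_rows = [
    ["100000", "2021-01-01", "AL1 4SZ", "12", "", "SOME STREET"],   # kept
    ["250000", "2021-01-02", "", "7", "FLAT 2", "OTHER STREET"],    # dropped: empty postcode
    ["90000", "2021-01-03", "AL1 4SZ", "", "", "SOME STREET"],      # dropped: no PAON and no SAON
]

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(sample_rows)
        | beam.ParDo(DropRecordsSingleEmptyColumn(2))
        | beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
        | beam.Map(print)
    )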


class SplitColumn(beam.DoFn):
    """Split the column at index on split_char, keeping the part after the
    separator in place and appending the part before it as a new column
    (an empty column is appended when there is no single separator)."""

    def __init__(self, index, split_char):
        self.index = index
        self.split_char = split_char

    def process(self, element):
        try:
            part_0, part_1 = element[self.index].split(self.split_char)
            element[self.index] = part_1.strip()
            element.append(part_0.strip())
        except ValueError:
            # No separator (or more than one): leave the column as-is and
            # append an empty placeholder so the row width stays constant.
            element.append("")
        yield element
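To make SplitColumn's behaviour concrete, here is its process method called directly on an invented record, outside any pipeline and purely for illustration:

record = ["100000", "2021-01-01", "AL1 4SZ", "THE COTTAGE, 12", "", "SOME STREET"]
out = next(SplitColumn(3, ",").process(record))
# out[3]  == "12"           -> the part after the comma stays in column 3
# out[-1] == "THE COTTAGE"  -> the part before the comma becomes a new trailing column
# A value without a comma would leave column 3 untouched and append "" instead.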


class GenerateUniqueID(beam.DoFn):
    """Append an MD5 hex digest of the columns from index 2 onwards as an ID."""

    def process(self, element):
        unique_string = ",".join(element[2:])
        hashed_string = hashlib.md5(unique_string.encode())
        element.append(hashed_string.hexdigest())
        yield element
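The generated ID is just an MD5 digest of every column from index 2 onwards (which, after the earlier column slicing, appears to be the address-related fields; that reading is inferred, not stated in the commit). It is therefore deterministic and reproducible outside the pipeline:

import hashlib

record = ["100000", "2021-01-01", "AL1 4SZ", "12", "", "SOME STREET"]
expected = hashlib.md5(",".join(record[2:]).encode()).hexdigest()
assert next(GenerateUniqueID().process(list(record)))[-1] == expected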


def main():
    csv_data = resources.path(
        "analyse_properties.data", "pp-monthly-update-new-version.csv"
        # "analyse_properties.data", "pp-complete.csv"
    )

    with beam.Pipeline() as pipeline:
@@ -62,6 +96,22 @@ def main():
            >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
            | "Convert to Upper Case"
            >> beam.Map(lambda element: [e.upper() for e in element])
            | "Strip leading/trailing whitespace"
            >> beam.Map(lambda element: [e.strip() for e in element])
            | "Drop Empty Postcodes"
            >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
            | "Drop Records Missing Both PAON and SAON"
            >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
            # | beam.ParDo(DebugShowColumnWithValueIn(3, ","))
            | beam.ParDo(DebugShowColumnWithValueIn(2, "AL1 4SZ"))
            | beam.ParDo(SplitColumn(3, ","))
            # | beam.Map(print)
        )

        # Prepare the data
        prepare = (
            clean
            | beam.ParDo(GenerateUniqueID())
            | beam.Map(print)
        )
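Not shown in these hunks: because beam.Pipeline() is constructed without options, the pipeline runs on the local DirectRunner. If the module does not already define an entry point further down, the usual guard would be:

if __name__ == "__main__":
    main()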