adding latest beam pipeline code

This commit is contained in:
2021-09-25 01:44:37 +01:00
parent aa61ea9c57
commit 47a4ac4bc3

View File

@@ -17,7 +17,7 @@ def csv_reader(csv_file):
return csv.reader(io.TextIOWrapper(csv_file.open())) return csv.reader(io.TextIOWrapper(csv_file.open()))
def slicer(element, *ranges): def slice_by_range(element, *ranges):
return chain(*(islice(element, *r) for r in ranges)) return chain(*(islice(element, *r) for r in ranges))
@@ -27,12 +27,18 @@ class SplitPAON(beam.DoFn):
if len(paon_split) == 0: if len(paon_split) == 0:
return element return element
elif len(paon_split) == 1: elif len(paon_split) > 0:
pass
else:
pass pass
class DropEmptyColumn(bean.doFn):
def __init__(self, index):
self.index = index
def process(self, element):
column = element[self.index]
def main(): def main():
csv_data = resources.path( csv_data = resources.path(
"analyse_properties.data", "pp-monthly-update-new-version.csv" "analyse_properties.data", "pp-monthly-update-new-version.csv"
@@ -53,7 +59,7 @@ def main():
clean = ( clean = (
load load
| "Drop unneeded columns" | "Drop unneeded columns"
>> beam.Map(lambda element: list(slicer(element, (1, 4), (7, 14)))) >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
| "Convert to Upper Case" | "Convert to Upper Case"
>> beam.Map(lambda element: [e.upper() for e in element]) >> beam.Map(lambda element: [e.upper() for e in element])
| beam.Map(print) | beam.Map(print)