diff --git a/analyse_properties/__init__.py b/analyse_properties/__init__.py index dab7851..3bce574 100644 --- a/analyse_properties/__init__.py +++ b/analyse_properties/__init__.py @@ -17,7 +17,7 @@ def csv_reader(csv_file): return csv.reader(io.TextIOWrapper(csv_file.open())) -def slicer(element, *ranges): +def slice_by_range(element, *ranges): return chain(*(islice(element, *r) for r in ranges)) @@ -27,12 +27,18 @@ class SplitPAON(beam.DoFn): if len(paon_split) == 0: return element - elif len(paon_split) == 1: - pass - else: + elif len(paon_split) > 0: pass +class DropEmptyColumn(bean.doFn): + def __init__(self, index): + self.index = index + + def process(self, element): + column = element[self.index] + + def main(): csv_data = resources.path( "analyse_properties.data", "pp-monthly-update-new-version.csv" @@ -53,7 +59,7 @@ def main(): clean = ( load | "Drop unneeded columns" - >> beam.Map(lambda element: list(slicer(element, (1, 4), (7, 14)))) + >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14)))) | "Convert to Upper Case" >> beam.Map(lambda element: [e.upper() for e in element]) | beam.Map(print)