From 9f53c669752f9687114296dd37570cbccde0cfd0 Mon Sep 17 00:00:00 2001 From: Daniel Tomlinson Date: Sun, 26 Sep 2021 15:57:00 +0100 Subject: [PATCH] adding latest beam pipeline code for dataflow --- analyse_properties/main.py | 49 ++++++++++++++++++++++++++++---------- notes/dataflow.md | 17 +++++++++++++ 2 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 notes/dataflow.md diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 0053233..c4f9df8 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -1,9 +1,12 @@ +import argparse from datetime import datetime import hashlib import itertools +import logging import pathlib import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn @@ -87,7 +90,7 @@ class DeduplicateByID(beam.DoFn): """ def process(self, element): - if len(element[1]) > 0: + if len(list(element[1])) > 0: deduplicated_element = (element[0], [element[1][0]]) yield deduplicated_element else: @@ -203,20 +206,42 @@ class ConvertDataToDict(beam.DoFn): yield json_object -def main(): - # Load in the data from a csv file. +def run(argv=None, save_main_session=True): + """Entrypoint and definition of the pipeline.""" + # Default input/output files input_file = ( pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-monthly-update-new-version.csv" ) + output_file = ( + pathlib.Path(__file__).parents[1] + / "data" + / "output" + / "pp-monthly-update-new-version" + ) - with beam.Pipeline() as pipeline: + # Arguments + parser = argparse.ArgumentParser() + parser.add_argument( + "--input", dest="input", default=str(input_file), help="Input file." + ) + parser.add_argument( + "--output", dest="output", default=str(output_file), help="Output file." + ) + known_args, pipeline_args = parser.parse_known_args(argv) + + # Pipeline options. save_main_session needed for DataFlow for global imports. + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # Load in the data from a csv file. + with beam.Pipeline(options=pipeline_options) as pipeline: # Load the data load = ( pipeline - | "Read input data" >> beam.io.ReadFromText(str(input_file)) + | "Read input data" >> beam.io.ReadFromText(known_args.input) | "Split by ','" >> beam.Map(lambda element: element.split(",")) | "Remove leading and trailing quotes" >> beam.Map(lambda element: [el.strip('"') for el in element]) @@ -248,8 +273,8 @@ def main(): | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID()) ) - # Prepare the data by generating an ID using the uniquely identifying information only - # and grouping them by this ID. + # Prepare the data by generating an ID using the uniquely identifying + # information only and grouping them by this ID. prepare = ( clean_deduplicate | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID()) @@ -267,19 +292,17 @@ def main(): ) # Save the data to a .json file. - output_file = ( - pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete" - ) - output = ( + ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() | "Save to .json file" >> beam.io.WriteToText( - file_path_prefix=str(output_file), + file_path_prefix=known_args.output, file_name_suffix=".json", ) ) if __name__ == "__main__": - main() + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/notes/dataflow.md b/notes/dataflow.md new file mode 100644 index 0000000..54f16df --- /dev/null +++ b/notes/dataflow.md @@ -0,0 +1,17 @@ +# DataFlow + + + +Export env variable: + +`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"` + +Run the pipeline: + +python -m analyse_properties.main \ + --region europe-west2 \ + --input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \ + --output gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version \ + --runner DataflowRunner \ + --project street-group \ + --temp_location gs://street-group-technical-test-dmot/tmp