mirror of
https://github.com/dtomlinson91/street_group_tech_test
synced 2025-12-22 11:55:45 +00:00
adding latest beam pipeline code for dataflow
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
import hashlib
|
||||
import itertools
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
import apache_beam as beam
|
||||
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
|
||||
|
||||
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
|
||||
|
||||
@@ -87,7 +90,7 @@ class DeduplicateByID(beam.DoFn):
|
||||
"""
|
||||
|
||||
def process(self, element):
|
||||
if len(element[1]) > 0:
|
||||
if len(list(element[1])) > 0:
|
||||
deduplicated_element = (element[0], [element[1][0]])
|
||||
yield deduplicated_element
|
||||
else:
|
||||
@@ -203,20 +206,42 @@ class ConvertDataToDict(beam.DoFn):
|
||||
yield json_object
|
||||
|
||||
|
||||
def main():
|
||||
# Load in the data from a csv file.
|
||||
def run(argv=None, save_main_session=True):
|
||||
"""Entrypoint and definition of the pipeline."""
|
||||
# Default input/output files
|
||||
input_file = (
|
||||
pathlib.Path(__file__).parents[1]
|
||||
/ "data"
|
||||
/ "input"
|
||||
/ "pp-monthly-update-new-version.csv"
|
||||
)
|
||||
output_file = (
|
||||
pathlib.Path(__file__).parents[1]
|
||||
/ "data"
|
||||
/ "output"
|
||||
/ "pp-monthly-update-new-version"
|
||||
)
|
||||
|
||||
with beam.Pipeline() as pipeline:
|
||||
# Arguments
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--input", dest="input", default=str(input_file), help="Input file."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output", dest="output", default=str(output_file), help="Output file."
|
||||
)
|
||||
known_args, pipeline_args = parser.parse_known_args(argv)
|
||||
|
||||
# Pipeline options. save_main_session needed for DataFlow for global imports.
|
||||
pipeline_options = PipelineOptions(pipeline_args)
|
||||
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
|
||||
|
||||
# Load in the data from a csv file.
|
||||
with beam.Pipeline(options=pipeline_options) as pipeline:
|
||||
# Load the data
|
||||
load = (
|
||||
pipeline
|
||||
| "Read input data" >> beam.io.ReadFromText(str(input_file))
|
||||
| "Read input data" >> beam.io.ReadFromText(known_args.input)
|
||||
| "Split by ','" >> beam.Map(lambda element: element.split(","))
|
||||
| "Remove leading and trailing quotes"
|
||||
>> beam.Map(lambda element: [el.strip('"') for el in element])
|
||||
@@ -248,8 +273,8 @@ def main():
|
||||
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
|
||||
)
|
||||
|
||||
# Prepare the data by generating an ID using the uniquely identifying information only
|
||||
# and grouping them by this ID.
|
||||
# Prepare the data by generating an ID using the uniquely identifying
|
||||
# information only and grouping them by this ID.
|
||||
prepare = (
|
||||
clean_deduplicate
|
||||
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
|
||||
@@ -267,19 +292,17 @@ def main():
|
||||
)
|
||||
|
||||
# Save the data to a .json file.
|
||||
output_file = (
|
||||
pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete"
|
||||
)
|
||||
output = (
|
||||
(
|
||||
formatted
|
||||
| "Combine into one PCollection" >> beam.combiners.ToList()
|
||||
| "Save to .json file"
|
||||
>> beam.io.WriteToText(
|
||||
file_path_prefix=str(output_file),
|
||||
file_path_prefix=known_args.output,
|
||||
file_name_suffix=".json",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
run()
|
||||
|
||||
Reference in New Issue
Block a user