moving inputs/outputs to use pathlib

This commit is contained in:
2021-09-26 06:03:55 +01:00
parent 7962f40e32
commit 7f9b7e4bfd
5 changed files with 17 additions and 24 deletions

View File

@@ -1,22 +1,13 @@
import csv
from datetime import datetime from datetime import datetime
import hashlib import hashlib
import io
from importlib import resources
import itertools import itertools
import pathlib import pathlib
import apache_beam as beam import apache_beam as beam
from apache_beam.io import fileio
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def csv_reader(csv_file):
"""Read in a csv file."""
return csv.reader(io.TextIOWrapper(csv_file.open()))
def slice_by_range(element, *ranges): def slice_by_range(element, *ranges):
"""Slice a list with multiple ranges.""" """Slice a list with multiple ranges."""
return itertools.chain(*(itertools.islice(element, *r) for r in ranges)) return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
@@ -214,22 +205,22 @@ class ConvertDataToDict(beam.DoFn):
def main(): def main():
# Load in the data from a csv file. # Load in the data from a csv file.
csv_data = resources.path( input_file = (
# "analyse_properties.data.input", pathlib.Path(__file__).parents[1]
# "pp-monthly-update-new-version.csv" / "data"
"analyse_properties.data.input", "pp-complete.csv" / "input"
/ "pp-monthly-update-new-version.csv"
) )
with beam.Pipeline() as pipeline: with beam.Pipeline() as pipeline:
# Load the data # Load the data
with csv_data as csv_data_file: load = (
# https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170 pipeline
load = ( | "Read input data" >> beam.io.ReadFromText(str(input_file))
pipeline | "Split by ','" >> beam.Map(lambda element: element.split(","))
| fileio.MatchFiles(str(csv_data_file)) | "Remove leading and trailing quotes"
| fileio.ReadMatches() >> beam.Map(lambda element: [el.strip('"') for el in element])
| beam.FlatMap(csv_reader) )
)
# Clean the data by dropping unneeded rows. # Clean the data by dropping unneeded rows.
clean_drop = ( clean_drop = (
@@ -276,7 +267,9 @@ def main():
) )
# Save the data to a .json file. # Save the data to a .json file.
output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete" output_file = (
pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete"
)
output = ( output = (
formatted formatted
| "Combine into one PCollection" >> beam.combiners.ToList() | "Combine into one PCollection" >> beam.combiners.ToList()

View File

@@ -1,5 +1,5 @@
# Full data set # Full data set
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P analyse_properties/data/input # wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P data/input
# Monthly update data set # Monthly update data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P analyse_properties/data/input wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P data/input