diff --git a/analyse_properties/data/__init__.py b/analyse_properties/data/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/analyse_properties/data/input/__init__.py b/analyse_properties/data/input/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/analyse_properties/data/output/__init__.py b/analyse_properties/data/output/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/analyse_properties/main.py b/analyse_properties/main.py index 95b8299..cf43f89 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -1,22 +1,13 @@ -import csv from datetime import datetime import hashlib -import io -from importlib import resources import itertools import pathlib import apache_beam as beam -from apache_beam.io import fileio # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn -def csv_reader(csv_file): - """Read in a csv file.""" - return csv.reader(io.TextIOWrapper(csv_file.open())) - - def slice_by_range(element, *ranges): """Slice a list with multiple ranges.""" return itertools.chain(*(itertools.islice(element, *r) for r in ranges)) @@ -214,22 +205,22 @@ class ConvertDataToDict(beam.DoFn): def main(): # Load in the data from a csv file. - csv_data = resources.path( - # "analyse_properties.data.input", - # "pp-monthly-update-new-version.csv" - "analyse_properties.data.input", "pp-complete.csv" + input_file = ( + pathlib.Path(__file__).parents[1] + / "data" + / "input" + / "pp-monthly-update-new-version.csv" ) with beam.Pipeline() as pipeline: # Load the data - with csv_data as csv_data_file: - # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170 - load = ( - pipeline - | fileio.MatchFiles(str(csv_data_file)) - | fileio.ReadMatches() - | beam.FlatMap(csv_reader) - ) + load = ( + pipeline + | "Read input data" >> beam.io.ReadFromText(str(input_file)) + | "Split by ','" >> beam.Map(lambda element: element.split(",")) + | "Remove leading and trailing quotes" + >> beam.Map(lambda element: [el.strip('"') for el in element]) + ) # Clean the data by dropping unneeded rows. clean_drop = ( @@ -276,7 +267,9 @@ def main(): ) # Save the data to a .json file. - output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete" + output_file = ( + pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete" + ) output = ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() diff --git a/download_data.sh b/download_data.sh index d69c6e4..dc0fa7a 100755 --- a/download_data.sh +++ b/download_data.sh @@ -1,5 +1,5 @@ # Full data set -wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P analyse_properties/data/input +# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P data/input # Monthly update data set -# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P analyse_properties/data/input +wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P data/input