diff --git a/analyse_properties/main.py b/analyse_properties/main.py
index 95b8299..d20a481 100644
--- a/analyse_properties/main.py
+++ b/analyse_properties/main.py
@@ -8,6 +8,7 @@ import pathlib
 
 import apache_beam as beam
 from apache_beam.io import fileio
+from apache_beam.options.pipeline_options import PipelineOptions
 
 # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
 
@@ -175,7 +176,7 @@ class ConvertDataToDict(beam.DoFn):
         # Create the dict to hold all the information about the property.
         json_object = {
             "property_id": element[0],
-            "readable_address": None,
+            # "readable_address": None,
             "flat_appartment": element[-1][0][4],
             "builing": element[-1][0][10],
             "number": element[-1][0][3],
@@ -195,98 +196,117 @@ class ConvertDataToDict(beam.DoFn):
         }
 
         # Create a human readable address to go in the dict.
-        json_object["readable_address"] = self.get_readable_address(
-            [
-                json_object["flat_appartment"],
-                json_object["builing"],
-                f'{json_object["number"]} {json_object["street"]}',
-                json_object["postcode"],
-            ],
-            [
-                json_object["locality"],
-                json_object["town"],
-                json_object["district"],
-                json_object["county"],
-            ],
-        )
+        # json_object["readable_address"] = self.get_readable_address(
+        #     [
+        #         json_object["flat_appartment"],
+        #         json_object["builing"],
+        #         f'{json_object["number"]} {json_object["street"]}',
+        #         json_object["postcode"],
+        #     ],
+        #     [
+        #         json_object["locality"],
+        #         json_object["town"],
+        #         json_object["district"],
+        #         json_object["county"],
+        #     ],
+        # )
 
         yield json_object
 
 
 def main():
     # Load in the data from a csv file.
-    csv_data = resources.path(
-        # "analyse_properties.data.input",
-        # "pp-monthly-update-new-version.csv"
-        "analyse_properties.data.input", "pp-complete.csv"
+    # csv_data = resources.path(
+    #     "analyse_properties.data.input",
+    #     "pp-monthly-update-new-version.csv"
+    #     # "analyse_properties.data.input",
+    #     # "pp-complete.csv",
+    # )
+
+    options = PipelineOptions(
+        [
+            "--runner=PortableRunner",
+            "--job_endpoint=localhost:8099",
+            "--environment_type=LOOPBACK",
+        ]
     )
 
-    with beam.Pipeline() as pipeline:
+    with beam.Pipeline(options=options) as pipeline:
         # Load the data
-        with csv_data as csv_data_file:
-            # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
-            load = (
-                pipeline
-                | fileio.MatchFiles(str(csv_data_file))
-                | fileio.ReadMatches()
-                | beam.FlatMap(csv_reader)
-            )
+        # with csv_data as csv_data_file:
+        #     # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
+        #     load = (
+        #         pipeline
+        #         | fileio.MatchFiles(str(csv_data_file))
+        #         | fileio.ReadMatches()
+        #         | beam.FlatMap(csv_reader)
+        #     )
+
+        load = pipeline | beam.Create(
+            [
+                "🍓Strawberry,🥕Carrot,🍆Eggplant",
+                "🍅Tomato,🥔Potato",
+            ]
+        )
 
         # Clean the data by dropping unneeded rows.
-        clean_drop = (
-            load
-            | "Drop unneeded columns"
-            >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
-            | "Convert to Upper Case"
-            >> beam.Map(lambda element: [e.upper() for e in element])
-            | "Strip leading/trailing whitespace"
-            >> beam.Map(lambda element: [e.strip() for e in element])
-            | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
-            | "Drop empty PAON if missing SAON"
-            >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
-            | "Split PAON into two columns if separated by comma"
-            >> beam.ParDo(SplitColumn(3, ","))
-        )
+        # clean_drop = (
+        #     load
+        #     | "Drop unneeded columns"
+        #     >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
+        #     | "Convert to Upper Case"
+        #     >> beam.Map(lambda element: [e.upper() for e in element])
+        #     | "Strip leading/trailing whitespace"
+        #     >> beam.Map(lambda element: [e.strip() for e in element])
+        #     | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
+        #     | "Drop empty PAON if missing SAON"
+        #     >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
+        #     | "Split PAON into two columns if separated by comma"
+        #     >> beam.ParDo(SplitColumn(3, ","))
+        # )
 
-        # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
-        clean_deduplicate = (
-            clean_drop
-            | "Generate unique ID for all columns"
-            >> beam.ParDo(GenerateUniqueID(all_columns=True))
-            | "Group by the ID for all columns"
-            >> beam.GroupBy(lambda element: element[-1])
-            | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
-        )
+        # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
+        # clean_deduplicate = (
+        #     clean_drop
+        #     | "Generate unique ID for all columns"
+        #     >> beam.ParDo(GenerateUniqueID(all_columns=True))
+        #     | "Group by the ID for all columns"
+        #     >> beam.GroupBy(lambda element: element[-1])
+        #     | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
+        # )
 
-        # Prepare the data by generating an ID using the uniquely identifying information only
-        # and grouping them by this ID.
-        prepare = (
-            clean_deduplicate
-            | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
-            | "Generate unique ID ignoring price & date"
-            >> beam.ParDo(GenerateUniqueID())
-            | "Group by the ID ignoring price & date"
-            >> beam.GroupBy(lambda element: element[-1])
-        )
+        # # Prepare the data by generating an ID using the uniquely identifying information only
+        # # and grouping them by this ID.
+        # prepare = (
+        #     clean_deduplicate
+        #     | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
+        #     | "Generate unique ID ignoring price & date"
+        #     >> beam.ParDo(GenerateUniqueID())
+        #     | "Group by the ID ignoring price & date"
+        #     >> beam.GroupBy(lambda element: element[-1])
+        # )
 
-        # Format the data into a dict.
-        formatted = (
-            prepare
-            | "Convert the prepared data into a dict object"
-            >> beam.ParDo(ConvertDataToDict())
-        )
+        # # Format the data into a dict.
+        # formatted = (
+        #     prepare
+        #     | "Convert the prepared data into a dict object"
+        #     >> beam.ParDo(ConvertDataToDict())
+        # )
 
-        # Save the data to a .json file.
-        output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
-        output = (
-            formatted
-            | "Combine into one PCollection" >> beam.combiners.ToList()
-            | "Save to .json file"
-            >> beam.io.WriteToText(
-                file_path_prefix=str(output_file),
-                file_name_suffix=".json",
-                shard_name_template="",
-            )
-        )
+        # # Save the data to a .json file.
+        # output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
+        # # output_file = "/tmp/file"
+
+        # (
+        #     formatted
+        #     | "Combine into one PCollection" >> beam.combiners.ToList()
+        #     | beam.Map(print)
+        #     # | "Save to .json file"
+        #     # >> beam.io.WriteToText(
+        #     #     file_path_prefix=str(output_file),
+        #     #     file_name_suffix=".json",
+        #     #     shard_name_template="",
+        #     # )
+        # )
 
 
 if __name__ == "__main__":
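
The new PipelineOptions target a portable job endpoint on localhost:8099, the default port used by Beam's portable job servers (the Flink job server, for example), and --environment_type=LOOPBACK keeps worker execution inside the launching process, the usual arrangement for local debugging. To exercise the temporary beam.Create source without any job server listening at all, a minimal sketch on the bundled DirectRunner; the split step and the printed output are illustrative additions, not part of the patch:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Same toy source as the patch, but on the in-process DirectRunner,
# so nothing needs to be running on localhost:8099.
options = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | beam.Create(
            [
                "🍓Strawberry,🥕Carrot,🍆Eggplant",
                "🍅Tomato,🥔Potato",
            ]
        )
        # Mimic the CSV shape: one list of fields per input line.
        | beam.Map(lambda line: line.split(","))
        | beam.Map(print)
    )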
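DropRecordsSingleEmptyColumn, DropRecordsTwoEmptyColumn, GenerateUniqueID, and the other DoFns chained through the commented-out cleaning stage are defined elsewhere in main.py and do not appear in this diff. As a hedged sketch of the pattern only, assuming the constructor takes the index of the column that must be non-empty, such a filtering DoFn would look roughly like:

import apache_beam as beam


# Sketch: one plausible shape for DropRecordsSingleEmptyColumn, which is
# not shown in this patch; the real definition lives elsewhere in main.py.
class DropRecordsSingleEmptyColumn(beam.DoFn):
    def __init__(self, column_index):
        self.column_index = column_index

    def process(self, element):
        # Emit the record only when the watched column holds a value;
        # records with an empty column are silently dropped.
        if element[self.column_index]:
            yield element

It wires into the pipeline exactly as the parked code does: ... | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2)).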
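One caveat for when the parked write stage is re-enabled: beam.io.WriteToText serialises each element with str(), so writing the combined Python list under a .json suffix would produce a Python repr (single quotes, None instead of null) rather than valid JSON. A sketch of a likely fix, assuming the dicts built by ConvertDataToDict are JSON-serialisable; the toy elements and the /tmp/pp-debug prefix are invented for the example:

import json

import apache_beam as beam

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([{"property_id": "A1"}, {"property_id": "B2"}])
        # Collapse the whole PCollection into a single list element.
        | "Combine into one PCollection" >> beam.combiners.ToList()
        # Serialise the list to a JSON string before writing.
        | "Serialise to JSON" >> beam.Map(json.dumps)
        | "Save to .json file"
        >> beam.io.WriteToText(
            file_path_prefix="/tmp/pp-debug",
            file_name_suffix=".json",
            shard_name_template="",  # empty template -> a single output file
        )
    )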