2 Commits

SHA1        Message                             Date
5505dbf24a  adding docker commands for spark    2021-09-26 05:34:30 +01:00
2a1c4fe68e  adding spark runner                 2021-09-26 05:33:39 +01:00
2 changed files with 113 additions and 79 deletions

View File

@@ -8,6 +8,7 @@ import pathlib
 import apache_beam as beam
 from apache_beam.io import fileio
+from apache_beam.options.pipeline_options import PipelineOptions
 # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
@@ -175,7 +176,7 @@ class ConvertDataToDict(beam.DoFn):
         # Create the dict to hold all the information about the property.
         json_object = {
             "property_id": element[0],
-            "readable_address": None,
+            # "readable_address": None,
             "flat_appartment": element[-1][0][4],
             "builing": element[-1][0][10],
             "number": element[-1][0][3],
@@ -195,98 +196,117 @@ class ConvertDataToDict(beam.DoFn):
         }
         # Create a human readable address to go in the dict.
-        json_object["readable_address"] = self.get_readable_address(
-            [
-                json_object["flat_appartment"],
-                json_object["builing"],
-                f'{json_object["number"]} {json_object["street"]}',
-                json_object["postcode"],
-            ],
-            [
-                json_object["locality"],
-                json_object["town"],
-                json_object["district"],
-                json_object["county"],
-            ],
-        )
+        # json_object["readable_address"] = self.get_readable_address(
+        #     [
+        #         json_object["flat_appartment"],
+        #         json_object["builing"],
+        #         f'{json_object["number"]} {json_object["street"]}',
+        #         json_object["postcode"],
+        #     ],
+        #     [
+        #         json_object["locality"],
+        #         json_object["town"],
+        #         json_object["district"],
+        #         json_object["county"],
+        #     ],
+        # )
         yield json_object
 def main():
     # Load in the data from a csv file.
-    csv_data = resources.path(
-        # "analyse_properties.data.input",
-        # "pp-monthly-update-new-version.csv"
-        "analyse_properties.data.input", "pp-complete.csv"
-    )
+    # csv_data = resources.path(
+    #     # "analyse_properties.data.input",
+    #     # "pp-monthly-update-new-version.csv"
+    #     # "analyse_properties.data.input",
+    #     # "pp-complete.csv",
+    # )
+    options = PipelineOptions(
+        [
+            "--runner=PortableRunner",
+            "--job_endpoint=localhost:8099",
+            "--environment_type=LOOPBACK",
+        ]
+    )
-    with beam.Pipeline() as pipeline:
+    with beam.Pipeline(options=options) as pipeline:
         # Load the data
-        with csv_data as csv_data_file:
-            # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
-            load = (
-                pipeline
-                | fileio.MatchFiles(str(csv_data_file))
-                | fileio.ReadMatches()
-                | beam.FlatMap(csv_reader)
-            )
+        # with csv_data as csv_data_file:
+        #     # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
+        #     load = (
+        #         pipeline
+        #         | fileio.MatchFiles(str(csv_data_file))
+        #         | fileio.ReadMatches()
+        #         | beam.FlatMap(csv_reader)
+        #     )
+        load = pipeline | beam.Create(
+            [
+                "🍓Strawberry,🥕Carrot,🍆Eggplant",
+                "🍅Tomato,🥔Potato",
+            ]
+        )
             # Clean the data by dropping unneeded rows.
-            clean_drop = (
-                load
-                | "Drop unneeded columns"
-                >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
-                | "Convert to Upper Case"
-                >> beam.Map(lambda element: [e.upper() for e in element])
-                | "Strip leading/trailing whitespace"
-                >> beam.Map(lambda element: [e.strip() for e in element])
-                | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
-                | "Drop empty PAON if missing SAON"
-                >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
-                | "Split PAON into two columns if separated by comma"
-                >> beam.ParDo(SplitColumn(3, ","))
-            )
-            # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
-            clean_deduplicate = (
-                clean_drop
-                | "Generate unique ID for all columns"
-                >> beam.ParDo(GenerateUniqueID(all_columns=True))
-                | "Group by the ID for all columns"
-                >> beam.GroupBy(lambda element: element[-1])
-                | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
-            )
-            # Prepare the data by generating an ID using the uniquely identifying information only
-            # and grouping them by this ID.
-            prepare = (
-                clean_deduplicate
-                | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
-                | "Generate unique ID ignoring price & date"
-                >> beam.ParDo(GenerateUniqueID())
-                | "Group by the ID ignoring price & date"
-                >> beam.GroupBy(lambda element: element[-1])
-            )
-            # Format the data into a dict.
-            formatted = (
-                prepare
-                | "Convert the prepared data into a dict object"
-                >> beam.ParDo(ConvertDataToDict())
-            )
-            # Save the data to a .json file.
-            output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
-            output = (
-                formatted
-                | "Combine into one PCollection" >> beam.combiners.ToList()
-                | "Save to .json file"
-                >> beam.io.WriteToText(
-                    file_path_prefix=str(output_file),
-                    file_name_suffix=".json",
-                    shard_name_template="",
-                )
-            )
+        # clean_drop = (
+        #     load
+        #     | "Drop unneeded columns"
+        #     >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
+        #     | "Convert to Upper Case"
+        #     >> beam.Map(lambda element: [e.upper() for e in element])
+        #     | "Strip leading/trailing whitespace"
+        #     >> beam.Map(lambda element: [e.strip() for e in element])
+        #     | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
+        #     | "Drop empty PAON if missing SAON"
+        #     >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
+        #     | "Split PAON into two columns if separated by comma"
+        #     >> beam.ParDo(SplitColumn(3, ","))
+        # )
+        # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
+        # clean_deduplicate = (
+        #     clean_drop
+        #     | "Generate unique ID for all columns"
+        #     >> beam.ParDo(GenerateUniqueID(all_columns=True))
+        #     | "Group by the ID for all columns"
+        #     >> beam.GroupBy(lambda element: element[-1])
+        #     | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
+        # )
+        # # Prepare the data by generating an ID using the uniquely identifying information only
+        # # and grouping them by this ID.
+        # prepare = (
+        #     clean_deduplicate
+        #     | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
+        #     | "Generate unique ID ignoring price & date"
+        #     >> beam.ParDo(GenerateUniqueID())
+        #     | "Group by the ID ignoring price & date"
+        #     >> beam.GroupBy(lambda element: element[-1])
+        # )
+        # # Format the data into a dict.
+        # formatted = (
+        #     prepare
+        #     | "Convert the prepared data into a dict object"
+        #     >> beam.ParDo(ConvertDataToDict())
+        # )
+        # # Save the data to a .json file.
+        # output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
+        # # output_file = "/tmp/file"
+        # (
+        #     formatted
+        #     | "Combine into one PCollection" >> beam.combiners.ToList()
+        #     | beam.Map(print)
+        #     # | "Save to .json file"
+        #     # >> beam.io.WriteToText(
+        #     #     file_path_prefix=str(output_file),
+        #     #     file_name_suffix=".json",
+        #     #     shard_name_template="",
+        #     # )
+        # )
 if __name__ == "__main__":
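
A note on the change above: switching to `--runner=PortableRunner` means the pipeline is no longer executed in-process; it is submitted over Beam's Job API to whatever job server is listening at `--job_endpoint` (here localhost:8099, the Spark job server started by the commands in the next file), and `--environment_type=LOOPBACK` runs the SDK worker inside the submitting Python process rather than in a Docker container, which suits local testing. A minimal, self-contained sketch of the same setup, assuming apache-beam is installed and the job server is already up (illustrative only, not part of the commit):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Submit to a Beam job server on localhost:8099 (e.g. the Spark job server container).
options = PipelineOptions(
    [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",  # run the SDK worker in this process
    ]
)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | beam.Create(["🍓Strawberry,🥕Carrot,🍆Eggplant", "🍅Tomato,🥔Potato"])
        | beam.FlatMap(lambda line: line.split(","))
        | beam.Map(print)
    )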

View File

@@ -0,0 +1,14 @@
+docker run --rm \
+    -p 8098:8098 -p 8097:8097 -p 8099:8099 \
+    --name=beam_spark \
+    apache/beam_spark_job_server:latest
+
+docker pull apache/beam_spark_job_server:2.33.0_rc1
+
+docker run --rm \
+    -e SPARK_DRIVER_MEMORY=8g \
+    -p 8098:8098 -p 8097:8097 -p 8099:8099 \
+    --name=beam_spark \
+    apache/beam_spark_job_server:latest
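
Both `docker run` commands publish the job server's ports, including 8099, which is the `--job_endpoint=localhost:8099` the pipeline in the first file targets; the second variant also sets `SPARK_DRIVER_MEMORY=8g`, presumably to give the job server's embedded Spark more memory. Submission fails if the pipeline is launched before the job server is listening, so a small helper like the sketch below can wait for the endpoint first (illustrative only; the function name is mine and nothing like it exists in the commit):

import socket
import time


def wait_for_job_endpoint(host="localhost", port=8099, timeout=60):
    """Poll the Beam job server's job endpoint until it accepts TCP connections."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            # A successful TCP connect only shows the port is open, not that the
            # job server is fully initialised, but it catches the common failure.
            with socket.create_connection((host, port), timeout=2):
                return
        except OSError:
            time.sleep(1)
    raise TimeoutError(f"no job server listening on {host}:{port} after {timeout}s")


if __name__ == "__main__":
    wait_for_job_endpoint()
    print("job server is up; submit the pipeline with --job_endpoint=localhost:8099")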