mirror of
https://github.com/dtomlinson91/street_group_tech_test
synced 2025-12-22 20:05:45 +00:00
Compare commits
2 Commits
develop
...
wip/spark_
| Author | SHA1 | Date | |
|---|---|---|---|
| 5505dbf24a | |||
| 2a1c4fe68e |
@@ -8,6 +8,7 @@ import pathlib
|
|||||||
|
|
||||||
import apache_beam as beam
|
import apache_beam as beam
|
||||||
from apache_beam.io import fileio
|
from apache_beam.io import fileio
|
||||||
|
from apache_beam.options.pipeline_options import PipelineOptions
|
||||||
|
|
||||||
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
|
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
|
||||||
|
|
||||||
@@ -175,7 +176,7 @@ class ConvertDataToDict(beam.DoFn):
|
|||||||
# Create the dict to hold all the information about the property.
|
# Create the dict to hold all the information about the property.
|
||||||
json_object = {
|
json_object = {
|
||||||
"property_id": element[0],
|
"property_id": element[0],
|
||||||
"readable_address": None,
|
# "readable_address": None,
|
||||||
"flat_appartment": element[-1][0][4],
|
"flat_appartment": element[-1][0][4],
|
||||||
"builing": element[-1][0][10],
|
"builing": element[-1][0][10],
|
||||||
"number": element[-1][0][3],
|
"number": element[-1][0][3],
|
||||||
@@ -195,98 +196,117 @@ class ConvertDataToDict(beam.DoFn):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Create a human readable address to go in the dict.
|
# Create a human readable address to go in the dict.
|
||||||
json_object["readable_address"] = self.get_readable_address(
|
# json_object["readable_address"] = self.get_readable_address(
|
||||||
[
|
# [
|
||||||
json_object["flat_appartment"],
|
# json_object["flat_appartment"],
|
||||||
json_object["builing"],
|
# json_object["builing"],
|
||||||
f'{json_object["number"]} {json_object["street"]}',
|
# f'{json_object["number"]} {json_object["street"]}',
|
||||||
json_object["postcode"],
|
# json_object["postcode"],
|
||||||
],
|
# ],
|
||||||
[
|
# [
|
||||||
json_object["locality"],
|
# json_object["locality"],
|
||||||
json_object["town"],
|
# json_object["town"],
|
||||||
json_object["district"],
|
# json_object["district"],
|
||||||
json_object["county"],
|
# json_object["county"],
|
||||||
],
|
# ],
|
||||||
)
|
# )
|
||||||
yield json_object
|
yield json_object
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# Load in the data from a csv file.
|
# Load in the data from a csv file.
|
||||||
csv_data = resources.path(
|
# csv_data = resources.path(
|
||||||
# "analyse_properties.data.input",
|
# "analyse_properties.data.input",
|
||||||
# "pp-monthly-update-new-version.csv"
|
# "pp-monthly-update-new-version.csv"
|
||||||
"analyse_properties.data.input", "pp-complete.csv"
|
# # "analyse_properties.data.input",
|
||||||
|
# # "pp-complete.csv",
|
||||||
|
# )
|
||||||
|
|
||||||
|
options = PipelineOptions(
|
||||||
|
[
|
||||||
|
"--runner=PortableRunner",
|
||||||
|
"--job_endpoint=localhost:8099",
|
||||||
|
"--environment_type=LOOPBACK",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
with beam.Pipeline() as pipeline:
|
with beam.Pipeline(options=options) as pipeline:
|
||||||
# Load the data
|
# Load the data
|
||||||
with csv_data as csv_data_file:
|
# with csv_data as csv_data_file:
|
||||||
# https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
|
# # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
|
||||||
load = (
|
# load = (
|
||||||
pipeline
|
# pipeline
|
||||||
| fileio.MatchFiles(str(csv_data_file))
|
# | fileio.MatchFiles(str(csv_data_file))
|
||||||
| fileio.ReadMatches()
|
# | fileio.ReadMatches()
|
||||||
| beam.FlatMap(csv_reader)
|
# | beam.FlatMap(csv_reader)
|
||||||
|
# )
|
||||||
|
|
||||||
|
load = pipeline | beam.Create(
|
||||||
|
[
|
||||||
|
"🍓Strawberry,🥕Carrot,🍆Eggplant",
|
||||||
|
"🍅Tomato,🥔Potato",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
# Clean the data by dropping unneeded rows.
|
# Clean the data by dropping unneeded rows.
|
||||||
clean_drop = (
|
# clean_drop = (
|
||||||
load
|
# load
|
||||||
| "Drop unneeded columns"
|
# | "Drop unneeded columns"
|
||||||
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
|
# >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
|
||||||
| "Convert to Upper Case"
|
# | "Convert to Upper Case"
|
||||||
>> beam.Map(lambda element: [e.upper() for e in element])
|
# >> beam.Map(lambda element: [e.upper() for e in element])
|
||||||
| "Strip leading/trailing whitespace"
|
# | "Strip leading/trailing whitespace"
|
||||||
>> beam.Map(lambda element: [e.strip() for e in element])
|
# >> beam.Map(lambda element: [e.strip() for e in element])
|
||||||
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
|
# | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
|
||||||
| "Drop empty PAON if missing SAON"
|
# | "Drop empty PAON if missing SAON"
|
||||||
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
|
# >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
|
||||||
| "Split PAON into two columns if separated by comma"
|
# | "Split PAON into two columns if separated by comma"
|
||||||
>> beam.ParDo(SplitColumn(3, ","))
|
# >> beam.ParDo(SplitColumn(3, ","))
|
||||||
)
|
# )
|
||||||
|
|
||||||
# Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
|
# # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
|
||||||
clean_deduplicate = (
|
# clean_deduplicate = (
|
||||||
clean_drop
|
# clean_drop
|
||||||
| "Generate unique ID for all columns"
|
# | "Generate unique ID for all columns"
|
||||||
>> beam.ParDo(GenerateUniqueID(all_columns=True))
|
# >> beam.ParDo(GenerateUniqueID(all_columns=True))
|
||||||
| "Group by the ID for all columns"
|
# | "Group by the ID for all columns"
|
||||||
>> beam.GroupBy(lambda element: element[-1])
|
# >> beam.GroupBy(lambda element: element[-1])
|
||||||
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
|
# | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
|
||||||
)
|
# )
|
||||||
|
|
||||||
# Prepare the data by generating an ID using the uniquely identifying information only
|
# # Prepare the data by generating an ID using the uniquely identifying information only
|
||||||
# and grouping them by this ID.
|
# # and grouping them by this ID.
|
||||||
prepare = (
|
# prepare = (
|
||||||
clean_deduplicate
|
# clean_deduplicate
|
||||||
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
|
# | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
|
||||||
| "Generate unique ID ignoring price & date"
|
# | "Generate unique ID ignoring price & date"
|
||||||
>> beam.ParDo(GenerateUniqueID())
|
# >> beam.ParDo(GenerateUniqueID())
|
||||||
| "Group by the ID ignoring price & date"
|
# | "Group by the ID ignoring price & date"
|
||||||
>> beam.GroupBy(lambda element: element[-1])
|
# >> beam.GroupBy(lambda element: element[-1])
|
||||||
)
|
# )
|
||||||
|
|
||||||
# Format the data into a dict.
|
# # Format the data into a dict.
|
||||||
formatted = (
|
# formatted = (
|
||||||
prepare
|
# prepare
|
||||||
| "Convert the prepared data into a dict object"
|
# | "Convert the prepared data into a dict object"
|
||||||
>> beam.ParDo(ConvertDataToDict())
|
# >> beam.ParDo(ConvertDataToDict())
|
||||||
)
|
# )
|
||||||
|
|
||||||
# Save the data to a .json file.
|
# # Save the data to a .json file.
|
||||||
output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
|
# output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
|
||||||
output = (
|
# # output_file = "/tmp/file"
|
||||||
formatted
|
|
||||||
| "Combine into one PCollection" >> beam.combiners.ToList()
|
# (
|
||||||
| "Save to .json file"
|
# formatted
|
||||||
>> beam.io.WriteToText(
|
# | "Combine into one PCollection" >> beam.combiners.ToList()
|
||||||
file_path_prefix=str(output_file),
|
# | beam.Map(print)
|
||||||
file_name_suffix=".json",
|
# # | "Save to .json file"
|
||||||
shard_name_template="",
|
# # >> beam.io.WriteToText(
|
||||||
)
|
# # file_path_prefix=str(output_file),
|
||||||
)
|
# # file_name_suffix=".json",
|
||||||
|
# # shard_name_template="",
|
||||||
|
# # )
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
14
notes/docker-commands-spark.txt
Normal file
14
notes/docker-commands-spark.txt
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
docker run --rm \
|
||||||
|
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
|
||||||
|
--name=beam_spark \
|
||||||
|
apache/beam_spark_job_server:latest
|
||||||
|
|
||||||
|
|
||||||
|
docker pull apache/beam_spark_job_server:2.33.0_rc1
|
||||||
|
|
||||||
|
|
||||||
|
docker run --rm \
|
||||||
|
-e SPARK_DRIVER_MEMORY=8g \
|
||||||
|
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
|
||||||
|
--name=beam_spark \
|
||||||
|
apache/beam_spark_job_server:latest
|
||||||
Reference in New Issue
Block a user