5 Commits

9 changed files with 90 additions and 130 deletions

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.7.9

View File

@@ -1,2 +1,9 @@
# street_group_tech_test
Technical Test for Street Group
Technical Test for Street Group for Daniel Tomlinson.
## Documentation
Read the documentation on github pages for instructions around running the code and a discussion on the approach.
https://dtomlinson91.github.io/street_group_tech_test/

View File

@@ -1,23 +1,13 @@
import csv
from datetime import datetime
import hashlib
import io
from importlib import resources
import itertools
import pathlib
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def csv_reader(csv_file):
"""Read in a csv file."""
return csv.reader(io.TextIOWrapper(csv_file.open()))
def slice_by_range(element, *ranges):
"""Slice a list with multiple ranges."""
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
@@ -176,7 +166,7 @@ class ConvertDataToDict(beam.DoFn):
# Create the dict to hold all the information about the property.
json_object = {
"property_id": element[0],
# "readable_address": None,
"readable_address": None,
"flat_appartment": element[-1][0][4],
"builing": element[-1][0][10],
"number": element[-1][0][3],
@@ -196,117 +186,99 @@ class ConvertDataToDict(beam.DoFn):
}
# Create a human readable address to go in the dict.
# json_object["readable_address"] = self.get_readable_address(
# [
# json_object["flat_appartment"],
# json_object["builing"],
# f'{json_object["number"]} {json_object["street"]}',
# json_object["postcode"],
# ],
# [
# json_object["locality"],
# json_object["town"],
# json_object["district"],
# json_object["county"],
# ],
# )
json_object["readable_address"] = self.get_readable_address(
[
json_object["flat_appartment"],
json_object["builing"],
f'{json_object["number"]} {json_object["street"]}',
json_object["postcode"],
],
[
json_object["locality"],
json_object["town"],
json_object["district"],
json_object["county"],
],
)
yield json_object
def main():
# Load in the data from a csv file.
# csv_data = resources.path(
# "analyse_properties.data.input",
# "pp-monthly-update-new-version.csv"
# # "analyse_properties.data.input",
# # "pp-complete.csv",
# )
options = PipelineOptions(
[
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK",
]
input_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "input"
/ "pp-monthly-update-new-version.csv"
)
with beam.Pipeline(options=options) as pipeline:
with beam.Pipeline() as pipeline:
# Load the data
# with csv_data as csv_data_file:
# # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
# load = (
# pipeline
# | fileio.MatchFiles(str(csv_data_file))
# | fileio.ReadMatches()
# | beam.FlatMap(csv_reader)
# )
load = pipeline | beam.Create(
[
"🍓Strawberry,🥕Carrot,🍆Eggplant",
"🍅Tomato,🥔Potato",
]
load = (
pipeline
| "Read input data" >> beam.io.ReadFromText(str(input_file))
| "Split by ','" >> beam.Map(lambda element: element.split(","))
| "Remove leading and trailing quotes"
>> beam.Map(lambda element: [el.strip('"') for el in element])
)
# Clean the data by dropping unneeded rows.
# clean_drop = (
# load
# | "Drop unneeded columns"
# >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
# | "Convert to Upper Case"
# >> beam.Map(lambda element: [e.upper() for e in element])
# | "Strip leading/trailing whitespace"
# >> beam.Map(lambda element: [e.strip() for e in element])
# | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
# | "Drop empty PAON if missing SAON"
# >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
# | "Split PAON into two columns if separated by comma"
# >> beam.ParDo(SplitColumn(3, ","))
# )
clean_drop = (
load
| "Drop unneeded columns"
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
| "Convert to Upper Case"
>> beam.Map(lambda element: [e.upper() for e in element])
| "Strip leading/trailing whitespace"
>> beam.Map(lambda element: [e.strip() for e in element])
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
# # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
# clean_deduplicate = (
# clean_drop
# | "Generate unique ID for all columns"
# >> beam.ParDo(GenerateUniqueID(all_columns=True))
# | "Group by the ID for all columns"
# >> beam.GroupBy(lambda element: element[-1])
# | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
# )
# Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
clean_deduplicate = (
clean_drop
| "Generate unique ID for all columns"
>> beam.ParDo(GenerateUniqueID(all_columns=True))
| "Group by the ID for all columns"
>> beam.GroupBy(lambda element: element[-1])
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
)
# # Prepare the data by generating an ID using the uniquely identifying information only
# # and grouping them by this ID.
# prepare = (
# clean_deduplicate
# | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
# | "Generate unique ID ignoring price & date"
# >> beam.ParDo(GenerateUniqueID())
# | "Group by the ID ignoring price & date"
# >> beam.GroupBy(lambda element: element[-1])
# )
# Prepare the data by generating an ID using the uniquely identifying information only
# and grouping them by this ID.
prepare = (
clean_deduplicate
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
| "Generate unique ID ignoring price & date"
>> beam.ParDo(GenerateUniqueID())
| "Group by the ID ignoring price & date"
>> beam.GroupBy(lambda element: element[-1])
)
# # Format the data into a dict.
# formatted = (
# prepare
# | "Convert the prepared data into a dict object"
# >> beam.ParDo(ConvertDataToDict())
# )
# Format the data into a dict.
formatted = (
prepare
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
)
# # Save the data to a .json file.
# output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
# # output_file = "/tmp/file"
# (
# formatted
# | "Combine into one PCollection" >> beam.combiners.ToList()
# | beam.Map(print)
# # | "Save to .json file"
# # >> beam.io.WriteToText(
# # file_path_prefix=str(output_file),
# # file_name_suffix=".json",
# # shard_name_template="",
# # )
# )
# Save the data to a .json file.
output_file = (
pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete"
)
output = (
formatted
| "Combine into one PCollection" >> beam.combiners.ToList()
| "Save to .json file"
>> beam.io.WriteToText(
file_path_prefix=str(output_file),
file_name_suffix=".json",
)
)
if __name__ == "__main__":

View File

@@ -1,5 +1,5 @@
# Full data set
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P analyse_properties/data/input
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P data/input
# Monthly update data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P analyse_properties/data/input
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P data/input

View File

@@ -1,14 +0,0 @@
docker run --rm \
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
--name=beam_spark \
apache/beam_spark_job_server:latest
docker pull apache/beam_spark_job_server:2.33.0_rc1
docker run --rm \
-e SPARK_DRIVER_MEMORY=8g \
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
--name=beam_spark \
apache/beam_spark_job_server:latest

View File

@@ -1,6 +0,0 @@
apache-beam==2.32.0; python_version >= "3.6"
avro-python3==1.9.2.1; python_version >= "3.6"
cachetools==4.2.2; python_version >= "3.6" and python_version < "4.0" and (python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.6.0")
certifi==2021.5.30; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version >= "3.6"
mkdocs-material==7.3.0
mkdocs==1.2.2; python_version >= "3.6"