mirror of https://github.com/dtomlinson91/street_group_tech_test
synced 2025-12-22 11:55:45 +00:00
Compare commits
2 Commits
wip/datafl ... wip/spark_
| Author | SHA1 | Date |
|---|---|---|
|  | 5505dbf24a |  |
|  | 2a1c4fe68e |  |
@@ -1 +0,0 @@
3.7.9
@@ -1,9 +1,2 @@
# street_group_tech_test

Technical Test for Street Group for Daniel Tomlinson.

## Documentation

Read the documentation on github pages for instructions around running the code and a discussion on the approach.

https://dtomlinson91.github.io/street_group_tech_test/
Technical Test for Street Group
0  analyse_properties/data/__init__.py  Normal file
0  analyse_properties/data/input/__init__.py  Normal file
0  analyse_properties/data/output/__init__.py  Normal file
@@ -1,49 +1,35 @@
import argparse
import csv
from datetime import datetime
import hashlib
import io
from importlib import resources
import itertools
import json
import logging
import pathlib

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn


def csv_reader(csv_file):
    """Read in a csv file."""
    return csv.reader(io.TextIOWrapper(csv_file.open()))


def slice_by_range(element, *ranges):
    """
    Slice a list with multiple ranges.

    Args:
        element : The element.
        *ranges (tuple): Tuples containing a start,end index to slice the element.
            E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.

    Returns:
        list: The list sliced by the ranges
    """
    """Slice a list with multiple ranges."""
    return itertools.chain(*(itertools.islice(element, *r) for r in ranges))


class DropRecordsSingleEmptyColumn(beam.DoFn):
    """If a given item in a list is empty, drop this entry from the PCollection."""

    def __init__(self, index):
        self.index = index

    def process(self, element):
        """
        Drop the entire row if a given column is empty.

        Args:
            element : The element

        Returns:
            None: If the length of the column is 0, drop the element.

        Yields:
            element: If the length of the column isn't 0, keep the element.
        """
        column = element[self.index]
        if len(column) == 0:
            return None
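A quick sketch of how slice_by_range behaves, using the example from its docstring (hypothetical row values, assuming the function above is in scope):

# keep index ranges [0, 3) and [5, 6) from a row
row = ["a", "b", "c", "d", "e", "f", "g"]
kept = list(slice_by_range(row, (0, 3), (5, 6)))
# kept == ["a", "b", "c", "f"]  (columns 0, 1, 2 and 5)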
@@ -99,9 +85,9 @@ class GenerateUniqueID(beam.DoFn):
            ",".join(element[2:]) if not self.all_columns else ",".join(element)
        )
        hashed_string = hashlib.md5(unique_string.encode())
        # add the hash as a key to the data.
        new_element = (hashed_string.hexdigest(), list(element))
        yield new_element
        # append the hash to the end
        element.append(hashed_string.hexdigest())
        yield element


class DeduplicateByID(beam.DoFn):
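The hunk above switches GenerateUniqueID from yielding a (hash, element) pair to appending the hash onto the row itself. A minimal sketch of how the ID is derived, assuming the row layout used elsewhere in the pipeline (price and date in columns 0-1, address fields from column 2 onwards):

import hashlib

# hypothetical cleaned row: [price, date, postcode, paon, saon, street, ...]
row = ["100000", "2021-01-01", "B16 0AE", "12", "", "SOME STREET"]
unique_string = ",".join(row[2:])  # all_columns=False: ignore price & date
property_id = hashlib.md5(unique_string.encode()).hexdigest()
row.append(property_id)  # new behaviour: the hash travels on the row itself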
@@ -111,8 +97,8 @@ class DeduplicateByID(beam.DoFn):
    """

    def process(self, element):
        if len(list(element[1])) > 0:
            deduplicated_element = (element[0], [list(element[1])[0]])
        if len(element[1]) > 0:
            deduplicated_element = (element[0], [element[1][0]])
            yield deduplicated_element
        else:
            yield element
@@ -123,6 +109,7 @@ class RemoveUniqueID(beam.DoFn):

    def process(self, element):
        element_no_id = element[-1][0]
        element_no_id.pop(-1)
        yield element_no_id

@@ -190,15 +177,15 @@ class ConvertDataToDict(beam.DoFn):
        json_object = {
            "property_id": element[0],
            # "readable_address": None,
            "flat_appartment": list(element[-1])[0][4],
            "builing": list(element[-1])[0][10],
            "number": list(element[-1])[0][3],
            "street": list(element[-1])[0][5],
            "locality": list(element[-1])[0][6],
            "town": list(element[-1])[0][7],
            "district": list(element[-1])[0][8],
            "county": list(element[-1])[0][9],
            "postcode": list(element[-1])[0][2],
            "flat_appartment": element[-1][0][4],
            "builing": element[-1][0][10],
            "number": element[-1][0][3],
            "street": element[-1][0][5],
            "locality": element[-1][0][6],
            "town": element[-1][0][7],
            "district": element[-1][0][8],
            "county": element[-1][0][9],
            "postcode": element[-1][0][2],
            "property_transactions": property_transactions,
            "latest_transaction_year": self.get_latest_transaction(
                [
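For orientation, a record coming out of ConvertDataToDict is shaped roughly like this (illustrative values only; the key names, including the "flat_appartment" and "builing" spellings, are exactly as they appear in the hunk above):

example_record = {
    "property_id": "3f2b9c0e5a7d41c8a6b2e9d0c1f4a5b6",  # md5 hex digest
    "flat_appartment": "FLAT 1",
    "builing": "EXAMPLE COURT",
    "number": "12",
    "street": "EXAMPLE STREET",
    "locality": "",
    "town": "BIRMINGHAM",
    "district": "BIRMINGHAM",
    "county": "WEST MIDLANDS",
    "postcode": "B16 0AE",
    "property_transactions": [],  # structure not shown in this hunk
    "latest_transaction_year": 2021,
}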
@@ -226,106 +213,101 @@ class ConvertDataToDict(beam.DoFn):
        yield json_object


def run(argv=None, save_main_session=True):
    """Entrypoint and definition of the pipeline."""
    # Default input/output files
    input_file = (
        pathlib.Path(__file__).parents[1]
        / "data"
        / "input"
        / "pp-monthly-update-new-version.csv"
    )
    output_file = (
        pathlib.Path(__file__).parents[1]
        / "data"
        / "output"
        / "pp-monthly-update-new-version"
    )

    # Arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input", dest="input", default=str(input_file), help="Input file."
    )
    parser.add_argument(
        "--output", dest="output", default=str(output_file), help="Output file."
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Pipeline options. save_main_session needed for DataFlow for global imports.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

def main():
    # Load in the data from a csv file.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # csv_data = resources.path(
        #     "analyse_properties.data.input",
        #     "pp-monthly-update-new-version.csv"
        #     # "analyse_properties.data.input",
        #     # "pp-complete.csv",
        # )

    options = PipelineOptions(
        [
            "--runner=PortableRunner",
            "--job_endpoint=localhost:8099",
            "--environment_type=LOOPBACK",
        ]
    )

    with beam.Pipeline(options=options) as pipeline:
        # Load the data
        load = (
            pipeline
            | "Read input data" >> beam.io.ReadFromText(known_args.input)
            | "Split by ','" >> beam.Map(lambda element: element.split(","))
            | "Remove leading and trailing quotes"
            >> beam.Map(lambda element: [el.strip('"') for el in element])
        # with csv_data as csv_data_file:
        #     # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
        #     load = (
        #         pipeline
        #         | fileio.MatchFiles(str(csv_data_file))
        #         | fileio.ReadMatches()
        #         | beam.FlatMap(csv_reader)
        #     )

        load = pipeline | beam.Create(
            [
                "🍓Strawberry,🥕Carrot,🍆Eggplant",
                "🍅Tomato,🥔Potato",
            ]
        )

        # Clean the data by dropping unneeded rows.
        clean_drop = (
            load
            | "Drop unneeded columns"
            >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
            | "Convert to Upper Case"
            >> beam.Map(lambda element: [e.upper() for e in element])
            | "Strip leading/trailing whitespace"
            >> beam.Map(lambda element: [e.strip() for e in element])
            | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
            | "Drop empty PAON if missing SAON"
            >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
            # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
            | "Split PAON into two columns if separated by comma"
            >> beam.ParDo(SplitColumn(3, ","))
        )
        # clean_drop = (
        #     load
        #     | "Drop unneeded columns"
        #     >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
        #     | "Convert to Upper Case"
        #     >> beam.Map(lambda element: [e.upper() for e in element])
        #     | "Strip leading/trailing whitespace"
        #     >> beam.Map(lambda element: [e.strip() for e in element])
        #     | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
        #     | "Drop empty PAON if missing SAON"
        #     >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
        #     | "Split PAON into two columns if separated by comma"
        #     >> beam.ParDo(SplitColumn(3, ","))
        # )

        # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
        clean_deduplicate = (
            clean_drop
            | "Generate unique ID for all columns"
            >> beam.ParDo(GenerateUniqueID(all_columns=True))
            | "Group by the ID for all columns"
            >> beam.GroupByKey()
            | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
        )
        # # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
        # clean_deduplicate = (
        #     clean_drop
        #     | "Generate unique ID for all columns"
        #     >> beam.ParDo(GenerateUniqueID(all_columns=True))
        #     | "Group by the ID for all columns"
        #     >> beam.GroupBy(lambda element: element[-1])
        #     | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
        # )

        # Prepare the data by generating an ID using the uniquely identifying
        # information only and grouping them by this ID.
        prepare = (
            clean_deduplicate
            | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
            | "Generate unique ID ignoring price & date"
            >> beam.ParDo(GenerateUniqueID())
            | "Group by the ID ignoring price & date"
            >> beam.GroupByKey()
            # | beam.Map(print)
        )
        # # Prepare the data by generating an ID using the uniquely identifying information only
        # # and grouping them by this ID.
        # prepare = (
        #     clean_deduplicate
        #     | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
        #     | "Generate unique ID ignoring price & date"
        #     >> beam.ParDo(GenerateUniqueID())
        #     | "Group by the ID ignoring price & date"
        #     >> beam.GroupBy(lambda element: element[-1])
        # )

        # Format the data into a dict.
        formatted = (
            prepare
            | "Convert the prepared data into a dict object"
            >> beam.ParDo(ConvertDataToDict())
        )
        # # Format the data into a dict.
        # formatted = (
        #     prepare
        #     | "Convert the prepared data into a dict object"
        #     >> beam.ParDo(ConvertDataToDict())
        # )

        # Save the data to a .json file.
        (
            formatted
            | "Combine into one PCollection" >> beam.combiners.ToList()
            | "Format output" >> beam.Map(json.dumps)
            | "Save to .json file"
            >> beam.io.WriteToText(
                file_path_prefix=known_args.output,
                file_name_suffix=".json",
            )
        )
        # # Save the data to a .json file.
        # output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
        # # output_file = "/tmp/file"

        # (
        #     formatted
        #     | "Combine into one PCollection" >> beam.combiners.ToList()
        #     | beam.Map(print)
        #     # | "Save to .json file"
        #     # >> beam.io.WriteToText(
        #     #     file_path_prefix=str(output_file),
        #     #     file_name_suffix=".json",
        #     #     shard_name_template="",
        #     # )
        # )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
    main()

@@ -1,5 +1,5 @@
# Full data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-complete.csv -P data/input
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P analyse_properties/data/input

# Monthly update data set
wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv -P data/input
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P analyse_properties/data/input
14  notes/docker-commands-spark.txt  Normal file
@@ -0,0 +1,14 @@
docker run --rm \
    -p 8098:8098 -p 8097:8097 -p 8099:8099 \
    --name=beam_spark \
    apache/beam_spark_job_server:latest


docker pull apache/beam_spark_job_server:2.33.0_rc1


docker run --rm \
    -e SPARK_DRIVER_MEMORY=8g \
    -p 8098:8098 -p 8097:8097 -p 8099:8099 \
    --name=beam_spark \
    apache/beam_spark_job_server:latest
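Once the job server container is up, the pipeline points at it with the PortableRunner options already present in main.py; for reference, a minimal sketch using the same endpoint and environment type as above:

# assumes the Spark job server from the docker commands above is listening on localhost:8099
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",
    ]
)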
@@ -1,24 +0,0 @@
# DataFlow

<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>

Export env variable:

`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`

Run the pipeline:

python -m analyse_properties.main \
    --region europe-west2 \
    --input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \
    --output gs://street-group-technical-test-dmot/output/pp-monthly-update-new-version \
    --runner DataflowRunner \
    --project street-group \
    --temp_location gs://street-group-technical-test-dmot/tmp

## Errors

Unsubscriptable error on window:

<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
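The linked "has no len()" error is the same _UnwindowedValues behaviour the DeduplicateByID hunk above deals with: after a GroupByKey the grouped values arrive as a lazy iterable, not a list. A minimal sketch of the work-around pattern (hypothetical helper, not from the repo):

def keep_first(element):
    # element is (key, grouped_values) as produced by beam.GroupByKey()
    key, values = element[0], list(element[1])  # materialise before len()/indexing
    if len(values) > 0:
        yield (key, [values[0]])
    else:
        yield element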
@@ -20,7 +20,6 @@ pylint:
    - super-init-not-called
    - arguments-differ
    - inconsistent-return-statements
    - expression-not-assigned

  enable:

  options:
6  requirements-docs.txt  Normal file
@@ -0,0 +1,6 @@
apache-beam==2.32.0; python_version >= "3.6"
avro-python3==1.9.2.1; python_version >= "3.6"
cachetools==4.2.2; python_version >= "3.6" and python_version < "4.0" and (python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.6.0")
certifi==2021.5.30; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version >= "3.6"
mkdocs-material==7.3.0
mkdocs==1.2.2; python_version >= "3.6"