mirror of
https://github.com/dtomlinson91/street_group_tech_test
synced 2025-12-22 20:05:45 +00:00
Compare commits
9 Commits
wip/pathli
...
wip/datafl
| Author | SHA1 | Date | |
|---|---|---|---|
| 8e8469579e | |||
| 4e3771c728 | |||
| 8856a9763f | |||
| fded858932 | |||
| bb71d55f8c | |||
| 8047b5ced4 | |||
| 9f53c66975 | |||
| e6ec110d54 | |||
| 83807616e0 |
@@ -1,25 +1,49 @@
|
|||||||
|
import argparse
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import hashlib
|
import hashlib
|
||||||
import itertools
|
import itertools
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
import apache_beam as beam
|
import apache_beam as beam
|
||||||
|
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
|
||||||
|
|
||||||
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
|
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
|
||||||
|
|
||||||
|
|
||||||
def slice_by_range(element, *ranges):
|
def slice_by_range(element, *ranges):
|
||||||
"""Slice a list with multiple ranges."""
|
"""
|
||||||
|
Slice a list with multiple ranges.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
element : The element.
|
||||||
|
*ranges (tuple): Tuples containing a start,end index to slice the element.
|
||||||
|
E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: The list sliced by the ranges
|
||||||
|
"""
|
||||||
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
|
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
|
||||||
|
|
||||||
|
|
||||||
class DropRecordsSingleEmptyColumn(beam.DoFn):
|
class DropRecordsSingleEmptyColumn(beam.DoFn):
|
||||||
"""If a given item in a list is empty, drop this entry from the PCollection."""
|
|
||||||
|
|
||||||
def __init__(self, index):
|
def __init__(self, index):
|
||||||
self.index = index
|
self.index = index
|
||||||
|
|
||||||
def process(self, element):
|
def process(self, element):
|
||||||
|
"""
|
||||||
|
Drop the entire row if a given column is empty.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
element : The element
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None: If the length of the column is 0, drop the element.
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
element: If the length of the column isn't 0, keep the element.
|
||||||
|
"""
|
||||||
column = element[self.index]
|
column = element[self.index]
|
||||||
if len(column) == 0:
|
if len(column) == 0:
|
||||||
return None
|
return None
|
||||||
@@ -75,9 +99,9 @@ class GenerateUniqueID(beam.DoFn):
|
|||||||
",".join(element[2:]) if not self.all_columns else ",".join(element)
|
",".join(element[2:]) if not self.all_columns else ",".join(element)
|
||||||
)
|
)
|
||||||
hashed_string = hashlib.md5(unique_string.encode())
|
hashed_string = hashlib.md5(unique_string.encode())
|
||||||
# append the hash to the end
|
# add the hash as a key to the data.
|
||||||
element.append(hashed_string.hexdigest())
|
new_element = (hashed_string.hexdigest(), list(element))
|
||||||
yield element
|
yield new_element
|
||||||
|
|
||||||
|
|
||||||
class DeduplicateByID(beam.DoFn):
|
class DeduplicateByID(beam.DoFn):
|
||||||
@@ -87,8 +111,8 @@ class DeduplicateByID(beam.DoFn):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def process(self, element):
|
def process(self, element):
|
||||||
if len(element[1]) > 0:
|
if len(list(element[1])) > 0:
|
||||||
deduplicated_element = (element[0], [element[1][0]])
|
deduplicated_element = (element[0], [list(element[1])[0]])
|
||||||
yield deduplicated_element
|
yield deduplicated_element
|
||||||
else:
|
else:
|
||||||
yield element
|
yield element
|
||||||
@@ -99,7 +123,6 @@ class RemoveUniqueID(beam.DoFn):
|
|||||||
|
|
||||||
def process(self, element):
|
def process(self, element):
|
||||||
element_no_id = element[-1][0]
|
element_no_id = element[-1][0]
|
||||||
element_no_id.pop(-1)
|
|
||||||
yield element_no_id
|
yield element_no_id
|
||||||
|
|
||||||
|
|
||||||
@@ -166,16 +189,16 @@ class ConvertDataToDict(beam.DoFn):
|
|||||||
# Create the dict to hold all the information about the property.
|
# Create the dict to hold all the information about the property.
|
||||||
json_object = {
|
json_object = {
|
||||||
"property_id": element[0],
|
"property_id": element[0],
|
||||||
"readable_address": None,
|
# "readable_address": None,
|
||||||
"flat_appartment": element[-1][0][4],
|
"flat_appartment": list(element[-1])[0][4],
|
||||||
"builing": element[-1][0][10],
|
"builing": list(element[-1])[0][10],
|
||||||
"number": element[-1][0][3],
|
"number": list(element[-1])[0][3],
|
||||||
"street": element[-1][0][5],
|
"street": list(element[-1])[0][5],
|
||||||
"locality": element[-1][0][6],
|
"locality": list(element[-1])[0][6],
|
||||||
"town": element[-1][0][7],
|
"town": list(element[-1])[0][7],
|
||||||
"district": element[-1][0][8],
|
"district": list(element[-1])[0][8],
|
||||||
"county": element[-1][0][9],
|
"county": list(element[-1])[0][9],
|
||||||
"postcode": element[-1][0][2],
|
"postcode": list(element[-1])[0][2],
|
||||||
"property_transactions": property_transactions,
|
"property_transactions": property_transactions,
|
||||||
"latest_transaction_year": self.get_latest_transaction(
|
"latest_transaction_year": self.get_latest_transaction(
|
||||||
[
|
[
|
||||||
@@ -186,37 +209,59 @@ class ConvertDataToDict(beam.DoFn):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Create a human readable address to go in the dict.
|
# Create a human readable address to go in the dict.
|
||||||
json_object["readable_address"] = self.get_readable_address(
|
# json_object["readable_address"] = self.get_readable_address(
|
||||||
[
|
# [
|
||||||
json_object["flat_appartment"],
|
# json_object["flat_appartment"],
|
||||||
json_object["builing"],
|
# json_object["builing"],
|
||||||
f'{json_object["number"]} {json_object["street"]}',
|
# f'{json_object["number"]} {json_object["street"]}',
|
||||||
json_object["postcode"],
|
# json_object["postcode"],
|
||||||
],
|
# ],
|
||||||
[
|
# [
|
||||||
json_object["locality"],
|
# json_object["locality"],
|
||||||
json_object["town"],
|
# json_object["town"],
|
||||||
json_object["district"],
|
# json_object["district"],
|
||||||
json_object["county"],
|
# json_object["county"],
|
||||||
],
|
# ],
|
||||||
)
|
# )
|
||||||
yield json_object
|
yield json_object
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def run(argv=None, save_main_session=True):
|
||||||
# Load in the data from a csv file.
|
"""Entrypoint and definition of the pipeline."""
|
||||||
|
# Default input/output files
|
||||||
input_file = (
|
input_file = (
|
||||||
pathlib.Path(__file__).parents[1]
|
pathlib.Path(__file__).parents[1]
|
||||||
/ "data"
|
/ "data"
|
||||||
/ "input"
|
/ "input"
|
||||||
/ "pp-monthly-update-new-version.csv"
|
/ "pp-monthly-update-new-version.csv"
|
||||||
)
|
)
|
||||||
|
output_file = (
|
||||||
|
pathlib.Path(__file__).parents[1]
|
||||||
|
/ "data"
|
||||||
|
/ "output"
|
||||||
|
/ "pp-monthly-update-new-version"
|
||||||
|
)
|
||||||
|
|
||||||
with beam.Pipeline() as pipeline:
|
# Arguments
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
"--input", dest="input", default=str(input_file), help="Input file."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--output", dest="output", default=str(output_file), help="Output file."
|
||||||
|
)
|
||||||
|
known_args, pipeline_args = parser.parse_known_args(argv)
|
||||||
|
|
||||||
|
# Pipeline options. save_main_session needed for DataFlow for global imports.
|
||||||
|
pipeline_options = PipelineOptions(pipeline_args)
|
||||||
|
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
|
||||||
|
|
||||||
|
# Load in the data from a csv file.
|
||||||
|
with beam.Pipeline(options=pipeline_options) as pipeline:
|
||||||
# Load the data
|
# Load the data
|
||||||
load = (
|
load = (
|
||||||
pipeline
|
pipeline
|
||||||
| "Read input data" >> beam.io.ReadFromText(str(input_file))
|
| "Read input data" >> beam.io.ReadFromText(known_args.input)
|
||||||
| "Split by ','" >> beam.Map(lambda element: element.split(","))
|
| "Split by ','" >> beam.Map(lambda element: element.split(","))
|
||||||
| "Remove leading and trailing quotes"
|
| "Remove leading and trailing quotes"
|
||||||
>> beam.Map(lambda element: [el.strip('"') for el in element])
|
>> beam.Map(lambda element: [el.strip('"') for el in element])
|
||||||
@@ -234,6 +279,7 @@ def main():
|
|||||||
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
|
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
|
||||||
| "Drop empty PAON if missing SAON"
|
| "Drop empty PAON if missing SAON"
|
||||||
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
|
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
|
||||||
|
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
|
||||||
| "Split PAON into two columns if separated by comma"
|
| "Split PAON into two columns if separated by comma"
|
||||||
>> beam.ParDo(SplitColumn(3, ","))
|
>> beam.ParDo(SplitColumn(3, ","))
|
||||||
)
|
)
|
||||||
@@ -244,19 +290,20 @@ def main():
|
|||||||
| "Generate unique ID for all columns"
|
| "Generate unique ID for all columns"
|
||||||
>> beam.ParDo(GenerateUniqueID(all_columns=True))
|
>> beam.ParDo(GenerateUniqueID(all_columns=True))
|
||||||
| "Group by the ID for all columns"
|
| "Group by the ID for all columns"
|
||||||
>> beam.GroupBy(lambda element: element[-1])
|
>> beam.GroupByKey()
|
||||||
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
|
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
|
||||||
)
|
)
|
||||||
|
|
||||||
# Prepare the data by generating an ID using the uniquely identifying information only
|
# Prepare the data by generating an ID using the uniquely identifying
|
||||||
# and grouping them by this ID.
|
# information only and grouping them by this ID.
|
||||||
prepare = (
|
prepare = (
|
||||||
clean_deduplicate
|
clean_deduplicate
|
||||||
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
|
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
|
||||||
| "Generate unique ID ignoring price & date"
|
| "Generate unique ID ignoring price & date"
|
||||||
>> beam.ParDo(GenerateUniqueID())
|
>> beam.ParDo(GenerateUniqueID())
|
||||||
| "Group by the ID ignoring price & date"
|
| "Group by the ID ignoring price & date"
|
||||||
>> beam.GroupBy(lambda element: element[-1])
|
>> beam.GroupByKey()
|
||||||
|
# | beam.Map(print)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Format the data into a dict.
|
# Format the data into a dict.
|
||||||
@@ -267,19 +314,18 @@ def main():
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Save the data to a .json file.
|
# Save the data to a .json file.
|
||||||
output_file = (
|
(
|
||||||
pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete"
|
|
||||||
)
|
|
||||||
output = (
|
|
||||||
formatted
|
formatted
|
||||||
| "Combine into one PCollection" >> beam.combiners.ToList()
|
| "Combine into one PCollection" >> beam.combiners.ToList()
|
||||||
|
| "Format output" >> beam.Map(json.dumps)
|
||||||
| "Save to .json file"
|
| "Save to .json file"
|
||||||
>> beam.io.WriteToText(
|
>> beam.io.WriteToText(
|
||||||
file_path_prefix=str(output_file),
|
file_path_prefix=known_args.output,
|
||||||
file_name_suffix=".json",
|
file_name_suffix=".json",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
logging.getLogger().setLevel(logging.INFO)
|
||||||
|
run()
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
# Full data set
|
# Full data set
|
||||||
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P data/input
|
# wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-complete.csv -P data/input
|
||||||
|
|
||||||
# Monthly update data set
|
# Monthly update data set
|
||||||
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P data/input
|
wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv -P data/input
|
||||||
|
|||||||
24
notes/documentation/dataflow.md
Normal file
24
notes/documentation/dataflow.md
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
# DataFlow
|
||||||
|
|
||||||
|
<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
|
||||||
|
|
||||||
|
Export env variable:
|
||||||
|
|
||||||
|
`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
|
||||||
|
|
||||||
|
Run the pipeline:
|
||||||
|
|
||||||
|
python -m analyse_properties.main \
|
||||||
|
--region europe-west2 \
|
||||||
|
--input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \
|
||||||
|
--output gs://street-group-technical-test-dmot/output/pp-monthly-update-new-version \
|
||||||
|
--runner DataflowRunner \
|
||||||
|
--project street-group \
|
||||||
|
--temp_location gs://street-group-technical-test-dmot/tmp
|
||||||
|
|
||||||
|
|
||||||
|
## Errors
|
||||||
|
|
||||||
|
Unsubscriptable error on window:
|
||||||
|
|
||||||
|
<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
|
||||||
@@ -20,6 +20,7 @@ pylint:
|
|||||||
- super-init-not-called
|
- super-init-not-called
|
||||||
- arguments-differ
|
- arguments-differ
|
||||||
- inconsistent-return-statements
|
- inconsistent-return-statements
|
||||||
|
- expression-not-assigned
|
||||||
enable:
|
enable:
|
||||||
|
|
||||||
options:
|
options:
|
||||||
|
|||||||
Reference in New Issue
Block a user