9 Commits

4 changed files with 120 additions and 49 deletions

View File

@@ -1,25 +1,49 @@
+import argparse
 from datetime import datetime
 import hashlib
 import itertools
+import json
+import logging
 import pathlib

 import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

 # from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn


 def slice_by_range(element, *ranges):
-    """Slice a list with multiple ranges."""
+    """
+    Slice a list with multiple ranges.
+
+    Args:
+        element : The element.
+        *ranges (tuple): Tuples containing a start,end index to slice the element.
+            E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.
+
+    Returns:
+        list: The list sliced by the ranges
+    """
     return itertools.chain(*(itertools.islice(element, *r) for r in ranges))


 class DropRecordsSingleEmptyColumn(beam.DoFn):
+    """If a given item in a list is empty, drop this entry from the PCollection."""
+
     def __init__(self, index):
         self.index = index

     def process(self, element):
+        """
+        Drop the entire row if a given column is empty.
+
+        Args:
+            element : The element
+
+        Returns:
+            None: If the length of the column is 0, drop the element.
+
+        Yields:
+            element: If the length of the column isn't 0, keep the element.
+        """
         column = element[self.index]
         if len(column) == 0:
             return None
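As context for the expanded docstring: `slice_by_range` actually returns a lazy `itertools.chain` iterator rather than a list, so callers need to materialise it. A minimal sketch with a made-up row (column values are illustrative only):

```python
import itertools


def slice_by_range(element, *ranges):
    """Slice a list with multiple ranges."""
    return itertools.chain(*(itertools.islice(element, *r) for r in ranges))


# Keep columns 0, 1, 2 and 5; drop everything else.
row = ["100000", "2021-01-01", "B16 0AE", "12", "", "HIGH STREET"]
print(list(slice_by_range(row, (0, 3), (5, 6))))
# ['100000', '2021-01-01', 'B16 0AE', 'HIGH STREET']
```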
@@ -75,9 +99,9 @@ class GenerateUniqueID(beam.DoFn):
             ",".join(element[2:]) if not self.all_columns else ",".join(element)
         )
         hashed_string = hashlib.md5(unique_string.encode())
-        # append the hash to the end
-        element.append(hashed_string.hexdigest())
-        yield element
+        # add the hash as a key to the data.
+        new_element = (hashed_string.hexdigest(), list(element))
+        yield new_element


 class DeduplicateByID(beam.DoFn):
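This change swaps appending the MD5 digest as a trailing column for emitting Beam-style `(key, value)` pairs, the shape `beam.GroupByKey()` consumes further down the pipeline. A sketch of the resulting element (toy row; the column layout is assumed from the join over `element[2:]`):

```python
import hashlib

row = ["100000", "2021-01-01", "B16 0AE", "12", "", "HIGH STREET"]
unique_string = ",".join(row[2:])  # skip price & date so repeat sales share an ID
key = hashlib.md5(unique_string.encode()).hexdigest()
keyed_element = (key, list(row))
# After GroupByKey: (key, <iterable of all rows hashing to this key>)
```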
@@ -87,8 +111,8 @@ class DeduplicateByID(beam.DoFn):
     """

     def process(self, element):
-        if len(element[1]) > 0:
-            deduplicated_element = (element[0], [element[1][0]])
+        if len(list(element[1])) > 0:
+            deduplicated_element = (element[0], [list(element[1])[0]])
             yield deduplicated_element
         else:
             yield element
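The new `list(...)` wrapping matters because the grouped values arriving from `GroupByKey` are a lazy `_UnwindowedValues` iterable that supports neither `len()` nor indexing (the same error linked in the DataFlow notes below). One caveat: each `list(element[1])` call re-iterates the values, so materialising once is the cheaper equivalent. A sketch of that variant:

```python
import apache_beam as beam


class DeduplicateByID(beam.DoFn):
    def process(self, element):
        key, grouped = element
        values = list(grouped)  # materialise the lazy iterable once
        if values:
            yield (key, [values[0]])
        else:
            yield element
```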
@@ -99,7 +123,6 @@ class RemoveUniqueID(beam.DoFn):
     def process(self, element):
         element_no_id = element[-1][0]
-        element_no_id.pop(-1)
         yield element_no_id
@@ -166,16 +189,16 @@ class ConvertDataToDict(beam.DoFn):
         # Create the dict to hold all the information about the property.
         json_object = {
             "property_id": element[0],
-            "readable_address": None,
-            "flat_appartment": element[-1][0][4],
-            "builing": element[-1][0][10],
-            "number": element[-1][0][3],
-            "street": element[-1][0][5],
-            "locality": element[-1][0][6],
-            "town": element[-1][0][7],
-            "district": element[-1][0][8],
-            "county": element[-1][0][9],
-            "postcode": element[-1][0][2],
+            # "readable_address": None,
+            "flat_appartment": list(element[-1])[0][4],
+            "builing": list(element[-1])[0][10],
+            "number": list(element[-1])[0][3],
+            "street": list(element[-1])[0][5],
+            "locality": list(element[-1])[0][6],
+            "town": list(element[-1])[0][7],
+            "district": list(element[-1])[0][8],
+            "county": list(element[-1])[0][9],
+            "postcode": list(element[-1])[0][2],
             "property_transactions": property_transactions,
             "latest_transaction_year": self.get_latest_transaction(
                 [
@@ -186,37 +209,59 @@ class ConvertDataToDict(beam.DoFn):
         }
         # Create a human readable address to go in the dict.
-        json_object["readable_address"] = self.get_readable_address(
-            [
-                json_object["flat_appartment"],
-                json_object["builing"],
-                f'{json_object["number"]} {json_object["street"]}',
-                json_object["postcode"],
-            ],
-            [
-                json_object["locality"],
-                json_object["town"],
-                json_object["district"],
-                json_object["county"],
-            ],
-        )
+        # json_object["readable_address"] = self.get_readable_address(
+        #     [
+        #         json_object["flat_appartment"],
+        #         json_object["builing"],
+        #         f'{json_object["number"]} {json_object["street"]}',
+        #         json_object["postcode"],
+        #     ],
+        #     [
+        #         json_object["locality"],
+        #         json_object["town"],
+        #         json_object["district"],
+        #         json_object["county"],
+        #     ],
+        # )
         yield json_object


-def main():
-    # Load in the data from a csv file.
+def run(argv=None, save_main_session=True):
+    """Entrypoint and definition of the pipeline."""
+    # Default input/output files
     input_file = (
         pathlib.Path(__file__).parents[1]
         / "data"
         / "input"
         / "pp-monthly-update-new-version.csv"
     )
+    output_file = (
+        pathlib.Path(__file__).parents[1]
+        / "data"
+        / "output"
+        / "pp-monthly-update-new-version"
+    )

-    with beam.Pipeline() as pipeline:
+    # Arguments
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--input", dest="input", default=str(input_file), help="Input file."
+    )
+    parser.add_argument(
+        "--output", dest="output", default=str(output_file), help="Output file."
+    )
+    known_args, pipeline_args = parser.parse_known_args(argv)
+
+    # Pipeline options. save_main_session needed for DataFlow for global imports.
+    pipeline_options = PipelineOptions(pipeline_args)
+    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+    # Load in the data from a csv file.
+    with beam.Pipeline(options=pipeline_options) as pipeline:
         # Load the data
         load = (
             pipeline
-            | "Read input data" >> beam.io.ReadFromText(str(input_file))
+            | "Read input data" >> beam.io.ReadFromText(known_args.input)
             | "Split by ','" >> beam.Map(lambda element: element.split(","))
             | "Remove leading and trailing quotes"
             >> beam.Map(lambda element: [el.strip('"') for el in element])
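The `parse_known_args` split is what lets one entrypoint drive both local and Dataflow runs: flags the script declares (`--input`, `--output`) are consumed here, while anything unrecognised (`--runner`, `--project`, `--region`, ...) passes through untouched to `PipelineOptions`. A small standalone sketch of that behaviour:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input", dest="input", default="data/input/pp.csv")
parser.add_argument("--output", dest="output", default="data/output/pp")

argv = [
    "--input", "gs://bucket/in.csv",
    "--runner", "DataflowRunner",
    "--project", "street-group",
]
known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args.input)  # gs://bucket/in.csv
print(pipeline_args)     # ['--runner', 'DataflowRunner', '--project', 'street-group']
```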
@@ -234,6 +279,7 @@ def main():
             | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
             | "Drop empty PAON if missing SAON"
             >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
+            # | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
             | "Split PAON into two columns if separated by comma"
             >> beam.ParDo(SplitColumn(3, ","))
         )
@@ -244,19 +290,20 @@ def main():
             | "Generate unique ID for all columns"
             >> beam.ParDo(GenerateUniqueID(all_columns=True))
             | "Group by the ID for all columns"
-            >> beam.GroupBy(lambda element: element[-1])
+            >> beam.GroupByKey()
             | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
         )

-        # Prepare the data by generating an ID using the uniquely identifying information only
-        # and grouping them by this ID.
+        # Prepare the data by generating an ID using the uniquely identifying
+        # information only and grouping them by this ID.
         prepare = (
             clean_deduplicate
             | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
             | "Generate unique ID ignoring price & date"
             >> beam.ParDo(GenerateUniqueID())
             | "Group by the ID ignoring price & date"
-            >> beam.GroupBy(lambda element: element[-1])
+            >> beam.GroupByKey()
+            # | beam.Map(print)
         )

         # Format the data into a dict.
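Replacing `beam.GroupBy(lambda element: element[-1])` with `beam.GroupByKey()` follows directly from the `GenerateUniqueID` change: the hash is now the key of a `(key, value)` pair rather than the last column of the row. A minimal runnable contrast with toy keyed elements:

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([("abc123", ["row"]), ("abc123", ["row"]), ("def456", ["other"])])
        | beam.GroupByKey()  # ('abc123', <iterable of 2 values>), ('def456', <iterable of 1>)
        | beam.Map(print)
    )
```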
@@ -267,19 +314,18 @@ def main():
         )

         # Save the data to a .json file.
-        output_file = (
-            pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete"
-        )
-        output = (
+        (
             formatted
             | "Combine into one PCollection" >> beam.combiners.ToList()
+            | "Format output" >> beam.Map(json.dumps)
             | "Save to .json file"
             >> beam.io.WriteToText(
-                file_path_prefix=str(output_file),
+                file_path_prefix=known_args.output,
                 file_name_suffix=".json",
             )
         )


 if __name__ == "__main__":
-    main()
+    logging.getLogger().setLevel(logging.INFO)
+    run()
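With `main()` renamed to `run(argv=None, ...)`, the pipeline can also be driven programmatically on Beam's default DirectRunner; a hedged sketch, assuming the `analyse_properties.main` module path shown in the DataFlow notes below:

```python
from analyse_properties.main import run

# Local run with explicit args; omit argv to fall back to the file defaults.
run(argv=[
    "--input", "data/input/pp-monthly-update-new-version.csv",
    "--output", "data/output/pp-monthly-update-new-version",
])
```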

View File

@@ -1,5 +1,5 @@
 # Full data set
-# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P data/input
+# wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-complete.csv -P data/input

 # Monthly update data set
-wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P data/input
+wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv -P data/input

View File

@@ -0,0 +1,24 @@
+# DataFlow
+
+<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
+
+Export env variable:
+
+`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
+
+Run the pipeline:
+
+    python -m analyse_properties.main \
+        --region europe-west2 \
+        --input gs://street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv \
+        --output gs://street-group-technical-test-dmot/output/pp-monthly-update-new-version \
+        --runner DataflowRunner \
+        --project street-group \
+        --temp_location gs://street-group-technical-test-dmot/tmp
+
+## Errors
+
+Unsubscriptable error on window:
+
+<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
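That "unsubscriptable" error appears when the result of `GroupByKey` is treated like a list: the grouped values are an `_UnwindowedValues` iterable with no `len()` or index support. The fix used throughout `main.py` above is to materialise the iterable first; a minimal sketch:

```python
# element as produced by beam.GroupByKey(): (key, lazy iterable of values)
def process(element):
    key, grouped = element
    # grouped[0] or len(grouped) would raise:
    # TypeError: object of type '_UnwindowedValues' has no len()
    values = list(grouped)  # materialise once, then index freely
    return (key, values[0]) if values else (key, None)
```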

View File

@@ -20,6 +20,7 @@ pylint:
       - super-init-not-called
       - arguments-differ
       - inconsistent-return-statements
+      - expression-not-assigned
   enable:

   options:
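The new `expression-not-assigned` suppression presumably covers the final write step in `main.py`, which is now a bare `( formatted | ... )` expression applied purely for its side effect. A tiny runnable example of the pattern pylint would otherwise flag:

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    # Bare pipeline expression: never assigned, applied for its side effect.
    (
        pipeline
        | beam.Create(["a", "b"])
        | beam.Map(print)
    )
```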