Merge branch 'wip/dataflow_refactor_group' into develop

This commit is contained in:
2021-09-27 21:18:48 +01:00
11 changed files with 397 additions and 83 deletions

View File

@@ -22,3 +22,9 @@ class DebugShowColumnWithValueIn(beam.DoFn):
if self.value in column:
yield element
return None
class DebugPrint(beam.DoFn):
    """Pass-through DoFn that prints every element for pipeline debugging."""

    def process(self, element):
        # Side effect only: log the element to stdout, then forward it unchanged.
        print(element)
        yield element

View File

@@ -1,20 +1,44 @@
import argparse
from datetime import datetime
import hashlib
import itertools
import json
import logging
import pathlib
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
from analyse_properties.debug import * # noqa
def slice_by_range(element, *ranges):
    """
    Slice a sequence with multiple ranges.

    Args:
        element: The sequence to slice (e.g. a list of columns).
        *ranges (tuple): (start, end) index pairs to slice the element by.
            E.g. (0, 3), (5, 6) keeps items 0, 1, 2 and 5. Drops everything else.

    Returns:
        itertools.chain: A lazy iterator over the kept items, in range order.
    """
    # Each islice starts from the beginning of `element`, so ranges are
    # independent of one another; chain glues the selected slices together.
    return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
class DropRecordsSingleEmptyColumn(beam.DoFn):
"""If a given item in a list is empty, drop this entry from the PCollection."""
"""
Drop the entire row if a given column is empty.
Args:
index : The index of the column in the list.
Returns:
None: If the length of the column is 0, drop the element.
Yields:
element: If the length of the column is >0, keep the element.
"""
def __init__(self, index):
self.index = index
@@ -27,7 +51,19 @@ class DropRecordsSingleEmptyColumn(beam.DoFn):
class DropRecordsTwoEmptyColumn(beam.DoFn):
"""If two given items in a list are both empty, drop this entry from the PCollection."""
"""
Drop the entire row if both of two given columns are empty.
Args:
index_0 : The index of the first column in the list.
index_1 : The index of the second column in the list.
Returns:
None: If the length of both columns is 0, drop the element.
Yields:
element: If the length of both columns is >0, keep the element.
"""
def __init__(self, index_0, index_1):
self.index_0 = index_0
@@ -42,65 +78,91 @@ class DropRecordsTwoEmptyColumn(beam.DoFn):
class SplitColumn(beam.DoFn):
    """
    Split one column into two columns by a character.

    Args:
        index: The index of the column in the list.
        split_char: The character to split the column by.
    """

    def __init__(self, index, split_char):
        self.index = index
        self.split_char = split_char

    def process(self, element):
        # If there is a split based on the split_char, then keep the second
        # result in place (street number) and append the first result
        # (building) at the end.
        try:
            part_0, part_1 = element[self.index].split(self.split_char)
            element[self.index] = part_1.strip()
            element.append(part_0.strip())
            yield element
        except ValueError:
            # No split_char present (or more than one, so the 2-tuple unpack
            # fails): append a blank column to keep column numbers consistent.
            element.append("")
            yield element
class GenerateUniqueID(beam.DoFn):
"""
Generate a unique ID for the PCollection, either for all the columns or for the
uniquely identifying data only.
class CreateMappingTable(beam.DoFn):
"""
Create a mapping table to be used as a side-input.
def __init__(self, all_columns=False):
self.all_columns = all_columns
This mapping table has a key of an ID generated across all columns and a value of
the raw property data.
The table is used to populate the raw property data after a GroupByKey using
only the IDs in order to reduce the amount of data processed in the GroupByKey operation.
"""
def process(self, element):
unique_string = (
",".join(element[2:]) if not self.all_columns else ",".join(element)
)
# Join the row into a string.
unique_string = ",".join(element)
# Hash the string.
hashed_string = hashlib.md5(unique_string.encode())
# append the hash to the end
element.append(hashed_string.hexdigest())
yield element
# Format the resulting PCollection with the key of id and value of raw data.
new_element = (hashed_string.hexdigest(), list(element))
yield new_element
class DeduplicateByID(beam.DoFn):
class CreateUniquePropertyID(beam.DoFn):
"""
If the PCollection has multiple entries after being grouped by ID for all columns,
deduplicate the list to keep only one.
Create a unique property ID which does not include the price and date of sale.
Uses each row of the mapping table to create a PCollection with a key of the
unique property ID and a value of the ID generated across all columns.
"""
def process(self, element):
if len(element[1]) > 0:
deduplicated_element = (element[0], [element[1][0]])
yield deduplicated_element
else:
yield element
unique_string = ",".join(element[-1][2:])
hashed_string = hashlib.md5(unique_string.encode())
new_element = (hashed_string.hexdigest(), element[0])
yield new_element
class RemoveUniqueID(beam.DoFn):
"""Remove the unique ID from the PCollection, transforming it back into a list."""
class DeduplicateIDs(beam.DoFn):
"""Deduplicate a list of IDs."""
def process(self, element):
element_no_id = element[-1][0]
element_no_id.pop(-1)
yield element_no_id
deduplicated_list = list(set(element[-1]))
new_element = (element[0], deduplicated_list)
yield new_element
def insert_data_for_id(element, mapping_table):
    """
    Replace the ID with the raw data from the mapping table.

    Args:
        element: A (key, ids) pair whose last item is a list of data IDs.
        mapping_table (dict): The mapping table from data ID to raw data.

    Yields:
        tuple: The element with IDs replaced with raw data.
    """
    key = element[0]
    raw_rows = []
    for data_id in element[-1]:
        raw_rows.append(mapping_table[data_id])
    yield (key, raw_rows)
class ConvertDataToDict(beam.DoFn):
@@ -108,7 +170,15 @@ class ConvertDataToDict(beam.DoFn):
@staticmethod
def get_latest_transaction(transaction_dates):
"""Get the date of the latest transaction."""
"""
Get the date of the latest transaction for a list of dates.
Args:
transaction_dates (str): A date in the form "%Y-%m-%d".
Returns:
str: The year in the form "%Y" of the latest transaction date.
"""
transaction_dates = [
datetime.strptime(individual_transaction, "%Y-%m-%d")
for individual_transaction in transaction_dates
@@ -116,7 +186,17 @@ class ConvertDataToDict(beam.DoFn):
return max(transaction_dates).strftime("%Y")
@staticmethod
def get_readable_address(address_components: list, address_comparisons: list):
def get_readable_address(address_components, address_comparisons):
"""
Create a human readable address from the locality/town/district/county columns.
Args:
address_components (list): The preceeding parts of the address (street, postcode etc.)
address_comparisons (list): The locality/town/district/county.
Returns:
str: The complete address deduplicated & cleaned.
"""
# Get pairwise comparison to see if two locality/town/district/counties
# are equivalent
pairwise_comparison = [
@@ -137,7 +217,6 @@ class ConvertDataToDict(beam.DoFn):
applied_mask = list(itertools.compress(address_comparisons, mask))
# Filter out empty items in list
deduplicated_address_part = list(filter(None, applied_mask))
# Filter out any missing parts of the address components
cleaned_address_components = list(filter(None, address_components))
@@ -156,9 +235,9 @@ class ConvertDataToDict(beam.DoFn):
# Group together all the transactions for the property.
property_transactions = [
{
"price": entry[0],
"price": int(entry[0]),
"transaction_date": entry[1].replace(" 00:00", ""),
"year": entry[1][0:4],
"year": int(entry[1][0:4]),
}
for entry in element[-1]
]
@@ -167,22 +246,22 @@ class ConvertDataToDict(beam.DoFn):
json_object = {
"property_id": element[0],
"readable_address": None,
"flat_appartment": element[-1][0][4],
"builing": element[-1][0][10],
"number": element[-1][0][3],
"street": element[-1][0][5],
"locality": element[-1][0][6],
"town": element[-1][0][7],
"district": element[-1][0][8],
"county": element[-1][0][9],
"postcode": element[-1][0][2],
"flat_appartment": list(element[-1])[0][4],
"builing": list(element[-1])[0][10],
"number": list(element[-1])[0][3],
"street": list(element[-1])[0][5],
"locality": list(element[-1])[0][6],
"town": list(element[-1])[0][7],
"district": list(element[-1])[0][8],
"county": list(element[-1])[0][9],
"postcode": list(element[-1])[0][2],
"property_transactions": property_transactions,
"latest_transaction_year": self.get_latest_transaction(
"latest_transaction_year": int(self.get_latest_transaction(
[
transaction["transaction_date"]
for transaction in property_transactions
]
),
)),
}
# Create a human readable address to go in the dict.
@@ -203,26 +282,57 @@ class ConvertDataToDict(beam.DoFn):
yield json_object
def main():
# Load in the data from a csv file.
def run(argv=None, save_main_session=True):
"""Entrypoint and definition of the pipeline."""
logging.getLogger().setLevel(logging.INFO)
# Default input/output files
input_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "input"
/ "pp-monthly-update-new-version.csv"
/ "pp-2020.csv"
# / "pp-complete.csv"
)
output_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "output"
/ "pp-2020"
# / "pp-complete"
)
with beam.Pipeline() as pipeline:
# Arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",
dest="input",
default=str(input_file),
help="Full path to the input file.",
)
parser.add_argument(
"--output",
dest="output",
default=str(output_file),
help="Full path to the output file without extension.",
)
known_args, pipeline_args = parser.parse_known_args(argv)
# Pipeline options. save_main_session needed for DataFlow for global imports.
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as pipeline:
# Load the data
load = (
pipeline
| "Read input data" >> beam.io.ReadFromText(str(input_file))
| "Read input data" >> beam.io.ReadFromText(known_args.input)
| "Split by ','" >> beam.Map(lambda element: element.split(","))
| "Remove leading and trailing quotes"
>> beam.Map(lambda element: [el.strip('"') for el in element])
)
# Clean the data by dropping unneeded rows.
# Clean the data.
clean_drop = (
load
| "Drop unneeded columns"
@@ -238,48 +348,55 @@ def main():
>> beam.ParDo(SplitColumn(3, ","))
)
# Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
clean_deduplicate = (
# Create a mapping table
mapping_table_raw = (
clean_drop
| "Generate unique ID for all columns"
>> beam.ParDo(GenerateUniqueID(all_columns=True))
| "Group by the ID for all columns"
>> beam.GroupBy(lambda element: element[-1])
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
| "Create a mapping table with key of id_all_columns and value of cleaned data."
>> beam.ParDo(CreateMappingTable())
)
# Prepare the data by generating an ID using the uniquely identifying information only
# and grouping them by this ID.
prepare = (
clean_deduplicate
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
| "Generate unique ID ignoring price & date"
>> beam.ParDo(GenerateUniqueID())
| "Group by the ID ignoring price & date"
>> beam.GroupBy(lambda element: element[-1])
# Condense mapping table into a single dict.
mapping_table_condensed = (
mapping_table_raw
| "Condense mapping table into single dict" >> beam.combiners.ToDict()
)
# Prepare the data by creating IDs, grouping together and using mapping table
# to reinsert raw data.
prepared = (
mapping_table_raw
| "Create unique ID ignoring price & date"
>> beam.ParDo(CreateUniquePropertyID())
| "Group by ID"
>> beam.GroupByKey()
| "Deduplicate to eliminate repeated transactions"
>> beam.ParDo(DeduplicateIDs())
| "Insert the raw data using the mapping table"
>> beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
)
)
# Format the data into a dict.
formatted = (
prepare
prepared
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
)
# Save the data to a .json file.
output_file = (
pathlib.Path(__file__).parents[1] / "data" / "output" / "pp-complete"
)
output = (
(
formatted
| "Combine into one PCollection" >> beam.combiners.ToList()
| "Format output" >> beam.Map(json.dumps, indent=2)
| "Save to .json file"
>> beam.io.WriteToText(
file_path_prefix=str(output_file),
file_path_prefix=known_args.output,
file_name_suffix=".json",
)
)
if __name__ == "__main__":
main()
logging.getLogger().setLevel(logging.INFO)
run()

View File

@@ -1,5 +1,8 @@
# Full data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-complete.csv -P data/input
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-complete.csv -P data/input
# Monthly update data set
wget https://storage.googleapis.com/street-group-technical-test-dmot/input/pp-monthly-update-new-version.csv -P data/input
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input
# 2020 data set
wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input

View File

@@ -1,13 +1,16 @@
from importlib import resources
import pathlib
import pandas as pd
from pandas_profiling import ProfileReport
def main():
with resources.path("analyse_properties.data", "pp-complete.csv") as csv_file:
input_file = (
pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-complete.csv"
)
with input_file.open() as csv:
df_report = pd.read_csv(
csv_file,
csv,
names=[
"transaction_id",
"price",

View File

@@ -0,0 +1,93 @@
# DataFlow
<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
## Examples
Full example of beam pipeline on dataflow:
<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset>
## Setup
Export env variable:
`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
## Run pipeline
### Dataflow
#### Yearly dataset
```bash
python -m analyse_properties.main \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-2
```
#### Full dataset
```bash
python -m analyse_properties.main \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-complete.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-complete \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-8 \
--num_workers=3 \
--autoscaling_algorithm=NONE
```
### Locally
Run the pipeline locally:
`python -m analyse_properties.main --runner DirectRunner`
## Errors
Unsubscriptable error on window:
<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
## Documentation
Running in its own private VPC without public IPs
- <https://stackoverflow.com/questions/58893082/which-compute-engine-quotas-need-to-be-updated-to-run-dataflow-with-50-workers>
- <https://cloud.google.com/dataflow/docs/guides/specifying-networks#subnetwork_parameter>
Error help
- <https://cloud.google.com/dataflow/docs/guides/common-errors>
- <https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline>
Scaling
Using DataFlowPrime: <https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime#enable-prime>
Use `--experiments=enable_prime`
Deploying a pipeline (with scaling options): <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline>
Available VM types (with pricing): <https://cloud.google.com/compute/vm-instance-pricing#n1_predefined>
Performance
Side-input performance: <https://stackoverflow.com/questions/48242320/google-dataflow-apache-beam-python-side-input-from-pcollection-kills-perform>
Common use cases:
- Part 1 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1>
- Part 2 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2>

27
notes/tmp/errordata Normal file
View File

@@ -0,0 +1,27 @@
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 181, in execute
op.finish()
File "dataflow_worker/native_operations.py", line 93, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "dataflow_worker/native_operations.py", line 94, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "dataflow_worker/native_operations.py", line 95, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/nativeavroio.py", line 308, in __exit__
self._data_file_writer.flush()
File "fastavro/_write.pyx", line 664, in fastavro._write.Writer.flush
File "fastavro/_write.pyx", line 639, in fastavro._write.Writer.dump
File "fastavro/_write.pyx", line 451, in fastavro._write.snappy_write_block
File "fastavro/_write.pyx", line 458, in fastavro._write.snappy_write_block
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystemio.py", line 200, in write
self._uploader.put(b)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py", line 720, in put
self._conn.send_bytes(data.tobytes())
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
"
"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"

44
notes/tmp/exampledata Normal file
View File

@@ -0,0 +1,44 @@
[{
"property_id": "3cf3c06632c46754696f2017933702f3",
"flat_appartment": "",
"builing": "",
"number": "63",
"street": "ROTTON PARK STREET",
"locality": "",
"town": "BIRMINGHAM",
"district": "BIRMINGHAM",
"county": "WEST MIDLANDS",
"postcode": "B16 0AE",
"property_transactions": [
{ "price": "385000", "transaction_date": "2021-01-08", "year": "2021" },
{ "price": "701985", "transaction_date": "2019-03-28", "year": "2019" },
{ "price": "1748761", "transaction_date": "2020-05-27", "year": "2020" }
],
"latest_transaction_year": "2021"
},
{
"property_id": "c650d5d7bb0daf0a19bb2cacabbee74e",
"readable_address": "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ",
"flat_appartment": "",
"builing": "",
"number": "16",
"street": "STATION ROAD",
"locality": "PARKGATE",
"town": "NESTON",
"district": "CHESHIRE WEST AND CHESTER",
"county": "CHESHIRE WEST AND CHESTER",
"postcode": "CH64 6QJ",
"property_transactions": [
{
"price": "280000",
"transaction_date": "2020-11-30",
"year": "2020"
},
{
"price": "265000",
"transaction_date": "2020-05-29",
"year": "2020"
}
],
"latest_transaction_year": "2020"
}]

16
notes/tmp/runningdata Normal file
View File

@@ -0,0 +1,16 @@
Create Mapping table
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
Condensing
{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
Prepared
GroupByKey
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
deduplicated
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])

View File

@@ -20,6 +20,8 @@ pylint:
- super-init-not-called
- arguments-differ
- inconsistent-return-statements
- expression-not-assigned
- line-too-long
enable:
options:

View File

@@ -18,3 +18,6 @@ pandas-profiling = "^3.0.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
"analyse-properties" = "analyse_properties.main:run"