migrate beam pipeline to main.py

This commit is contained in:
2021-09-26 01:10:09 +01:00
parent 54cf5e3e36
commit 9fdc6dce05
2 changed files with 293 additions and 259 deletions

View File

@@ -3,262 +3,3 @@ Daniel Tomlinson (dtomlinson@panaetius.co.uk).
Technical test for Street Group. Technical test for Street Group.
""" """
import csv
from datetime import datetime
from pprint import pprint
import hashlib
import io
from importlib import resources
import itertools
import pathlib
import apache_beam as beam
from apache_beam.io import fileio
from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def csv_reader(csv_file):
return csv.reader(io.TextIOWrapper(csv_file.open()))
def slice_by_range(element, *ranges):
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
class DropRecordsSingleEmptyColumn(beam.DoFn):
def __init__(self, index):
self.index = index
def process(self, element):
column = element[self.index]
if len(column) == 0:
return None
yield element
class DropRecordsTwoEmptyColumn(beam.DoFn):
def __init__(self, index_0, index_1):
self.index_0 = index_0
self.index_1 = index_1
def process(self, element):
column_0 = element[self.index_0]
column_1 = element[self.index_1]
if len(column_0) == 0 and len(column_1) == 0:
return None
yield element
class SplitColumn(beam.DoFn):
def __init__(self, index, split_char):
self.index = index
self.split_char = split_char
def process(self, element):
try:
part_0, part_1 = element[self.index].split(self.split_char)
element[self.index] = part_1.strip()
element.append(part_0.strip())
yield element
except ValueError:
element.append("")
yield element
class GenerateUniqueID(beam.DoFn):
def __init__(self, all_columns=False):
self.all_columns = all_columns
def process(self, element):
unique_string = (
",".join(element[2:]) if not self.all_columns else ",".join(element)
)
hashed_string = hashlib.md5(unique_string.encode())
element.append(hashed_string.hexdigest())
yield element
class DeduplicateByGroup(beam.DoFn):
def process(self, element):
if len(element[1]) > 0:
deduplicated_element = (element[0], [element[1][0]])
yield deduplicated_element
else:
yield element
class RemoveUniqueID(beam.DoFn):
def process(self, element):
element_no_id = element[-1][0]
element_no_id.pop(-1)
yield element_no_id
class ConvertDataToDict(beam.DoFn):
@staticmethod
def get_latest_transaction(transaction_dates):
transaction_dates = [
datetime.strptime(individual_transaction, "%Y-%m-%d")
for individual_transaction in transaction_dates
]
return max(transaction_dates).strftime("%Y")
@staticmethod
def get_readable_address(address_components: list, address_comparisons: list):
# Get pairwise comparison to see if two locality/town/district/counties
# are equivalent
pairwise_comparison = [
x == y
for i, x in enumerate(address_comparisons)
for j, y in enumerate(address_comparisons)
if i > j
]
# Create a mask to eliminate the redundant parts of the address
mask = [True, True, True, True]
if pairwise_comparison[0]:
mask[1] = False
if pairwise_comparison[1] or pairwise_comparison[2]:
mask[2] = False
if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]:
mask[3] = False
# Apply the mask
applied_mask = list(itertools.compress(address_comparisons, mask))
# Filter out empty items in list
deduplicated_address_part = list(filter(None, applied_mask))
# print(deduplicated_address_part)
# Filter out any missing parts of the address components
cleaned_address_components = list(filter(None, address_components))
# Return the readable address
return "\n".join(
itertools.chain.from_iterable(
[
cleaned_address_components[0:-1],
deduplicated_address_part,
[cleaned_address_components[-1]],
]
)
)
def process(self, element):
property_transactions = [
{
"price": entry[0],
"transaction_date": entry[1].replace(" 00:00", ""),
"year": entry[1][0:4],
}
for entry in element[-1]
]
json_object = {
"property_id": element[0],
"readable_address": None,
"flat_appartment": element[-1][0][4],
"builing": element[-1][0][10],
"number": element[-1][0][3],
"street": element[-1][0][5],
"locality": element[-1][0][6],
"town": element[-1][0][7],
"district": element[-1][0][8],
"county": element[-1][0][9],
"postcode": element[-1][0][2],
"property_transactions": property_transactions,
"latest_transaction_year": self.get_latest_transaction(
[
transaction["transaction_date"]
for transaction in property_transactions
]
),
}
json_object["readable_address"] = self.get_readable_address(
[
json_object["flat_appartment"],
json_object["builing"],
f'{json_object["number"]} {json_object["street"]}',
json_object["postcode"],
],
[
json_object["locality"],
json_object["town"],
json_object["district"],
json_object["county"],
],
)
yield json_object
def main():
csv_data = resources.path(
"analyse_properties.data.input",
"pp-monthly-update-new-version.csv"
# "analyse_properties.data", "pp-complete.csv"
)
with beam.Pipeline() as pipeline:
# Load the data
with csv_data as csv_data_file:
# https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
load = (
pipeline
| fileio.MatchFiles(str(csv_data_file))
| fileio.ReadMatches()
| beam.FlatMap(csv_reader)
)
# Clean the data
clean_drop = (
load
| "Drop unneeded columns"
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
| "Convert to Upper Case"
>> beam.Map(lambda element: [e.upper() for e in element])
| "Strip leading/trailing whitespace"
>> beam.Map(lambda element: [e.strip() for e in element])
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
# | beam.ParDo(DebugShowColumnWithValueIn(3, ","))
# | beam.ParDo(DebugShowColumnWithValueIn(2, "AL1 4SZ"))
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
clean_deduplicate = (
clean_drop
| "Generate unique ID for all columns"
>> beam.ParDo(GenerateUniqueID(all_columns=True))
| "Group by the ID for all columns"
>> beam.GroupBy(lambda element: element[-1])
| "Deduplicate by the ID for all columns"
>> beam.ParDo(DeduplicateByGroup())
)
# Prepare the data
prepare = (
clean_deduplicate
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
| "Generate unique ID ignoring price & date"
>> beam.ParDo(GenerateUniqueID())
| "Group by the ID ignoring price & date"
>> beam.GroupBy(lambda element: element[-1])
)
# Format the data
formatted = (
prepare
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
| beam.Map(pprint)
)
# Save the data
with resources.path("analyse_properties.data.output")
if __name__ == "__main__":
main()

293
analyse_properties/main.py Normal file
View File

@@ -0,0 +1,293 @@
import csv
from datetime import datetime
import hashlib
import io
from importlib import resources
import itertools
import pathlib
import apache_beam as beam
from apache_beam.io import fileio
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def csv_reader(csv_file):
"""Read in a csv file."""
return csv.reader(io.TextIOWrapper(csv_file.open()))
def slice_by_range(element, *ranges):
"""Slice a list with multiple ranges."""
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
class DropRecordsSingleEmptyColumn(beam.DoFn):
"""If a given item in a list is empty, drop this entry from the PCollection."""
def __init__(self, index):
self.index = index
def process(self, element):
column = element[self.index]
if len(column) == 0:
return None
yield element
class DropRecordsTwoEmptyColumn(beam.DoFn):
"""If two given items in a list are both empty, drop this entry from the PCollection."""
def __init__(self, index_0, index_1):
self.index_0 = index_0
self.index_1 = index_1
def process(self, element):
column_0 = element[self.index_0]
column_1 = element[self.index_1]
if len(column_0) == 0 and len(column_1) == 0:
return None
yield element
class SplitColumn(beam.DoFn):
"""Split an item in a list into two separate items in the PCollection."""
def __init__(self, index, split_char):
self.index = index
self.split_char = split_char
def process(self, element):
# If there is a split based on the split_char, then keep the first result in
# place and append the second.
try:
part_0, part_1 = element[self.index].split(self.split_char)
element[self.index] = part_1.strip()
element.append(part_0.strip())
yield element
except ValueError:
element.append("")
yield element
class GenerateUniqueID(beam.DoFn):
"""
Generate a unique ID for the PCollection, either for all the columns or for the
uniquely identifying data only.
"""
def __init__(self, all_columns=False):
self.all_columns = all_columns
def process(self, element):
unique_string = (
",".join(element[2:]) if not self.all_columns else ",".join(element)
)
hashed_string = hashlib.md5(unique_string.encode())
# append the hash to the end
element.append(hashed_string.hexdigest())
yield element
class DeduplicateByID(beam.DoFn):
"""
If the PCollection has multiple entries after being grouped by ID for all columns,
deduplicate the list to keep only one.
"""
def process(self, element):
if len(element[1]) > 0:
deduplicated_element = (element[0], [element[1][0]])
yield deduplicated_element
else:
yield element
class RemoveUniqueID(beam.DoFn):
"""Remove the unique ID from the PCollection, transforming it back into a list."""
def process(self, element):
element_no_id = element[-1][0]
element_no_id.pop(-1)
yield element_no_id
class ConvertDataToDict(beam.DoFn):
"""Convert the processed data into a dict to be exported as a JSON object."""
@staticmethod
def get_latest_transaction(transaction_dates):
"""Get the date of the latest transaction."""
transaction_dates = [
datetime.strptime(individual_transaction, "%Y-%m-%d")
for individual_transaction in transaction_dates
]
return max(transaction_dates).strftime("%Y")
@staticmethod
def get_readable_address(address_components: list, address_comparisons: list):
# Get pairwise comparison to see if two locality/town/district/counties
# are equivalent
pairwise_comparison = [
x == y
for i, x in enumerate(address_comparisons)
for j, y in enumerate(address_comparisons)
if i > j
]
# Create a mask to eliminate the redundant parts of the address
mask = [True, True, True, True]
if pairwise_comparison[0]:
mask[1] = False
if pairwise_comparison[1] or pairwise_comparison[2]:
mask[2] = False
if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]:
mask[3] = False
# Apply the mask
applied_mask = list(itertools.compress(address_comparisons, mask))
# Filter out empty items in list
deduplicated_address_part = list(filter(None, applied_mask))
# Filter out any missing parts of the address components
cleaned_address_components = list(filter(None, address_components))
# Return the readable address
return "\n".join(
itertools.chain.from_iterable(
[
cleaned_address_components[0:-1],
deduplicated_address_part,
[cleaned_address_components[-1]],
]
)
)
def process(self, element):
# Group together all the transactions for the property.
property_transactions = [
{
"price": entry[0],
"transaction_date": entry[1].replace(" 00:00", ""),
"year": entry[1][0:4],
}
for entry in element[-1]
]
# Create the dict to hold all the information about the property.
json_object = {
"property_id": element[0],
"readable_address": None,
"flat_appartment": element[-1][0][4],
"builing": element[-1][0][10],
"number": element[-1][0][3],
"street": element[-1][0][5],
"locality": element[-1][0][6],
"town": element[-1][0][7],
"district": element[-1][0][8],
"county": element[-1][0][9],
"postcode": element[-1][0][2],
"property_transactions": property_transactions,
"latest_transaction_year": self.get_latest_transaction(
[
transaction["transaction_date"]
for transaction in property_transactions
]
),
}
# Create a human readable address to go in the dict.
json_object["readable_address"] = self.get_readable_address(
[
json_object["flat_appartment"],
json_object["builing"],
f'{json_object["number"]} {json_object["street"]}',
json_object["postcode"],
],
[
json_object["locality"],
json_object["town"],
json_object["district"],
json_object["county"],
],
)
yield json_object
def main():
# Load in the data from a csv file.
csv_data = resources.path(
# "analyse_properties.data.input",
# "pp-monthly-update-new-version.csv"
"analyse_properties.data.input", "pp-complete.csv"
)
with beam.Pipeline() as pipeline:
# Load the data
with csv_data as csv_data_file:
# https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
load = (
pipeline
| fileio.MatchFiles(str(csv_data_file))
| fileio.ReadMatches()
| beam.FlatMap(csv_reader)
)
# Clean the data by dropping unneeded rows.
clean_drop = (
load
| "Drop unneeded columns"
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
| "Convert to Upper Case"
>> beam.Map(lambda element: [e.upper() for e in element])
| "Strip leading/trailing whitespace"
>> beam.Map(lambda element: [e.strip() for e in element])
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
# Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
clean_deduplicate = (
clean_drop
| "Generate unique ID for all columns"
>> beam.ParDo(GenerateUniqueID(all_columns=True))
| "Group by the ID for all columns"
>> beam.GroupBy(lambda element: element[-1])
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
)
# Prepare the data by generating an ID using the uniquely identifying information only
# and grouping them by this ID.
prepare = (
clean_deduplicate
| "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
| "Generate unique ID ignoring price & date"
>> beam.ParDo(GenerateUniqueID())
| "Group by the ID ignoring price & date"
>> beam.GroupBy(lambda element: element[-1])
)
# Format the data into a dict.
formatted = (
prepare
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
)
# Save the data to a .json file.
output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
output = (
formatted
| "Combine into one PCollection" >> beam.combiners.ToList()
| "Save to .json file"
>> beam.io.WriteToText(
file_path_prefix=str(output_file),
file_name_suffix=".json",
shard_name_template="",
)
)
if __name__ == "__main__":
main()