adding latest beam pipeline code

This commit is contained in:
2021-09-25 22:15:58 +01:00
parent adfbd8e93d
commit 2e42a453b0

View File

@@ -5,10 +5,13 @@ Technical test for Street Group.
""" """
import csv import csv
from datetime import datetime
from pprint import pprint
import hashlib import hashlib
import io import io
from importlib import resources from importlib import resources
from itertools import chain, islice import itertools
import pathlib
import apache_beam as beam import apache_beam as beam
from apache_beam.io import fileio from apache_beam.io import fileio
@@ -21,7 +24,7 @@ def csv_reader(csv_file):
def slice_by_range(element, *ranges):
    """Concatenate several ``islice`` windows taken from ``element``.

    Args:
        element: The iterable to take windows from.
        *ranges: One argument tuple per window; each tuple is unpacked
            straight into ``itertools.islice`` after ``element``
            (``(stop,)``, ``(start, stop)`` or ``(start, stop, step)``).

    Returns:
        An iterator yielding the items of every window, in the order the
        ranges were given.
    """
    # Build every window up front so a malformed range fails at call time.
    windows = [itertools.islice(element, *bounds) for bounds in ranges]
    return itertools.chain(*windows)
class DropRecordsSingleEmptyColumn(beam.DoFn): class DropRecordsSingleEmptyColumn(beam.DoFn):
@@ -94,30 +97,103 @@ class RemoveUniqueID(beam.DoFn):
class ConvertDataToDict(beam.DoFn): class ConvertDataToDict(beam.DoFn):
@staticmethod
def get_latest_transaction(transaction_dates):
    """Return the year of the most recent transaction as a string.

    Args:
        transaction_dates: Iterable of date strings in ``%Y-%m-%d`` form.

    Returns:
        The year of the latest date, formatted with ``strftime("%Y")``.

    Raises:
        ValueError: If ``transaction_dates`` is empty or any entry does not
            match ``%Y-%m-%d``.
    """
    # Parse lazily and let max() pick the latest; the parameter is left
    # untouched rather than being rebound to a list of datetimes.
    latest = max(
        datetime.strptime(individual_transaction, "%Y-%m-%d")
        for individual_transaction in transaction_dates
    )
    return latest.strftime("%Y")
@staticmethod
def get_readable_address(address_components: list, address_comparisons: list):
    """Build a newline-separated, human-readable address.

    Args:
        address_components: Ordered address lines. The final entry is
            emitted last, after the area parts; every earlier entry is
            emitted first. Blank entries are dropped.
        address_comparisons: Area parts that may repeat one another. Only
            the first occurrence of each non-blank value is kept, in the
            order given. Any number of parts is accepted.

    Returns:
        The non-blank leading components, then the de-duplicated area
        parts, then the final component (if non-blank), joined with
        newlines.
    """
    # Keep the first occurrence of every non-blank area part. This matches
    # the pairwise "equals an earlier part" rule without being tied to
    # exactly four parts.
    deduplicated_address_part = []
    for part in address_comparisons:
        if part and part not in deduplicated_address_part:
            deduplicated_address_part.append(part)

    # Split off the trailing component *before* dropping blanks, so a blank
    # final entry cannot cause an earlier line to be moved to the end.
    leading_components = [part for part in address_components[:-1] if part]
    trailing_component = [part for part in address_components[-1:] if part]

    return "\n".join(
        leading_components + deduplicated_address_part + trailing_component
    )
def process(self, element):
    """Convert one grouped property into a single dict record.

    Args:
        element: Indexable whose first item is used as the property ID and
            whose last item is the sequence of rows grouped under that ID.
            Rows are read positionally: 0 price, 1 transaction date,
            2 postcode, 3 number, 4 flat/apartment, 5 street, 6 locality,
            7 town, 8 district, 9 county, 10 building.

    Yields:
        A dict holding the property ID, the individual address fields, a
        newline-separated ``readable_address``, the list of transactions
        and the year of the latest transaction.
    """
    records = element[-1]
    # Address columns are read from the first grouped row only.
    # NOTE(review): presumably every row in a group shares the same
    # address because the grouping ID is built from it - confirm.
    first_record = records[0]

    property_transactions = [
        {
            "price": entry[0],
            # Drop the midnight time suffix so only the date remains.
            "transaction_date": entry[1].replace(" 00:00", ""),
            "year": entry[1][0:4],
        }
        for entry in records
    ]

    flat_appartment = first_record[4]
    building = first_record[10]
    number = first_record[3]
    street = first_record[5]
    locality = first_record[6]
    town = first_record[7]
    district = first_record[8]
    county = first_record[9]
    postcode = first_record[2]

    # Join only the parts that are present: a plain f-string would produce
    # a whitespace-only (yet truthy) line when both are blank, and a stray
    # leading/trailing space when just one is.
    number_and_street = " ".join(
        str(part) for part in (number, street) if part
    )

    readable_address = self.get_readable_address(
        [flat_appartment, building, number_and_street, postcode],
        [locality, town, district, county],
    )

    # NOTE: the "builing" and "flat_appartment" keys are misspelled but are
    # kept byte-for-byte, since they are part of the emitted record.
    yield {
        "property_id": element[0],
        "readable_address": readable_address,
        "flat_appartment": flat_appartment,
        "builing": building,
        "number": number,
        "street": street,
        "locality": locality,
        "town": town,
        "district": district,
        "county": county,
        "postcode": postcode,
        "property_transactions": property_transactions,
        "latest_transaction_year": self.get_latest_transaction(
            [
                transaction["transaction_date"]
                for transaction in property_transactions
            ]
        ),
    }
def main(): def main():
csv_data = resources.path( csv_data = resources.path(
"analyse_properties.data", "analyse_properties.data.input",
"pp-monthly-update-new-version.csv" "pp-monthly-update-new-version.csv"
# "analyse_properties.data", "pp-complete.csv" # "analyse_properties.data", "pp-complete.csv"
) )
@@ -160,7 +236,6 @@ def main():
>> beam.GroupBy(lambda element: element[-1]) >> beam.GroupBy(lambda element: element[-1])
| "Deduplicate by the ID for all columns" | "Deduplicate by the ID for all columns"
>> beam.ParDo(DeduplicateByGroup()) >> beam.ParDo(DeduplicateByGroup())
# | beam.Map(print)
) )
# Prepare the data # Prepare the data
@@ -171,15 +246,19 @@ def main():
>> beam.ParDo(GenerateUniqueID()) >> beam.ParDo(GenerateUniqueID())
| "Group by the ID ignoring price & date" | "Group by the ID ignoring price & date"
>> beam.GroupBy(lambda element: element[-1]) >> beam.GroupBy(lambda element: element[-1])
| beam.Map(print)
) )
# Format the data # Format the data
formatted = ( formatted = (
prepare prepare
# | "Convert list to dict object" >> | "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
| beam.Map(pprint)
) )
# Save the data
with resources.path("analyse_properties.data.output")
if __name__ == "__main__": if __name__ == "__main__":
main() main()