diff --git a/analyse_properties/__init__.py b/analyse_properties/__init__.py index 88c30c7..d2a362d 100644 --- a/analyse_properties/__init__.py +++ b/analyse_properties/__init__.py @@ -5,10 +5,13 @@ Technical test for Street Group. """ import csv +from datetime import datetime +from pprint import pprint import hashlib import io from importlib import resources -from itertools import chain, islice +import itertools +import pathlib import apache_beam as beam from apache_beam.io import fileio @@ -21,7 +24,7 @@ def csv_reader(csv_file): def slice_by_range(element, *ranges): - return chain(*(islice(element, *r) for r in ranges)) + return itertools.chain(*(itertools.islice(element, *r) for r in ranges)) class DropRecordsSingleEmptyColumn(beam.DoFn): @@ -94,30 +97,103 @@ class RemoveUniqueID(beam.DoFn): class ConvertDataToDict(beam.DoFn): - @property - def dict_keys(self): - return [ - "price", - "transaction_date", - "postcode", - "number", - "flat_appartment", - "street", - "locality", - "town_city", - "district", - "county", - "building", - "property_id", + @staticmethod + def get_latest_transaction(transaction_dates): + transaction_dates = [ + datetime.strptime(individual_transaction, "%Y-%m-%d") + for individual_transaction in transaction_dates ] + return max(transaction_dates).strftime("%Y") + + @staticmethod + def get_readable_address(address_components: list, address_comparisons: list): + # Get pairwise comparison to see if two locality/town/district/counties + # are equivalent + pairwise_comparison = [ + x == y + for i, x in enumerate(address_comparisons) + for j, y in enumerate(address_comparisons) + if i > j + ] + # Create a mask to eliminate the redundant parts of the address + mask = [True, True, True, True] + if pairwise_comparison[0]: + mask[1] = False + if pairwise_comparison[1] or pairwise_comparison[2]: + mask[2] = False + if pairwise_comparison[3] or pairwise_comparison[4] or pairwise_comparison[5]: + mask[3] = False + # Apply the mask + applied_mask = list(itertools.compress(address_comparisons, mask)) + # Filter out empty items in list + deduplicated_address_part = list(filter(None, applied_mask)) + # print(deduplicated_address_part) + + # Filter out any missing parts of the address components + cleaned_address_components = list(filter(None, address_components)) + + # Return the readable address + return "\n".join( + itertools.chain.from_iterable( + [ + cleaned_address_components[0:-1], + deduplicated_address_part, + [cleaned_address_components[-1]], + ] + ) + ) def process(self, element): - pass + property_transactions = [ + { + "price": entry[0], + "transaction_date": entry[1].replace(" 00:00", ""), + "year": entry[1][0:4], + } + for entry in element[-1] + ] + + json_object = { + "property_id": element[0], + "readable_address": None, + "flat_appartment": element[-1][0][4], + "builing": element[-1][0][10], + "number": element[-1][0][3], + "street": element[-1][0][5], + "locality": element[-1][0][6], + "town": element[-1][0][7], + "district": element[-1][0][8], + "county": element[-1][0][9], + "postcode": element[-1][0][2], + "property_transactions": property_transactions, + "latest_transaction_year": self.get_latest_transaction( + [ + transaction["transaction_date"] + for transaction in property_transactions + ] + ), + } + + json_object["readable_address"] = self.get_readable_address( + [ + json_object["flat_appartment"], + json_object["builing"], + f'{json_object["number"]} {json_object["street"]}', + json_object["postcode"], + ], + [ + json_object["locality"], + json_object["town"], + json_object["district"], + json_object["county"], + ], + ) + yield json_object def main(): csv_data = resources.path( - "analyse_properties.data", + "analyse_properties.data.input", "pp-monthly-update-new-version.csv" # "analyse_properties.data", "pp-complete.csv" ) @@ -160,7 +236,6 @@ def main(): >> beam.GroupBy(lambda element: element[-1]) | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByGroup()) - # | beam.Map(print) ) # Prepare the data @@ -171,15 +246,19 @@ def main(): >> beam.ParDo(GenerateUniqueID()) | "Group by the ID ignoring price & date" >> beam.GroupBy(lambda element: element[-1]) - | beam.Map(print) ) # Format the data formatted = ( prepare - # | "Convert list to dict object" >> + | "Convert the prepared data into a dict object" + >> beam.ParDo(ConvertDataToDict()) + | beam.Map(pprint) ) + # Save the data + with resources.path("analyse_properties.data.output") + if __name__ == "__main__": main()