diff --git a/analyse_properties/main.py b/analyse_properties/main.py index d2bce05..969a09d 100644 --- a/analyse_properties/main.py +++ b/analyse_properties/main.py @@ -9,6 +9,7 @@ import pathlib import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from analyse_properties.debug import * # noqa def slice_by_range(element, *ranges): """ @@ -36,7 +37,7 @@ class DropRecordsSingleEmptyColumn(beam.DoFn): None: If the length of the column is 0, drop the element. Yields: - element: If the length of the column isn't 0, keep the element. + element: If the length of the column is >0, keep the element. """ def __init__(self, index): @@ -61,7 +62,7 @@ class DropRecordsTwoEmptyColumn(beam.DoFn): None: If the length of both columns is 0, drop the element. Yields: - element: If the length of both columns isn't 0, keep the element. + element: If the length of both columns is >0, keep the element. """ def __init__(self, index_0, index_1): @@ -90,14 +91,15 @@ class SplitColumn(beam.DoFn): self.split_char = split_char def process(self, element): - # If there is a split based on the split_char, then keep the first result in - # place and append the second column at the end. + # If there is a split based on the split_char, then keep the second result in + # place (street number) and append the first result (building) at the end. try: part_0, part_1 = element[self.index].split(self.split_char) element[self.index] = part_1.strip() element.append(part_0.strip()) yield element except ValueError: + # append a blank column to keep column numbers consistent. element.append("") yield element @@ -111,20 +113,11 @@ class CreateMappingTable(beam.DoFn): The table is used to populate the raw property data after a GroupByKey using only the IDs in order to reduce the amount of data processed in the GroupByKey operation. - - Args: - all_columns(bool): If True will use all fields to calculate the ID. If false - will exclude the first two. """ - def __init__(self, all_columns=False): - self.all_columns = all_columns - def process(self, element): # Join the row into a string. - unique_string = ( - ",".join(element[2:]) if not self.all_columns else ",".join(element) - ) + unique_string = ",".join(element) # Hash the string. hashed_string = hashlib.md5(unique_string.encode()) # Format the resulting PCollection with the key of id and value of raw data. @@ -148,6 +141,8 @@ class CreateUniquePropertyID(beam.DoFn): class DeduplicateIDs(beam.DoFn): + """Deduplicate a list of IDs.""" + def process(self, element): deduplicated_list = list(set(element[-1])) new_element = (element[0], deduplicated_list) @@ -155,6 +150,16 @@ class DeduplicateIDs(beam.DoFn): def insert_data_for_id(element, mapping_table): + """ + Replace the ID with the raw data from the mapping table. + + Args: + element: The element. + mapping_table (dict): The mapping table. + + Yields: + The element with IDs replaced with raw data. + """ replaced_list = [mapping_table[data_id] for data_id in element[-1]] new_element = (element[0], replaced_list) yield new_element @@ -165,7 +170,15 @@ class ConvertDataToDict(beam.DoFn): @staticmethod def get_latest_transaction(transaction_dates): - """Get the date of the latest transaction.""" + """ + Get the date of the latest transaction for a list of dates. + + Args: + transaction_dates (str): A date in the form "%Y-%m-%d". + + Returns: + str: The year in the form "%Y" of the latest transaction date. + """ transaction_dates = [ datetime.strptime(individual_transaction, "%Y-%m-%d") for individual_transaction in transaction_dates @@ -173,7 +186,17 @@ class ConvertDataToDict(beam.DoFn): return max(transaction_dates).strftime("%Y") @staticmethod - def get_readable_address(address_components: list, address_comparisons: list): + def get_readable_address(address_components, address_comparisons): + """ + Create a human readable address from the locality/town/district/county columns. + + Args: + address_components (list): The preceeding parts of the address (street, postcode etc.) + address_comparisons (list): The locality/town/district/county. + + Returns: + str: The complete address deduplicated & cleaned. + """ # Get pairwise comparison to see if two locality/town/district/counties # are equivalent pairwise_comparison = [ @@ -194,7 +217,6 @@ class ConvertDataToDict(beam.DoFn): applied_mask = list(itertools.compress(address_comparisons, mask)) # Filter out empty items in list deduplicated_address_part = list(filter(None, applied_mask)) - # Filter out any missing parts of the address components cleaned_address_components = list(filter(None, address_components)) @@ -213,9 +235,9 @@ class ConvertDataToDict(beam.DoFn): # Group together all the transactions for the property. property_transactions = [ { - "price": entry[0], + "price": int(entry[0]), "transaction_date": entry[1].replace(" 00:00", ""), - "year": entry[1][0:4], + "year": int(entry[1][0:4]), } for entry in element[-1] ] @@ -234,12 +256,12 @@ class ConvertDataToDict(beam.DoFn): "county": list(element[-1])[0][9], "postcode": list(element[-1])[0][2], "property_transactions": property_transactions, - "latest_transaction_year": self.get_latest_transaction( + "latest_transaction_year": int(self.get_latest_transaction( [ transaction["transaction_date"] for transaction in property_transactions ] - ), + )), } # Create a human readable address to go in the dict. @@ -262,6 +284,8 @@ class ConvertDataToDict(beam.DoFn): def run(argv=None, save_main_session=True): """Entrypoint and definition of the pipeline.""" + logging.getLogger().setLevel(logging.INFO) + # Default input/output files input_file = ( pathlib.Path(__file__).parents[1] @@ -281,10 +305,16 @@ def run(argv=None, save_main_session=True): # Arguments parser = argparse.ArgumentParser() parser.add_argument( - "--input", dest="input", default=str(input_file), help="Input file." + "--input", + dest="input", + default=str(input_file), + help="Full path to the input file.", ) parser.add_argument( - "--output", dest="output", default=str(output_file), help="Output file." + "--output", + dest="output", + default=str(output_file), + help="Full path to the output file without extension.", ) known_args, pipeline_args = parser.parse_known_args(argv) @@ -292,7 +322,6 @@ def run(argv=None, save_main_session=True): pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = save_main_session - # Load in the data from a csv file. with beam.Pipeline(options=pipeline_options) as pipeline: # Load the data load = ( @@ -303,7 +332,7 @@ def run(argv=None, save_main_session=True): >> beam.Map(lambda element: [el.strip('"') for el in element]) ) - # Clean the data by dropping unneeded rows. + # Clean the data. clean_drop = ( load | "Drop unneeded columns" @@ -323,19 +352,22 @@ def run(argv=None, save_main_session=True): mapping_table_raw = ( clean_drop | "Create a mapping table with key of id_all_columns and value of cleaned data." - >> beam.ParDo(CreateMappingTable(all_columns=True)) + >> beam.ParDo(CreateMappingTable()) ) + # Condense mapping table into a single dict. mapping_table_condensed = ( mapping_table_raw | "Condense mapping table into single dict" >> beam.combiners.ToDict() ) + # Prepare the data by creating IDs, grouping together and using mapping table + # to reinsert raw data. prepared = ( mapping_table_raw | "Create unique ID ignoring price & date" >> beam.ParDo(CreateUniquePropertyID()) - | "Group IDs using all columns by IDs ignoring price & date" + | "Group by ID" >> beam.GroupByKey() | "Deduplicate to eliminate repeated transactions" >> beam.ParDo(DeduplicateIDs()) @@ -356,7 +388,7 @@ def run(argv=None, save_main_session=True): ( formatted | "Combine into one PCollection" >> beam.combiners.ToList() - | "Format output" >> beam.Map(json.dumps) + | "Format output" >> beam.Map(json.dumps, indent=2) | "Save to .json file" >> beam.io.WriteToText( file_path_prefix=known_args.output,