Add latest Beam pipeline code for Dataflow

2021-09-27 21:17:39 +01:00
parent cad6612ebe
commit f9eeb8bfad


@@ -9,6 +9,7 @@ import pathlib
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
+from analyse_properties.debug import *  # noqa


 def slice_by_range(element, *ranges):
     """
@@ -36,7 +37,7 @@ class DropRecordsSingleEmptyColumn(beam.DoFn):
         None: If the length of the column is 0, drop the element.

     Yields:
-        element: If the length of the column isn't 0, keep the element.
+        element: If the length of the column is >0, keep the element.
     """

     def __init__(self, index):
@@ -61,7 +62,7 @@ class DropRecordsTwoEmptyColumn(beam.DoFn):
         None: If the length of both columns is 0, drop the element.

     Yields:
-        element: If the length of both columns isn't 0, keep the element.
+        element: If the length of both columns is >0, keep the element.
     """

     def __init__(self, index_0, index_1):
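Note: the process bodies of the two Drop* DoFns sit outside these hunks; going only by the docstrings, the single-column variant would look roughly like this (an assumed reconstruction, not the committed code):

    import apache_beam as beam

    class DropRecordsSingleEmptyColumn(beam.DoFn):
        """Sketch of the drop logic implied by the docstring (assumed)."""

        def __init__(self, index):
            self.index = index

        def process(self, element):
            # Drop the element entirely when the target column is empty.
            if len(element[self.index]) == 0:
                return
            yield element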
@@ -90,14 +91,15 @@ class SplitColumn(beam.DoFn):
         self.split_char = split_char

     def process(self, element):
-        # If there is a split based on the split_char, then keep the first result in
-        # place and append the second column at the end.
+        # If there is a split based on the split_char, then keep the second result in
+        # place (street number) and append the first result (building) at the end.
         try:
             part_0, part_1 = element[self.index].split(self.split_char)
             element[self.index] = part_1.strip()
             element.append(part_0.strip())
             yield element
         except ValueError:
+            # append a blank column to keep column numbers consistent.
             element.append("")
             yield element
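A worked example of the behaviour the new comment describes, assuming split_char is ", " (the configured value is not visible in the hunk):

    element = ["FLAT 1, 12 HIGH STREET"]      # hypothetical address field at self.index == 0
    part_0, part_1 = element[0].split(", ")   # exactly one split, else ValueError
    element[0] = part_1.strip()               # "12 HIGH STREET" stays in place
    element.append(part_0.strip())            # "FLAT 1" is appended as a new column
    # A field with no separator (or more than one) raises ValueError on unpacking,
    # so the except branch appends "" to keep the column count consistent.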
@@ -111,20 +113,11 @@ class CreateMappingTable(beam.DoFn):
     The table is used to populate the raw property data after a GroupByKey using
     only the IDs in order to reduce the amount of data processed in the GroupByKey operation.
-
-    Args:
-        all_columns(bool): If True will use all fields to calculate the ID. If false
-            will exclude the first two.
     """

-    def __init__(self, all_columns=False):
-        self.all_columns = all_columns
-
     def process(self, element):
         # Join the row into a string.
-        unique_string = (
-            ",".join(element[2:]) if not self.all_columns else ",".join(element)
-        )
+        unique_string = ",".join(element)
         # Hash the string.
         hashed_string = hashlib.md5(unique_string.encode())
         # Format the resulting PCollection with the key of id and value of raw data.
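With all_columns gone, the ID is always the MD5 of the full comma-joined row. Standalone, the computation is as follows (the row values, hexdigest(), and the exact yield shape are assumptions; the hunk cuts off before the yield):

    import hashlib

    row = ["100000", "2003-05-09 00:00", "AB1 2CD", "12", "HIGH STREET"]  # hypothetical row
    unique_string = ",".join(row)
    row_id = hashlib.md5(unique_string.encode()).hexdigest()
    # Presumably emitted as (row_id, row): the key/value pair for the mapping table.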
@@ -148,6 +141,8 @@ class CreateUniquePropertyID(beam.DoFn):
 class DeduplicateIDs(beam.DoFn):
+    """Deduplicate a list of IDs."""
+
     def process(self, element):
         deduplicated_list = list(set(element[-1]))
         new_element = (element[0], deduplicated_list)
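For a grouped element, the new docstring's behaviour looks like this (hypothetical values; note that set() does not preserve order):

    element = ("property-id", ["tx-1", "tx-2", "tx-1"])
    deduplicated_list = list(set(element[-1]))     # ["tx-1", "tx-2"] in arbitrary order
    new_element = (element[0], deduplicated_list)  # ("property-id", ["tx-1", "tx-2"])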
@@ -155,6 +150,16 @@ class DeduplicateIDs(beam.DoFn):
 def insert_data_for_id(element, mapping_table):
+    """
+    Replace the IDs in an element with the raw data from the mapping table.
+
+    Args:
+        element (tuple): A (key, [IDs]) pair produced by DeduplicateIDs.
+        mapping_table (dict): The mapping table of ID to raw row.
+
+    Yields:
+        The element with its IDs replaced by raw data.
+    """
     replaced_list = [mapping_table[data_id] for data_id in element[-1]]
     new_element = (element[0], replaced_list)
     yield new_element
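Since insert_data_for_id yields and takes the whole mapping table as an argument, it is presumably wired up with beam.FlatMap and a side input; a sketch under that assumption (the call site is not in this diff):

    inserted = (
        prepared
        | "Reinsert raw data"
        >> beam.FlatMap(
            insert_data_for_id,
            # AsSingleton unwraps the one-element PCollection produced by ToDict.
            mapping_table=beam.pvalue.AsSingleton(mapping_table_condensed),
        )
    )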
@@ -165,7 +170,15 @@ class ConvertDataToDict(beam.DoFn):
     @staticmethod
     def get_latest_transaction(transaction_dates):
-        """Get the date of the latest transaction."""
+        """
+        Get the year of the latest transaction from a list of dates.
+
+        Args:
+            transaction_dates (list): Dates in the form "%Y-%m-%d".
+
+        Returns:
+            str: The year in the form "%Y" of the latest transaction date.
+        """
         transaction_dates = [
             datetime.strptime(individual_transaction, "%Y-%m-%d")
             for individual_transaction in transaction_dates
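Concretely, the documented behaviour reduces a list of date strings to the year of the most recent one:

    from datetime import datetime

    dates = ["2001-03-14", "2015-11-02", "2009-06-30"]
    parsed = [datetime.strptime(d, "%Y-%m-%d") for d in dates]
    latest_year = max(parsed).strftime("%Y")  # "2015"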
@@ -173,7 +186,17 @@ class ConvertDataToDict(beam.DoFn):
         return max(transaction_dates).strftime("%Y")

     @staticmethod
-    def get_readable_address(address_components: list, address_comparisons: list):
+    def get_readable_address(address_components, address_comparisons):
+        """
+        Create a human-readable address from the locality/town/district/county columns.
+
+        Args:
+            address_components (list): The preceding parts of the address (street, postcode, etc.).
+            address_comparisons (list): The locality/town/district/county.
+
+        Returns:
+            str: The complete address, deduplicated and cleaned.
+        """
         # Get pairwise comparison to see if two locality/town/district/counties
         # are equivalent
         pairwise_comparison = [
@@ -194,7 +217,6 @@ class ConvertDataToDict(beam.DoFn):
         applied_mask = list(itertools.compress(address_comparisons, mask))
         # Filter out empty items in list
         deduplicated_address_part = list(filter(None, applied_mask))
-
         # Filter out any missing parts of the address components
         cleaned_address_components = list(filter(None, address_components))
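The mask construction itself is truncated by the hunk; an assumed reconstruction of the compress-and-filter pattern on hypothetical data:

    import itertools

    comparisons = ["SOHO", "LONDON", "LONDON", "GREATER LONDON"]
    # Keep an entry when it differs from its successor; always keep the last one.
    mask = [a != b for a, b in zip(comparisons, comparisons[1:])] + [True]
    applied_mask = list(itertools.compress(comparisons, mask))
    deduplicated = list(filter(None, applied_mask))
    # ["SOHO", "LONDON", "GREATER LONDON"]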
@@ -213,9 +235,9 @@ class ConvertDataToDict(beam.DoFn):
         # Group together all the transactions for the property.
         property_transactions = [
             {
-                "price": entry[0],
+                "price": int(entry[0]),
                 "transaction_date": entry[1].replace(" 00:00", ""),
-                "year": entry[1][0:4],
+                "year": int(entry[1][0:4]),
             }
             for entry in element[-1]
         ]
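With the int() casts, each transaction entry now carries a numeric price and year rather than strings; on a hypothetical grouped value:

    element = ("prop-id", [["100000", "2003-05-09 00:00"], ["250000", "2015-11-02 00:00"]])
    property_transactions = [
        {
            "price": int(entry[0]),
            "transaction_date": entry[1].replace(" 00:00", ""),
            "year": int(entry[1][0:4]),
        }
        for entry in element[-1]
    ]
    # [{"price": 100000, "transaction_date": "2003-05-09", "year": 2003}, ...]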
@@ -234,12 +256,12 @@ class ConvertDataToDict(beam.DoFn):
"county": list(element[-1])[0][9], "county": list(element[-1])[0][9],
"postcode": list(element[-1])[0][2], "postcode": list(element[-1])[0][2],
"property_transactions": property_transactions, "property_transactions": property_transactions,
"latest_transaction_year": self.get_latest_transaction( "latest_transaction_year": int(self.get_latest_transaction(
[ [
transaction["transaction_date"] transaction["transaction_date"]
for transaction in property_transactions for transaction in property_transactions
] ]
), )),
} }
# Create a human readable address to go in the dict. # Create a human readable address to go in the dict.
@@ -262,6 +284,8 @@ class ConvertDataToDict(beam.DoFn):
 def run(argv=None, save_main_session=True):
     """Entrypoint and definition of the pipeline."""
+    logging.getLogger().setLevel(logging.INFO)
+
     # Default input/output files
     input_file = (
         pathlib.Path(__file__).parents[1]
@@ -281,10 +305,16 @@ def run(argv=None, save_main_session=True):
     # Arguments
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        "--input", dest="input", default=str(input_file), help="Input file."
+        "--input",
+        dest="input",
+        default=str(input_file),
+        help="Full path to the input file.",
     )
     parser.add_argument(
-        "--output", dest="output", default=str(output_file), help="Output file."
+        "--output",
+        dest="output",
+        default=str(output_file),
+        help="Full path to the output file without extension.",
     )
     known_args, pipeline_args = parser.parse_known_args(argv)
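parse_known_args is what lets runner flags coexist with the pipeline's own options: anything unrecognised falls through to PipelineOptions. For example (hypothetical paths):

    known_args, pipeline_args = parser.parse_known_args(
        ["--input", "/data/pp-complete.csv", "--output", "/data/properties", "--runner", "DirectRunner"]
    )
    # known_args.input == "/data/pp-complete.csv"
    # pipeline_args == ["--runner", "DirectRunner"], later handed to PipelineOptions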
@@ -292,7 +322,6 @@ def run(argv=None, save_main_session=True):
     pipeline_options = PipelineOptions(pipeline_args)
     pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

-    # Load in the data from a csv file.
     with beam.Pipeline(options=pipeline_options) as pipeline:
         # Load the data
         load = (
@@ -303,7 +332,7 @@ def run(argv=None, save_main_session=True):
             >> beam.Map(lambda element: [el.strip('"') for el in element])
         )

-        # Clean the data by dropping unneeded rows.
+        # Clean the data.
         clean_drop = (
             load
             | "Drop unneeded columns"
@@ -323,19 +352,22 @@ def run(argv=None, save_main_session=True):
         mapping_table_raw = (
             clean_drop
             | "Create a mapping table with key of id_all_columns and value of cleaned data."
-            >> beam.ParDo(CreateMappingTable(all_columns=True))
+            >> beam.ParDo(CreateMappingTable())
         )

+        # Condense the mapping table into a single dict.
         mapping_table_condensed = (
             mapping_table_raw
             | "Condense mapping table into single dict" >> beam.combiners.ToDict()
         )

+        # Prepare the data by creating IDs, grouping them together and using the
+        # mapping table to reinsert the raw data.
         prepared = (
             mapping_table_raw
             | "Create unique ID ignoring price & date"
             >> beam.ParDo(CreateUniquePropertyID())
-            | "Group IDs using all columns by IDs ignoring price & date"
+            | "Group by ID"
             >> beam.GroupByKey()
             | "Deduplicate to eliminate repeated transactions"
             >> beam.ParDo(DeduplicateIDs())
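Schematically, with hypothetical IDs, the group-then-deduplicate steps do the following:

    # After CreateUniquePropertyID: ("prop-id", "tx-1"), ("prop-id", "tx-2"), ("prop-id", "tx-1")
    # After "Group by ID":          ("prop-id", ["tx-1", "tx-2", "tx-1"])
    # After DeduplicateIDs:         ("prop-id", ["tx-1", "tx-2"])  (order not guaranteed)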
@@ -356,7 +388,7 @@ def run(argv=None, save_main_session=True):
         (
             formatted
             | "Combine into one PCollection" >> beam.combiners.ToList()
-            | "Format output" >> beam.Map(json.dumps)
+            | "Format output" >> beam.Map(json.dumps, indent=2)
             | "Save to .json file"
             >> beam.io.WriteToText(
                 file_path_prefix=known_args.output,
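beam.Map(json.dumps, indent=2) forwards indent as a keyword to json.dumps, so the single list produced by ToList is pretty-printed. The remaining WriteToText arguments are cut off by the hunk; a typical completion (assumed, not the committed code) would be:

    (
        formatted
        | "Combine into one PCollection" >> beam.combiners.ToList()
        | "Format output" >> beam.Map(json.dumps, indent=2)
        | "Save to .json file"
        >> beam.io.WriteToText(
            file_path_prefix=known_args.output,
            file_name_suffix=".json",   # assumed
            shard_name_template="",     # assumed: write one un-sharded output file
        )
    )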