Add latest Beam pipeline code for Dataflow with group optimisation

2021-09-27 03:18:17 +01:00
parent 377e3c703f
commit 3a74579440


@@ -9,8 +9,6 @@ import pathlib
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
def slice_by_range(element, *ranges):
"""
@@ -28,15 +26,11 @@ def slice_by_range(element, *ranges):
class DropRecordsSingleEmptyColumn(beam.DoFn):
"""
Drop the entire row if a given column is empty.
Args:
index : The index of the column in the list.
Returns:
None: If the length of the column is 0, drop the element.
@@ -44,6 +38,11 @@ class DropRecordsSingleEmptyColumn(beam.DoFn):
Yields:
element: If the length of the column isn't 0, keep the element.
"""
def __init__(self, index):
self.index = index
def process(self, element):
column = element[self.index]
if len(column) == 0:
return None
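# Returning None from process() emits nothing, so the element is dropped from
# the PCollection; the keep path (outside this hunk) presumably yields the
# element, as the Yields section above describes.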
@@ -51,7 +50,19 @@ class DropRecordsSingleEmptyColumn(beam.DoFn):
class DropRecordsTwoEmptyColumn(beam.DoFn):
"""If two given items in a list are both empty, drop this entry from the PCollection."""
"""
Drop the entire row if both of two given columns are empty.
Args:
index_0 : The index of the first column in the list.
index_1 : The index of the second column in the list.
Returns:
None: If both columns are empty, drop the element.
Yields:
element: If at least one of the columns is non-empty, keep the element.
"""
def __init__(self, index_0, index_1):
self.index_0 = index_0
@@ -66,7 +77,14 @@ class DropRecordsTwoEmptyColumn(beam.DoFn):
class SplitColumn(beam.DoFn):
"""Split an item in a list into two separate items in the PCollection."""
"""
Split one column into two columns by a character.
Args:
index : The index of the column in the list.
split_char: The character to split the column by.
"""
def __init__(self, index, split_char):
self.index = index
@@ -74,7 +92,7 @@ class SplitColumn(beam.DoFn):
def process(self, element):
# If there is a split based on the split_char, then keep the first result in
# place and append the second column at the end.
try:
part_0, part_1 = element[self.index].split(self.split_char)
element[self.index] = part_1.strip()
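# Note: the stripped second part (part_1) is what replaces the original column
# here; the rest of the method, cut off by this hunk, presumably appends the
# other part so that e.g. a PAON of "UNIT 1, BLOCK A" ends up as two columns.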
@@ -87,8 +105,16 @@ class SplitColumn(beam.DoFn):
class CreateMappingTable(beam.DoFn):
"""
Create a mapping table to be used as a side-input.
This mapping table has a key of an id generated across all columns and a value of
the raw property data.
The table is used to re-attach the raw property data after a GroupByKey that is
performed on ids only, which reduces the amount of data shuffled in the GroupByKey operation.
Args:
all_columns: If True, generate the id across all columns rather than across the
uniquely identifying columns only.
"""
def __init__(self, all_columns=False):
@@ -104,14 +130,6 @@ class CreateMappingTable(beam.DoFn):
yield new_element
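A minimal sketch of the pattern this class supports (the sample row, step labels, and the md5-based id scheme are illustrative assumptions, not code from this pipeline): key each row by an id, condense the (id, row) pairs into a single dict, group on the small ids alone, then use the dict as a side input to re-attach the full rows.

import hashlib

import apache_beam as beam

def key_by_id(row):
    # Build a deterministic id across all of the row's columns.
    return (hashlib.md5(",".join(row).encode()).hexdigest(), list(row))

with beam.Pipeline() as pipeline:
    rows = pipeline | "Read" >> beam.Create(
        [["100000", "2020-01-01", "B1 1AA", "1", "FLAT 2"]]
    )
    # The side-input mapping table: one dict of {id: raw row}.
    mapping = (
        rows
        | "Key by id" >> beam.Map(key_by_id)
        | "To dict" >> beam.combiners.ToDict()
    )
    grouped = (
        rows
        # Shuffle only small (group_key, id) pairs instead of full rows.
        | "Ids only" >> beam.Map(lambda row: ("group_key", key_by_id(row)[0]))
        | "Group" >> beam.GroupByKey()
        # Re-attach the raw rows from the dict side input.
        | "Re-attach" >> beam.FlatMap(
            lambda kv, table: [(kv[0], [table[i] for i in kv[1]])],
            beam.pvalue.AsSingleton(mapping),
        )
    )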
# class CreateMappingTable(beam.DoFn):
# def process(self, element):
# unique_string = ",".join(element)
# hashed_string = hashlib.md5(unique_string.encode())
# new_element = {hashed_string.hexdigest(): list(element)}
# yield new_element
class CreateUniquePropertyID(beam.DoFn):
def process(self, element):
unique_string = ",".join(element[-1][2:])
@@ -127,48 +145,12 @@ class DeduplicateIDs(beam.DoFn):
yield new_element
def insert_data_for_id(element, mapping_table):
replaced_list = [mapping_table[data_id] for data_id in element[-1]]
new_element = (element[0], replaced_list)
yield new_element
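# Toy example (hypothetical values): with mapping_table = {"abc": ["raw", "row"]}
# and element = ("prop_id", ["abc"]), this yields ("prop_id", [["raw", "row"]]).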
class ConvertDataToDict(beam.DoFn):
"""Convert the processed data into a dict to be exported as a JSON object."""
@@ -232,7 +214,7 @@ class ConvertDataToDict(beam.DoFn):
# Create the dict to hold all the information about the property.
json_object = {
"property_id": element[0],
# "readable_address": None,
"readable_address": None,
"flat_appartment": list(element[-1])[0][4],
"builing": list(element[-1])[0][10],
"number": list(element[-1])[0][3],
@@ -252,20 +234,20 @@ class ConvertDataToDict(beam.DoFn):
}
# Create a human readable address to go in the dict.
# json_object["readable_address"] = self.get_readable_address(
# [
# json_object["flat_appartment"],
# json_object["builing"],
# f'{json_object["number"]} {json_object["street"]}',
# json_object["postcode"],
# ],
# [
# json_object["locality"],
# json_object["town"],
# json_object["district"],
# json_object["county"],
# ],
# )
json_object["readable_address"] = self.get_readable_address(
[
json_object["flat_appartment"],
json_object["builing"],
f'{json_object["number"]} {json_object["street"]}',
json_object["postcode"],
],
[
json_object["locality"],
json_object["town"],
json_object["district"],
json_object["county"],
],
)
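# get_readable_address is not shown in this diff; from the call site it appears
# to take the building-level parts (flat, building, number and street, postcode)
# and the wider area parts (locality, town, district, county) as two lists.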
yield json_object
@@ -276,13 +258,15 @@ def run(argv=None, save_main_session=True):
pathlib.Path(__file__).parents[1]
/ "data"
/ "input"
/ "pp-monthly-update-new-version.csv"
/ "pp-2020.csv"
# / "pp-complete.csv"
)
output_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "output"
/ "pp-monthly-update-new-version"
/ "pp-2020"
# / "pp-complete"
)
# Arguments
@@ -322,33 +306,20 @@ def run(argv=None, save_main_session=True):
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B90 3LA"))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
# Create a mapping table
mapping_table_raw = (
clean_drop
| "Create a mapping table with key of id_all_columns and value of cleaned data."
>> beam.ParDo(CreateMappingTable(all_columns=True))
)
mapping_table_condensed = (
mapping_table_raw
| "Condense mapping table into single dict" >> beam.combiners.ToDict()
)
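# beam.combiners.ToDict() condenses the mapping-table pairs into a PCollection
# holding a single dict, which is the shape beam.pvalue.AsSingleton expects
# when the table is passed as a side input below.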
prepared = (
@@ -363,26 +334,13 @@ def run(argv=None, save_main_session=True):
>> beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
)
)
# Format the data into a dict.
formatted = (
prepared
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
)
# Save the data to a .json file.