mirror of
https://github.com/dtomlinson91/street_group_tech_test
synced 2025-12-22 03:55:43 +00:00
updating beam pipeline to use GroupByKey
This commit is contained in:
@@ -2,6 +2,7 @@ import argparse
|
||||
from datetime import datetime
|
||||
import hashlib
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import pathlib
|
||||
|
||||
@@ -12,17 +13,37 @@ from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
|
||||
|
||||
|
||||
def slice_by_range(element, *ranges):
|
||||
"""Slice a list with multiple ranges."""
|
||||
"""
|
||||
Slice a list with multiple ranges.
|
||||
|
||||
Args:
|
||||
element : The element.
|
||||
*ranges (tuple): Tuples containing a start,end index to slice the element.
|
||||
E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.
|
||||
|
||||
Returns:
|
||||
list: The list sliced by the ranges
|
||||
"""
|
||||
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
|
||||
|
||||
|
||||
class DropRecordsSingleEmptyColumn(beam.DoFn):
|
||||
"""If a given item in a list is empty, drop this entry from the PCollection."""
|
||||
|
||||
def __init__(self, index):
|
||||
self.index = index
|
||||
|
||||
def process(self, element):
|
||||
"""
|
||||
Drop the entire row if a given column is empty.
|
||||
|
||||
Args:
|
||||
element : The element
|
||||
|
||||
Returns:
|
||||
None: If the length of the column is 0, drop the element.
|
||||
|
||||
Yields:
|
||||
element: If the length of the column isn't 0, keep the element.
|
||||
"""
|
||||
column = element[self.index]
|
||||
if len(column) == 0:
|
||||
return None
|
||||
@@ -78,9 +99,9 @@ class GenerateUniqueID(beam.DoFn):
|
||||
",".join(element[2:]) if not self.all_columns else ",".join(element)
|
||||
)
|
||||
hashed_string = hashlib.md5(unique_string.encode())
|
||||
# append the hash to the end
|
||||
element.append(hashed_string.hexdigest())
|
||||
yield element
|
||||
# add the hash as a key to the data.
|
||||
new_element = (hashed_string.hexdigest(), list(element))
|
||||
yield new_element
|
||||
|
||||
|
||||
class DeduplicateByID(beam.DoFn):
|
||||
@@ -91,7 +112,7 @@ class DeduplicateByID(beam.DoFn):
|
||||
|
||||
def process(self, element):
|
||||
if len(list(element[1])) > 0:
|
||||
deduplicated_element = (list(element[0]), [list(element[1])[0]])
|
||||
deduplicated_element = (element[0], [list(element[1])[0]])
|
||||
yield deduplicated_element
|
||||
else:
|
||||
yield element
|
||||
@@ -102,7 +123,6 @@ class RemoveUniqueID(beam.DoFn):
|
||||
|
||||
def process(self, element):
|
||||
element_no_id = element[-1][0]
|
||||
element_no_id.pop(-1)
|
||||
yield element_no_id
|
||||
|
||||
|
||||
@@ -169,7 +189,7 @@ class ConvertDataToDict(beam.DoFn):
|
||||
# Create the dict to hold all the information about the property.
|
||||
json_object = {
|
||||
"property_id": element[0],
|
||||
"readable_address": None,
|
||||
# "readable_address": None,
|
||||
"flat_appartment": list(element[-1])[0][4],
|
||||
"builing": list(element[-1])[0][10],
|
||||
"number": list(element[-1])[0][3],
|
||||
@@ -189,20 +209,20 @@ class ConvertDataToDict(beam.DoFn):
|
||||
}
|
||||
|
||||
# Create a human readable address to go in the dict.
|
||||
json_object["readable_address"] = self.get_readable_address(
|
||||
[
|
||||
json_object["flat_appartment"],
|
||||
json_object["builing"],
|
||||
f'{json_object["number"]} {json_object["street"]}',
|
||||
json_object["postcode"],
|
||||
],
|
||||
[
|
||||
json_object["locality"],
|
||||
json_object["town"],
|
||||
json_object["district"],
|
||||
json_object["county"],
|
||||
],
|
||||
)
|
||||
# json_object["readable_address"] = self.get_readable_address(
|
||||
# [
|
||||
# json_object["flat_appartment"],
|
||||
# json_object["builing"],
|
||||
# f'{json_object["number"]} {json_object["street"]}',
|
||||
# json_object["postcode"],
|
||||
# ],
|
||||
# [
|
||||
# json_object["locality"],
|
||||
# json_object["town"],
|
||||
# json_object["district"],
|
||||
# json_object["county"],
|
||||
# ],
|
||||
# )
|
||||
yield json_object
|
||||
|
||||
|
||||
@@ -259,6 +279,7 @@ def run(argv=None, save_main_session=True):
|
||||
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
|
||||
| "Drop empty PAON if missing SAON"
|
||||
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
|
||||
# | beam.ParDo(DebugShowColumnWithValueIn(2, "B16 0AE"))
|
||||
| "Split PAON into two columns if separated by comma"
|
||||
>> beam.ParDo(SplitColumn(3, ","))
|
||||
)
|
||||
@@ -269,7 +290,7 @@ def run(argv=None, save_main_session=True):
|
||||
| "Generate unique ID for all columns"
|
||||
>> beam.ParDo(GenerateUniqueID(all_columns=True))
|
||||
| "Group by the ID for all columns"
|
||||
>> beam.GroupBy(lambda element: element[-1])
|
||||
>> beam.GroupByKey()
|
||||
| "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
|
||||
)
|
||||
|
||||
@@ -281,7 +302,8 @@ def run(argv=None, save_main_session=True):
|
||||
| "Generate unique ID ignoring price & date"
|
||||
>> beam.ParDo(GenerateUniqueID())
|
||||
| "Group by the ID ignoring price & date"
|
||||
>> beam.GroupBy(lambda element: element[-1])
|
||||
>> beam.GroupByKey()
|
||||
# | beam.Map(print)
|
||||
)
|
||||
|
||||
# Format the data into a dict.
|
||||
@@ -295,6 +317,7 @@ def run(argv=None, save_main_session=True):
|
||||
(
|
||||
formatted
|
||||
| "Combine into one PCollection" >> beam.combiners.ToList()
|
||||
| "Format output" >> beam.Map(json.dumps)
|
||||
| "Save to .json file"
|
||||
>> beam.io.WriteToText(
|
||||
file_path_prefix=known_args.output,
|
||||
|
||||
Reference in New Issue
Block a user