2 Commits

SHA1 Message Date
5505dbf24a adding docker commands for spark 2021-09-26 05:34:30 +01:00
2a1c4fe68e adding spark runner 2021-09-26 05:33:39 +01:00
18 changed files with 184 additions and 458 deletions

View File

@@ -1 +0,0 @@
3.7.9

View File

@@ -1,9 +1,2 @@
# street_group_tech_test
Technical Test for Street Group for Daniel Tomlinson.
## Documentation
Read the documentation on GitHub Pages for instructions on running the code and a discussion of the approach.
https://dtomlinson91.github.io/street_group_tech_test/
Technical Test for Street Group

View File

View File

@@ -22,9 +22,3 @@ class DebugShowColumnWithValueIn(beam.DoFn):
if self.value in column:
yield element
return None
class DebugPrint(beam.DoFn):
def process(self, element):
print(element)
yield element

View File

@@ -1,44 +1,30 @@
import argparse
import csv
from datetime import datetime
import hashlib
import io
from importlib import resources
import itertools
import json
import logging
import pathlib
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
# from analyse_properties.debug import DebugShowEmptyColumn, DebugShowColumnWithValueIn
def csv_reader(csv_file):
"""Read in a csv file."""
return csv.reader(io.TextIOWrapper(csv_file.open()))
from analyse_properties.debug import * # noqa
def slice_by_range(element, *ranges):
"""
Slice a list with multiple ranges.
Args:
element : The element.
*ranges (tuple): Tuples containing a start,end index to slice the element.
E.g (0, 3), (5, 6) - Keeps columns 0,1,2,5. Drops everything else.
Returns:
list: The list sliced by the ranges
"""
"""Slice a list with multiple ranges."""
return itertools.chain(*(itertools.islice(element, *r) for r in ranges))
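# Illustrative only, not part of the commit — the ranges are half-open, as in the
# example from the removed docstring:
#   list(slice_by_range(["a", "b", "c", "d", "e", "f", "g"], (0, 3), (5, 6)))
# keeps indices 0, 1, 2 and 5 -> ["a", "b", "c", "f"]; everything else is dropped.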
class DropRecordsSingleEmptyColumn(beam.DoFn):
"""
Drop the entire row if a given column is empty.
Args:
index : The index of the column in the list.
Returns:
None: If the length of the column is 0, drop the element.
Yields:
element: If the length of the column is >0, keep the element.
"""
"""If a given item in a list is empty, drop this entry from the PCollection."""
def __init__(self, index):
self.index = index
@@ -51,19 +37,7 @@ class DropRecordsSingleEmptyColumn(beam.DoFn):
class DropRecordsTwoEmptyColumn(beam.DoFn):
"""
Drop the entire row if both of two given columns are empty.
Args:
index_0 : The index of the first column in the list.
index_1 : The index of the second column in the list.
Returns:
None: If the length of both columns is 0, drop the element.
Yields:
element: If the length of both columns is >0, keep the element.
"""
"""If two given items in a list are both empty, drop this entry from the PCollection."""
def __init__(self, index_0, index_1):
self.index_0 = index_0
@@ -78,91 +52,65 @@ class DropRecordsTwoEmptyColumn(beam.DoFn):
class SplitColumn(beam.DoFn):
"""
Split one column into two columns by a character.
Args:
index : The index of the column in the list.
split_char: The character to split the column by.
"""
"""Split an item in a list into two separate items in the PCollection."""
def __init__(self, index, split_char):
self.index = index
self.split_char = split_char
def process(self, element):
# If there is a split based on the split_char, then keep the second result in
# place (street number) and append the first result (building) at the end.
# If there is a split based on the split_char, then keep the first result in
# place and append the second.
try:
part_0, part_1 = element[self.index].split(self.split_char)
element[self.index] = part_1.strip()
element.append(part_0.strip())
yield element
except ValueError:
# append a blank column to keep column numbers consistent.
element.append("")
yield element
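# Illustrative only (hypothetical PAON value, not from this diff): with split_char=",",
#   element[3] == "THE COACH HOUSE, 12"  ->  element[3] becomes "12" and "THE COACH HOUSE"
# is appended as a new trailing column; a PAON without a comma raises ValueError and
# gets "" appended instead, keeping the column count consistent.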
class CreateMappingTable(beam.DoFn):
class GenerateUniqueID(beam.DoFn):
"""
Create a mapping table to be used as a side-input.
This mapping table has a key of an ID generated across all columns and a value of
the raw property data.
The table is used to populate the raw property data after a GroupByKey using
only the IDs in order to reduce the amount of data processed in the GroupByKey operation.
Generate a unique ID for the PCollection, either for all the columns or for the
uniquely identifying data only.
"""
def __init__(self, all_columns=False):
self.all_columns = all_columns
def process(self, element):
# Join the row into a string.
unique_string = ",".join(element)
# Hash the string.
unique_string = (
",".join(element[2:]) if not self.all_columns else ",".join(element)
)
hashed_string = hashlib.md5(unique_string.encode())
# Format the resulting PCollection with the key of id and value of raw data.
new_element = (hashed_string.hexdigest(), list(element))
yield new_element
# append the hash to the end
element.append(hashed_string.hexdigest())
yield element
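# Illustrative only, not part of the commit: with all_columns=False the price (index 0)
# and transaction date (index 1) are skipped, so two sale rows for the same address that
# differ only in price and date hash to the same hex digest, which is appended as the
# row's last column and later used as the GroupBy key.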
class CreateUniquePropertyID(beam.DoFn):
class DeduplicateByID(beam.DoFn):
"""
Create a unique property ID which does not include the price and date of sale.
Uses each row of the mapping table to create a PCollection with a key of the
unique property ID and a value of the ID generated across all columns.
If the PCollection has multiple entries after being grouped by ID for all columns,
deduplicate the list to keep only one.
"""
def process(self, element):
unique_string = ",".join(element[-1][2:])
hashed_string = hashlib.md5(unique_string.encode())
new_element = (hashed_string.hexdigest(), element[0])
yield new_element
if len(element[1]) > 0:
deduplicated_element = (element[0], [element[1][0]])
yield deduplicated_element
else:
yield element
class DeduplicateIDs(beam.DoFn):
"""Deduplicate a list of IDs."""
class RemoveUniqueID(beam.DoFn):
"""Remove the unique ID from the PCollection, transforming it back into a list."""
def process(self, element):
deduplicated_list = list(set(element[-1]))
new_element = (element[0], deduplicated_list)
yield new_element
def insert_data_for_id(element, mapping_table):
"""
Replace the ID with the raw data from the mapping table.
Args:
element: The element.
mapping_table (dict): The mapping table.
Yields:
The element with IDs replaced with raw data.
"""
replaced_list = [mapping_table[data_id] for data_id in element[-1]]
new_element = (element[0], replaced_list)
yield new_element
element_no_id = element[-1][0]
element_no_id.pop(-1)
yield element_no_id
class ConvertDataToDict(beam.DoFn):
@@ -170,15 +118,7 @@ class ConvertDataToDict(beam.DoFn):
@staticmethod
def get_latest_transaction(transaction_dates):
"""
Get the date of the latest transaction for a list of dates.
Args:
transaction_dates (str): A date in the form "%Y-%m-%d".
Returns:
str: The year in the form "%Y" of the latest transaction date.
"""
"""Get the date of the latest transaction."""
transaction_dates = [
datetime.strptime(individual_transaction, "%Y-%m-%d")
for individual_transaction in transaction_dates
@@ -186,17 +126,7 @@ class ConvertDataToDict(beam.DoFn):
return max(transaction_dates).strftime("%Y")
@staticmethod
def get_readable_address(address_components, address_comparisons):
"""
Create a human readable address from the locality/town/district/county columns.
Args:
address_components (list): The preceding parts of the address (street, postcode etc.)
address_comparisons (list): The locality/town/district/county.
Returns:
str: The complete address deduplicated & cleaned.
"""
def get_readable_address(address_components: list, address_comparisons: list):
# Get pairwise comparison to see if two locality/town/district/counties
# are equivalent
pairwise_comparison = [
@@ -217,6 +147,7 @@ class ConvertDataToDict(beam.DoFn):
applied_mask = list(itertools.compress(address_comparisons, mask))
# Filter out empty items in list
deduplicated_address_part = list(filter(None, applied_mask))
# Filter out any missing parts of the address components
cleaned_address_components = list(filter(None, address_components))
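# Illustrative only (values taken from the sample output later in this diff): for
#   address_comparisons = ["PARKGATE", "NESTON", "CHESHIRE WEST AND CHESTER", "CHESHIRE WEST AND CHESTER"]
# the pairwise comparison flags the repeated county, the mask drops it, and the readable
# address keeps it once: "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ".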
@@ -235,9 +166,9 @@ class ConvertDataToDict(beam.DoFn):
# Group together all the transactions for the property.
property_transactions = [
{
"price": int(entry[0]),
"price": entry[0],
"transaction_date": entry[1].replace(" 00:00", ""),
"year": int(entry[1][0:4]),
"year": entry[1][0:4],
}
for entry in element[-1]
]
@@ -245,158 +176,138 @@ class ConvertDataToDict(beam.DoFn):
# Create the dict to hold all the information about the property.
json_object = {
"property_id": element[0],
"readable_address": None,
"flat_appartment": list(element[-1])[0][4],
"builing": list(element[-1])[0][10],
"number": list(element[-1])[0][3],
"street": list(element[-1])[0][5],
"locality": list(element[-1])[0][6],
"town": list(element[-1])[0][7],
"district": list(element[-1])[0][8],
"county": list(element[-1])[0][9],
"postcode": list(element[-1])[0][2],
# "readable_address": None,
"flat_appartment": element[-1][0][4],
"builing": element[-1][0][10],
"number": element[-1][0][3],
"street": element[-1][0][5],
"locality": element[-1][0][6],
"town": element[-1][0][7],
"district": element[-1][0][8],
"county": element[-1][0][9],
"postcode": element[-1][0][2],
"property_transactions": property_transactions,
"latest_transaction_year": int(self.get_latest_transaction(
"latest_transaction_year": self.get_latest_transaction(
[
transaction["transaction_date"]
for transaction in property_transactions
]
)),
),
}
# Create a human readable address to go in the dict.
json_object["readable_address"] = self.get_readable_address(
[
json_object["flat_appartment"],
json_object["builing"],
f'{json_object["number"]} {json_object["street"]}',
json_object["postcode"],
],
[
json_object["locality"],
json_object["town"],
json_object["district"],
json_object["county"],
],
)
# json_object["readable_address"] = self.get_readable_address(
# [
# json_object["flat_appartment"],
# json_object["builing"],
# f'{json_object["number"]} {json_object["street"]}',
# json_object["postcode"],
# ],
# [
# json_object["locality"],
# json_object["town"],
# json_object["district"],
# json_object["county"],
# ],
# )
yield json_object
def run(argv=None, save_main_session=True):
"""Entrypoint and definition of the pipeline."""
logging.getLogger().setLevel(logging.INFO)
def main():
# Load in the data from a csv file.
# csv_data = resources.path(
# "analyse_properties.data.input",
# "pp-monthly-update-new-version.csv"
# # "analyse_properties.data.input",
# # "pp-complete.csv",
# )
# Default input/output files
input_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "input"
/ "pp-2020.csv"
# / "pp-complete.csv"
)
output_file = (
pathlib.Path(__file__).parents[1]
/ "data"
/ "output"
/ "pp-2020"
# / "pp-complete"
options = PipelineOptions(
[
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK",
]
)
# Arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",
dest="input",
default=str(input_file),
help="Full path to the input file.",
)
parser.add_argument(
"--output",
dest="output",
default=str(output_file),
help="Full path to the output file without extension.",
)
known_args, pipeline_args = parser.parse_known_args(argv)
# Pipeline options. save_main_session needed for DataFlow for global imports.
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as pipeline:
with beam.Pipeline(options=options) as pipeline:
# Load the data
load = (
pipeline
| "Read input data" >> beam.io.ReadFromText(known_args.input)
| "Split by ','" >> beam.Map(lambda element: element.split(","))
| "Remove leading and trailing quotes"
>> beam.Map(lambda element: [el.strip('"') for el in element])
# with csv_data as csv_data_file:
# # https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/io/fileio_test.py#L155-L170
# load = (
# pipeline
# | fileio.MatchFiles(str(csv_data_file))
# | fileio.ReadMatches()
# | beam.FlatMap(csv_reader)
# )
load = pipeline | beam.Create(
[
"🍓Strawberry,🥕Carrot,🍆Eggplant",
"🍅Tomato,🥔Potato",
]
)
# Clean the data.
clean_drop = (
load
| "Drop unneeded columns"
>> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
| "Convert to Upper Case"
>> beam.Map(lambda element: [e.upper() for e in element])
| "Strip leading/trailing whitespace"
>> beam.Map(lambda element: [e.strip() for e in element])
| "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
| "Drop empty PAON if missing SAON"
>> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
| "Split PAON into two columns if separated by comma"
>> beam.ParDo(SplitColumn(3, ","))
)
# Clean the data by dropping unneeded rows.
# clean_drop = (
# load
# | "Drop unneeded columns"
# >> beam.Map(lambda element: list(slice_by_range(element, (1, 4), (7, 14))))
# | "Convert to Upper Case"
# >> beam.Map(lambda element: [e.upper() for e in element])
# | "Strip leading/trailing whitespace"
# >> beam.Map(lambda element: [e.strip() for e in element])
# | "Drop Empty Postcodes" >> beam.ParDo(DropRecordsSingleEmptyColumn(2))
# | "Drop empty PAON if missing SAON"
# >> beam.ParDo(DropRecordsTwoEmptyColumn(3, 4))
# | "Split PAON into two columns if separated by comma"
# >> beam.ParDo(SplitColumn(3, ","))
# )
# Create a mapping table
mapping_table_raw = (
clean_drop
| "Create a mapping table with key of id_all_columns and value of cleaned data."
>> beam.ParDo(CreateMappingTable())
)
# # Clean the data by creating an ID, and deduplicating to eliminate repeated rows.
# clean_deduplicate = (
# clean_drop
# | "Generate unique ID for all columns"
# >> beam.ParDo(GenerateUniqueID(all_columns=True))
# | "Group by the ID for all columns"
# >> beam.GroupBy(lambda element: element[-1])
# | "Deduplicate by the ID for all columns" >> beam.ParDo(DeduplicateByID())
# )
# Condense mapping table into a single dict.
mapping_table_condensed = (
mapping_table_raw
| "Condense mapping table into single dict" >> beam.combiners.ToDict()
)
# # Prepare the data by generating an ID using the uniquely identifying information only
# # and grouping them by this ID.
# prepare = (
# clean_deduplicate
# | "Remove previous unique ID" >> beam.ParDo(RemoveUniqueID())
# | "Generate unique ID ignoring price & date"
# >> beam.ParDo(GenerateUniqueID())
# | "Group by the ID ignoring price & date"
# >> beam.GroupBy(lambda element: element[-1])
# )
# Prepare the data by creating IDs, grouping together and using mapping table
# to reinsert raw data.
prepared = (
mapping_table_raw
| "Create unique ID ignoring price & date"
>> beam.ParDo(CreateUniquePropertyID())
| "Group by ID"
>> beam.GroupByKey()
| "Deduplicate to eliminate repeated transactions"
>> beam.ParDo(DeduplicateIDs())
| "Insert the raw data using the mapping table"
>> beam.FlatMap(
insert_data_for_id, beam.pvalue.AsSingleton(mapping_table_condensed)
)
)
# # Format the data into a dict.
# formatted = (
# prepare
# | "Convert the prepared data into a dict object"
# >> beam.ParDo(ConvertDataToDict())
# )
# Format the data into a dict.
formatted = (
prepared
| "Convert the prepared data into a dict object"
>> beam.ParDo(ConvertDataToDict())
)
# # Save the data to a .json file.
# output_file = pathlib.Path(__file__).parent / "data" / "output" / "pp-complete"
# # output_file = "/tmp/file"
# Save the data to a .json file.
(
formatted
| "Combine into one PCollection" >> beam.combiners.ToList()
| "Format output" >> beam.Map(json.dumps, indent=2)
| "Save to .json file"
>> beam.io.WriteToText(
file_path_prefix=known_args.output,
file_name_suffix=".json",
)
)
# (
# formatted
# | "Combine into one PCollection" >> beam.combiners.ToList()
# | beam.Map(print)
# # | "Save to .json file"
# # >> beam.io.WriteToText(
# # file_path_prefix=str(output_file),
# # file_name_suffix=".json",
# # shard_name_template="",
# # )
# )
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()
main()
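The commented-out blocks above sketch the new Spark-oriented flow: generate an ID, group by it, deduplicate, then strip the ID. Below is a minimal stand-alone sketch of the same dedupe-then-group idea, using stock `beam.Distinct` and `beam.GroupByKey` in place of the custom `DoFn`s; the first two rows are taken from the debug notes later in this diff, the third is a hypothetical second sale, and none of this is part of the commit.
```python
import hashlib

import apache_beam as beam


def property_key(row):
    # Hash everything except price (index 0) and transaction date (index 1),
    # mirroring GenerateUniqueID(all_columns=False).
    return hashlib.md5(",".join(row[2:]).encode()).hexdigest()


with beam.Pipeline() as pipeline:  # DirectRunner by default
    (
        pipeline
        | beam.Create(
            [
                ("317500", "2020-11-13 00:00", "B90 3LA", "1", "", "VERSTONE ROAD"),
                ("317500", "2020-11-13 00:00", "B90 3LA", "1", "", "VERSTONE ROAD"),
                ("250000", "2018-06-01 00:00", "B90 3LA", "1", "", "VERSTONE ROAD"),
            ]
        )
        | "Drop exact duplicate rows" >> beam.Distinct()
        | "Key by property" >> beam.Map(lambda row: (property_key(row), row))
        | "Group transactions per property" >> beam.GroupByKey()
        | beam.Map(print)
    )
```
One entry is printed per property, with both remaining transactions grouped under the same key.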

View File

@@ -1,8 +1,5 @@
# Full data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-complete.csv -P data/input
wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-complete.csv -P analyse_properties/data/input
# Monthly update data set
# wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-monthly-update-new-version.csv -P data/input
# 2020 data set
wget https://storage.googleapis.com/street-group-technical-test-dmot-euw1/input/pp-2020.csv -P data/input
# wget https://storage.googleapis.com/street-group-technical-test-dmot/pp-monthly-update-new-version.csv -P analyse_properties/data/input

View File

@@ -1,16 +1,13 @@
import pathlib
from importlib import resources
import pandas as pd
from pandas_profiling import ProfileReport
def main():
input_file = (
pathlib.Path(__file__).parents[1] / "data" / "input" / "pp-complete.csv"
)
with input_file.open() as csv:
with resources.path("analyse_properties.data", "pp-complete.csv") as csv_file:
df_report = pd.read_csv(
csv,
csv_file,
names=[
"transaction_id",
"price",

View File

@@ -0,0 +1,14 @@
docker run --rm \
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
--name=beam_spark \
apache/beam_spark_job_server:latest
docker pull apache/beam_spark_job_server:2.33.0_rc1
docker run --rm \
-e SPARK_DRIVER_MEMORY=8g \
-p 8098:8098 -p 8097:8097 -p 8099:8099 \
--name=beam_spark \
apache/beam_spark_job_server:latest
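With the job server running, the pipeline can be submitted against it. A hedged sketch, assuming the package entry point used elsewhere in this repo (`python -m analyse_properties.main`); the new `main()` above hard-codes the `PortableRunner` options, so no flags are needed, but the equivalent explicit flags are shown commented out in case the options are later read from the command line.
```bash
# Start the Spark job server (command above), then in a second shell:
python -m analyse_properties.main

# Equivalent explicit flags, if main() parsed its pipeline options from argv:
# python -m analyse_properties.main \
#   --runner=PortableRunner \
#   --job_endpoint=localhost:8099 \
#   --environment_type=LOOPBACK
```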

View File

@@ -1,93 +0,0 @@
# DataFlow
<https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python>
## Examples
Full example of a Beam pipeline on Dataflow:
<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset>
## Setup
Export env variable:
`export GOOGLE_APPLICATION_CREDENTIALS="/home/dtomlinson/git-repos/work/street_group/street_group_tech_test/street-group-0c490d23a9d0.json"`
## Run pipeline
### Dataflow
#### Yearly dataset
```bash
python -m analyse_properties.main \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-2020.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-2020 \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-2
```
#### Full dataset
```bash
python -m analyse_properties.main \
--region europe-west1 \
--input gs://street-group-technical-test-dmot-euw1/input/pp-complete.csv \
--output gs://street-group-technical-test-dmot-euw1/output/pp-complete \
--runner DataflowRunner \
--project street-group \
--temp_location gs://street-group-technical-test-dmot-euw1/tmp \
--subnetwork=https://www.googleapis.com/compute/v1/projects/street-group/regions/europe-west1/subnetworks/europe-west-1-dataflow \
--no_use_public_ips \
--worker_machine_type=n1-highmem-8 \
--num_workers=3 \
--autoscaling_algorithm=NONE
```
### Locally
Run the pipeline locally:
`python -m analyse_properties.main --runner DirectRunner`
## Errors
Unsubscriptable error on window:
<https://stackoverflow.com/questions/42276520/what-does-object-of-type-unwindowedvalues-has-no-len-mean>
## Documentation
Running in its own private VPC without public IPs
- <https://stackoverflow.com/questions/58893082/which-compute-engine-quotas-need-to-be-updated-to-run-dataflow-with-50-workers>
- <https://cloud.google.com/dataflow/docs/guides/specifying-networks#subnetwork_parameter>
Error help
- <https://cloud.google.com/dataflow/docs/guides/common-errors>
- <https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline>
Scaling
Using DataFlowPrime: <https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime#enable-prime>
Use `--experiments=enable_prime`
Deploying a pipeline (with scaling options): <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline>
Available VM types (with pricing): <https://cloud.google.com/compute/vm-instance-pricing#n1_predefined>
Performance
Sideinput performance: <https://stackoverflow.com/questions/48242320/google-dataflow-apache-beam-python-side-input-from-pcollection-kills-perform>
Common use cases:
- Part 1 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1>
- Part 2 <https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2>

View File

@@ -1,27 +0,0 @@
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 181, in execute
op.finish()
File "dataflow_worker/native_operations.py", line 93, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "dataflow_worker/native_operations.py", line 94, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "dataflow_worker/native_operations.py", line 95, in dataflow_worker.native_operations.NativeWriteOperation.finish
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/nativeavroio.py", line 308, in __exit__
self._data_file_writer.flush()
File "fastavro/_write.pyx", line 664, in fastavro._write.Writer.flush
File "fastavro/_write.pyx", line 639, in fastavro._write.Writer.dump
File "fastavro/_write.pyx", line 451, in fastavro._write.snappy_write_block
File "fastavro/_write.pyx", line 458, in fastavro._write.snappy_write_block
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystemio.py", line 200, in write
self._uploader.put(b)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/gcsio.py", line 720, in put
self._conn.send_bytes(data.tobytes())
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
"
"Out of memory: Killed process 2042 (python) total-vm:28616496kB, anon-rss:25684136kB, file-rss:0kB, shmem-rss:0kB, UID:0 pgtables:51284kB oom_score_adj:900"

View File

@@ -1,44 +0,0 @@
[{
"property_id": "3cf3c06632c46754696f2017933702f3",
"flat_appartment": "",
"builing": "",
"number": "63",
"street": "ROTTON PARK STREET",
"locality": "",
"town": "BIRMINGHAM",
"district": "BIRMINGHAM",
"county": "WEST MIDLANDS",
"postcode": "B16 0AE",
"property_transactions": [
{ "price": "385000", "transaction_date": "2021-01-08", "year": "2021" },
{ "price": "701985", "transaction_date": "2019-03-28", "year": "2019" },
{ "price": "1748761", "transaction_date": "2020-05-27", "year": "2020" }
],
"latest_transaction_year": "2021"
},
{
"property_id": "c650d5d7bb0daf0a19bb2cacabbee74e",
"readable_address": "16 STATION ROAD\nPARKGATE\nNESTON\nCHESHIRE WEST AND CHESTER\nCH64 6QJ",
"flat_appartment": "",
"builing": "",
"number": "16",
"street": "STATION ROAD",
"locality": "PARKGATE",
"town": "NESTON",
"district": "CHESHIRE WEST AND CHESTER",
"county": "CHESHIRE WEST AND CHESTER",
"postcode": "CH64 6QJ",
"property_transactions": [
{
"price": "280000",
"transaction_date": "2020-11-30",
"year": "2020"
},
{
"price": "265000",
"transaction_date": "2020-05-29",
"year": "2020"
}
],
"latest_transaction_year": "2020"
}]

View File

@@ -1,16 +0,0 @@
Create Mapping table
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
('fd4634faec47c29de40bbf7840723b41', ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', ''])
Condensing
{'fd4634faec47c29de40bbf7840723b41': ['317500', '2020-11-13 00:00', 'B90 3LA', '1', '', 'VERSTONE ROAD', 'SHIRLEY', 'SOLIHULL', 'SOLIHULL', 'WEST MIDLANDS', '']}
Prepared
GroupByKey
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41', 'fd4634faec47c29de40bbf7840723b41'])
deduplicated
('fe205bfe66bc7f18c50c8f3d77ec3e30', ['fd4634faec47c29de40bbf7840723b41'])

View File

@@ -20,8 +20,6 @@ pylint:
- super-init-not-called
- arguments-differ
- inconsistent-return-statements
- expression-not-assigned
- line-too-long
enable:
options:

View File

@@ -18,6 +18,3 @@ pandas-profiling = "^3.0.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
"analyse-properties" = "analyse_properties.main:run"

requirements-docs.txt Normal file (+6)
View File

@@ -0,0 +1,6 @@
apache-beam==2.32.0; python_version >= "3.6"
avro-python3==1.9.2.1; python_version >= "3.6"
cachetools==4.2.2; python_version >= "3.6" and python_version < "4.0" and (python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.6.0")
certifi==2021.5.30; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version >= "3.6"
mkdocs-material==7.3.0
mkdocs==1.2.2; python_version >= "3.6"