Parallelise duckdb resulting in e.g. 2-4x speedup on 6 core machine #1796
Conversation
@NickCrews just for info this is the speedup i mentioned last night, hopefully will be merged and released soon. To get best performance:
Thanks! Do you have evidence that ordering blocking rules matters? Wouldn't the engine always have to compute every rule, regardless of order?
it's a hunch, but it's because of the way blocking pairs are created and then scored:
Since scoring is the computationally intensive bit, you want to do this in parallel as much as possible, so it's probably better for scoring to be 'balanced across' the rules, rather than a single blocking rule being responsible for most of the scored comparisons.
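As a rough sketch of what that balancing looks like in practice (using the block_on salting API that appears later in this thread; the partition counts are arbitrary):

from splink.duckdb.blocking_rule_library import block_on

# each salted rule is expanded into `salting_partitions` UNION ALL branches
# in the generated SQL, so DuckDB gets several similarly sized chunks of
# comparisons to score in parallel, rather than one blocking rule dominating
blocking_rules_to_generate_predictions = [
    block_on("first_name", salting_partitions=4),
    block_on("dob", salting_partitions=4),
    block_on("postcode_fake", salting_partitions=4),
]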
time train u

import time

from splink.datasets import splink_datasets
from splink.duckdb.comparison_library import (
    exact_match,
    jaro_winkler_at_thresholds,
    levenshtein_at_thresholds,
)
from splink.duckdb.linker import DuckDBLinker

# imports and data added so the snippet runs standalone; they mirror the
# fuller script later in this thread
df = splink_datasets.historical_50k
df = df.drop("cluster", axis=1)

for apply_sort in [True]:
    settings_dict = {
        "probability_two_random_records_match": 0.0001,
        "link_type": "dedupe_only",
        "blocking_rules_to_generate_predictions": [],
        "comparisons": [
            jaro_winkler_at_thresholds("first_name"),
            jaro_winkler_at_thresholds("surname"),
            levenshtein_at_thresholds("dob"),
            exact_match("birth_place"),
            levenshtein_at_thresholds("postcode_fake"),
            exact_match("occupation"),
        ],
        "retain_matching_columns": False,
        "retain_intermediate_calculation_columns": False,
    }
    linker = DuckDBLinker(df, settings_dict)
    # linker.__apply_sort = apply_sort
    start_time = time.time()
    df_e = linker.estimate_u_using_random_sampling(1e6)
    end_time = time.time()
    df_e = linker.estimate_u_using_random_sampling(1e6)
    print(
        f"Execution time for "
        f"apply_sort={apply_sort}: {end_time - start_time} seconds"
    )

Hypothesis - could retain intermediate calculation columns be relevant?
This is very weird but:
playground

import time
from splink.datasets import splink_datasets
from splink.duckdb.blocking_rule_library import block_on
from splink.duckdb.comparison_library import (
exact_match,
jaro_winkler_at_thresholds,
levenshtein_at_thresholds,
)
from splink.duckdb.duckdb_comparison_level_library import (
else_level,
null_level,
)
from splink.duckdb.duckdb_linker import DuckDBLinker
from splink.duckdb.linker import DuckDBLinker
df = splink_datasets.historical_50k
import duckdb
duckdb.__version__
df = df.drop("cluster", axis=1)
import logging
settings_dict = {
    "probability_two_random_records_match": 0.0001,
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [
        block_on("first_name", salting_partitions=3)
    ],
    "comparisons": [
        jaro_winkler_at_thresholds("first_name"),
        jaro_winkler_at_thresholds("surname"),
        levenshtein_at_thresholds("dob"),
        exact_match("birth_place"),
        levenshtein_at_thresholds("postcode_fake"),
        exact_match("occupation"),
    ],
    "retain_matching_columns": False,
    "retain_intermediate_calculation_columns": False,
}
linker = DuckDBLinker(df, settings_dict)
import logging
logging.getLogger("splink").setLevel(1)
# linker.__apply_sort = True
# linker.estimate_u_using_random_sampling(1e6)
t = linker._initialise_df_concat_with_tf()
con = linker._con
con.execute(
f"""
CREATE TABLE __splink__df_concat_with_tf_sample_745b357b3
AS
(WITH __splink__df_concat_with_tf as (select * from {t.physical_name})
select *
from __splink__df_concat_with_tf
USING SAMPLE 5.593196980782878% (bernoulli)
)
"""
)
start_time = time.time()
sql = f"""
CREATE TABLE __splink__m_u_counts_fd157dff2
AS
(WITH __splink__df_concat_with_tf_sample as (select * from __splink__df_concat_with_tf_sample_745b357b3),
__splink__df_blocked as (
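-- each UNION ALL branch below covers one salt partition (the
-- "ceiling(l.__splink_salt * 2) = k" predicate), so DuckDB can execute the
-- branches, and hence the scoring, in parallel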
select
"l"."unique_id" AS "unique_id_l", "r"."unique_id" AS "unique_id_r", "l"."first_name" AS "first_name_l", "r"."first_name" AS "first_name_r", "l"."surname" AS "surname_l", "r"."surname" AS "surname_r", "l"."dob" AS "dob_l", "r"."dob" AS "dob_r", "l"."birth_place" AS "birth_place_l", "r"."birth_place" AS "birth_place_r", "l"."postcode_fake" AS "postcode_fake_l", "r"."postcode_fake" AS "postcode_fake_r", "l"."occupation" AS "occupation_l", "r"."occupation" AS "occupation_r"
, '0' as match_key
from __splink__df_concat_with_tf_sample as l
inner join __splink__df_concat_with_tf_sample as r
on
(1=1) AND (ceiling(l.__splink_salt * 2) = 1)
where l."unique_id" < r."unique_id"
UNION ALL
select
"l"."unique_id" AS "unique_id_l", "r"."unique_id" AS "unique_id_r", "l"."first_name" AS "first_name_l", "r"."first_name" AS "first_name_r", "l"."surname" AS "surname_l", "r"."surname" AS "surname_r", "l"."dob" AS "dob_l", "r"."dob" AS "dob_r", "l"."birth_place" AS "birth_place_l", "r"."birth_place" AS "birth_place_r", "l"."postcode_fake" AS "postcode_fake_l", "r"."postcode_fake" AS "postcode_fake_r", "l"."occupation" AS "occupation_l", "r"."occupation" AS "occupation_r"
, '0' as match_key
from __splink__df_concat_with_tf_sample as l
inner join __splink__df_concat_with_tf_sample as r
on
(1=1) AND (ceiling(l.__splink_salt * 2) = 2)
where l."unique_id" < r."unique_id"
),
__splink__df_comparison_vectors as (
select "unique_id_l","unique_id_r",CASE WHEN "first_name_l" IS NULL OR "first_name_r" IS NULL THEN -1 WHEN "first_name_l" = "first_name_r" THEN 3 WHEN jaro_winkler_similarity("first_name_l", "first_name_r") >= 0.9 THEN 2 WHEN jaro_winkler_similarity("first_name_l", "first_name_r") >= 0.7 THEN 1 ELSE 0 END as gamma_first_name,CASE WHEN "surname_l" IS NULL OR "surname_r" IS NULL THEN -1 WHEN "surname_l" = "surname_r" THEN 3 WHEN jaro_winkler_similarity("surname_l", "surname_r") >= 0.9 THEN 2 WHEN jaro_winkler_similarity("surname_l", "surname_r") >= 0.7 THEN 1 ELSE 0 END as gamma_surname,CASE WHEN "dob_l" IS NULL OR "dob_r" IS NULL THEN -1 WHEN "dob_l" = "dob_r" THEN 3 WHEN levenshtein("dob_l", "dob_r") <= 1 THEN 2 WHEN levenshtein("dob_l", "dob_r") <= 2 THEN 1 ELSE 0 END as gamma_dob,CASE WHEN "birth_place_l" IS NULL OR "birth_place_r" IS NULL THEN -1 WHEN "birth_place_l" = "birth_place_r" THEN 1 ELSE 0 END as gamma_birth_place,CASE WHEN "postcode_fake_l" IS NULL OR "postcode_fake_r" IS NULL THEN -1 WHEN "postcode_fake_l" = "postcode_fake_r" THEN 3 WHEN levenshtein("postcode_fake_l", "postcode_fake_r") <= 1 THEN 2 WHEN levenshtein("postcode_fake_l", "postcode_fake_r") <= 2 THEN 1 ELSE 0 END as gamma_postcode_fake,CASE WHEN "occupation_l" IS NULL OR "occupation_r" IS NULL THEN -1 WHEN "occupation_l" = "occupation_r" THEN 1 ELSE 0 END as gamma_occupation
from __splink__df_blocked
),
__splink__df_predict as (
select *, cast(0.0 as float8) as match_probability
from __splink__df_comparison_vectors
)
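-- match_probability is fixed at 0.0 above, so m_count sums to zero and
-- u_count is simply the number of pairs at each comparison level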
select
gamma_first_name as comparison_vector_value,
sum(match_probability * 1) as m_count,
sum((1-match_probability) * 1) as u_count,
'first_name' as output_column_name
from __splink__df_predict
group by gamma_first_name
union all
select
gamma_surname as comparison_vector_value,
sum(match_probability * 1) as m_count,
sum((1-match_probability) * 1) as u_count,
'surname' as output_column_name
from __splink__df_predict
group by gamma_surname
union all
select
gamma_dob as comparison_vector_value,
sum(match_probability * 1) as m_count,
sum((1-match_probability) * 1) as u_count,
'dob' as output_column_name
from __splink__df_predict
group by gamma_dob
union all
select
gamma_birth_place as comparison_vector_value,
sum(match_probability * 1) as m_count,
sum((1-match_probability) * 1) as u_count,
'birth_place' as output_column_name
from __splink__df_predict
group by gamma_birth_place
union all
select
gamma_postcode_fake as comparison_vector_value,
sum(match_probability * 1) as m_count,
sum((1-match_probability) * 1) as u_count,
'postcode_fake' as output_column_name
from __splink__df_predict
group by gamma_postcode_fake
union all
select
gamma_occupation as comparison_vector_value,
sum(match_probability * 1) as m_count,
sum((1-match_probability) * 1) as u_count,
'occupation' as output_column_name
from __splink__df_predict
group by gamma_occupation
union all
select 0 as comparison_vector_value,
sum(match_probability * 1) /
sum(1) as m_count,
sum((1-match_probability) * 1) /
sum(1) as u_count,
'_probability_two_random_records_match' as output_column_name
from __splink__df_predict
)
"""
con.execute(sql)
end_time = time.time()
apply_sort = True
print(
    f"Execution time for " f"apply_sort={apply_sort}: {end_time - start_time} seconds"
)
# sql = f"""
# CREATE TABLE __splink__m_u_counts_fd157dff2
# AS
# (WITH __splink__df_concat_with_tf_sample as (select * from __splink__df_concat_with_tf_sample_745b357b3),
# __splink__df_blocked as (
# select
# "l"."unique_id" AS "unique_id_l", "r"."unique_id" AS "unique_id_r", "l"."first_name" AS "first_name_l", "r"."first_name" AS "first_name_r", "l"."surname" AS "surname_l", "r"."surname" AS "surname_r", "l"."dob" AS "dob_l", "r"."dob" AS "dob_r", "l"."birth_place" AS "birth_place_l", "r"."birth_place" AS "birth_place_r", "l"."postcode_fake" AS "postcode_fake_l", "r"."postcode_fake" AS "postcode_fake_r", "l"."occupation" AS "occupation_l", "r"."occupation" AS "occupation_r"
# , '0' as match_key
# from __splink__df_concat_with_tf_sample as l
# inner join __splink__df_concat_with_tf_sample as r
# on
# (1=1)
# where l."unique_id" < r."unique_id"
# ),
# __splink__df_comparison_vectors as (
# select "unique_id_l","unique_id_r",CASE WHEN "first_name_l" IS NULL OR "first_name_r" IS NULL THEN -1 WHEN "first_name_l" = "first_name_r" THEN 3 WHEN jaro_winkler_similarity("first_name_l", "first_name_r") >= 0.9 THEN 2 WHEN jaro_winkler_similarity("first_name_l", "first_name_r") >= 0.7 THEN 1 ELSE 0 END as gamma_first_name,CASE WHEN "surname_l" IS NULL OR "surname_r" IS NULL THEN -1 WHEN "surname_l" = "surname_r" THEN 3 WHEN jaro_winkler_similarity("surname_l", "surname_r") >= 0.9 THEN 2 WHEN jaro_winkler_similarity("surname_l", "surname_r") >= 0.7 THEN 1 ELSE 0 END as gamma_surname,CASE WHEN "dob_l" IS NULL OR "dob_r" IS NULL THEN -1 WHEN "dob_l" = "dob_r" THEN 3 WHEN levenshtein("dob_l", "dob_r") <= 1 THEN 2 WHEN levenshtein("dob_l", "dob_r") <= 2 THEN 1 ELSE 0 END as gamma_dob,CASE WHEN "birth_place_l" IS NULL OR "birth_place_r" IS NULL THEN -1 WHEN "birth_place_l" = "birth_place_r" THEN 1 ELSE 0 END as gamma_birth_place,CASE WHEN "postcode_fake_l" IS NULL OR "postcode_fake_r" IS NULL THEN -1 WHEN "postcode_fake_l" = "postcode_fake_r" THEN 3 WHEN levenshtein("postcode_fake_l", "postcode_fake_r") <= 1 THEN 2 WHEN levenshtein("postcode_fake_l", "postcode_fake_r") <= 2 THEN 1 ELSE 0 END as gamma_postcode_fake,CASE WHEN "occupation_l" IS NULL OR "occupation_r" IS NULL THEN -1 WHEN "occupation_l" = "occupation_r" THEN 1 ELSE 0 END as gamma_occupation
# from __splink__df_blocked
# ),
# __splink__df_predict as (
# select *, cast(0.0 as float8) as match_probability
# from __splink__df_comparison_vectors
# )
# select
# gamma_first_name as comparison_vector_value,
# sum(match_probability * 1) as m_count,
# sum((1-match_probability) * 1) as u_count,
# 'first_name' as output_column_name
# from __splink__df_predict
# group by gamma_first_name
# union all
# select
# gamma_surname as comparison_vector_value,
# sum(match_probability * 1) as m_count,
# sum((1-match_probability) * 1) as u_count,
# 'surname' as output_column_name
# from __splink__df_predict
# group by gamma_surname
# union all
# select
# gamma_dob as comparison_vector_value,
# sum(match_probability * 1) as m_count,
# sum((1-match_probability) * 1) as u_count,
# 'dob' as output_column_name
# from __splink__df_predict
# group by gamma_dob
# union all
# select
# gamma_birth_place as comparison_vector_value,
# sum(match_probability * 1) as m_count,
# sum((1-match_probability) * 1) as u_count,
# 'birth_place' as output_column_name
# from __splink__df_predict
# group by gamma_birth_place
# union all
# select
# gamma_postcode_fake as comparison_vector_value,
# sum(match_probability * 1) as m_count,
# sum((1-match_probability) * 1) as u_count,
# 'postcode_fake' as output_column_name
# from __splink__df_predict
# group by gamma_postcode_fake
# union all
# select
# gamma_occupation as comparison_vector_value,
# sum(match_probability * 1) as m_count,
# sum((1-match_probability) * 1) as u_count,
# 'occupation' as output_column_name
# from __splink__df_predict
# group by gamma_occupation
# union all
# select 0 as comparison_vector_value,
# sum(match_probability * 1) /
# sum(1) as m_count,
# sum((1-match_probability) * 1) /
# sum(1) as u_count,
# '_probability_two_random_records_match' as output_column_name
# from __splink__df_predict
# )
# """
# con.execute(sql)
# end_time = time.time()
# apply_sort = True
# print(
# f"Execution time for "
# f"apply_sort={apply_sort}: {end_time - start_time} seconds"
# )
example

import time
from splink.datasets import splink_datasets
from splink.duckdb.blocking_rule_library import block_on
from splink.duckdb.comparison_library import (
exact_match,
jaro_winkler_at_thresholds,
levenshtein_at_thresholds,
)
from splink.duckdb.linker import DuckDBLinker
df = splink_datasets.historical_50k
df = df.drop("cluster", axis=1)
def get_brs(salt):
    brs = [
        ["gender", "occupation", "full_name"],
        ["gender", "occupation", "postcode_fake"],
        ["occupation", "full_name"],
        ["occupation", "postcode_fake"],
        ["gender", "occupation", "first_and_surname"],
        ["first_name", "gender", "postcode_fake"],
        ["gender", "occupation", "dob"],
        ["first_name", "gender", "dob"],
        ["occupation", "first_and_surname"],
        ["gender", "occupation", "surname"],
    ]
    if salt > 1:
        brs = [block_on(x, salting_partitions=salt) for x in brs]
    else:
        brs = [block_on(x) for x in brs]
    return brs


settings_dict = {
    "probability_two_random_records_match": 0.0001,
    "link_type": "dedupe_only",
    "blocking_rules_to_generate_predictions": [],
    "comparisons": [
        jaro_winkler_at_thresholds("first_name"),
        jaro_winkler_at_thresholds("surname"),
        levenshtein_at_thresholds("dob"),
        exact_match("birth_place"),
        levenshtein_at_thresholds("postcode_fake"),
        exact_match("occupation"),
    ],
    "retain_matching_columns": False,
    "retain_intermediate_calculation_columns": False,
}
# Time train u
linker = DuckDBLinker(df, settings_dict)
start_time = time.time()
df_e = linker.estimate_u_using_random_sampling(5e6)
end_time = time.time()
print(f"Execution time train u: {(end_time - start_time):,.2f} seconds")
# Time blocking no salting
settings_dict["blocking_rules_to_generate_predictions"] = get_brs(1)
linker = DuckDBLinker(df, settings_dict)
start_time = time.time()
df_e = linker.predict()
end_time = time.time()
print(f"Execution time predict no salt: {(end_time - start_time):,.2f} seconds")
# Time blocking with salting
settings_dict["blocking_rules_to_generate_predictions"] = get_brs(4)
linker = DuckDBLinker(df, settings_dict)
start_time = time.time()
df_e = linker.estimate_u_using_random_sampling(5e6)
end_time = time.time()
print(
    f"Execution time predict salt multiple rules: {(end_time - start_time):,.2f} seconds"
)
# Time blocking with salting
settings_dict["blocking_rules_to_generate_predictions"] = [
    block_on("birth_place", salting_partitions=4)
]
linker = DuckDBLinker(df, settings_dict)
start_time = time.time()
df_e = linker.estimate_u_using_random_sampling(5e6)
end_time = time.time()
print(f"Execution time predict salt one rules: {(end_time - start_time):,.2f} seconds")
Ah, I figured all pairs were generated, and then compared. If the comparing happens in a streaming fashion as pairs are generated, then that makes sense. Thanks!
@RobinL chiming in here, it'd be great to be able to split up predict() into two separate steps, one for doing the inner join, and then another to compute similarity scores. This could help provide more control for parallelization. I can also see myself wanting to experiment with different models using the same blocking rule, and in that case I could use a persisted record pairs table to speed things up across models.
Yeah, I agree - I have been thinking about that too. I'm pretty sure it would guarantee parallelisation, but only at a big performance cost. It means you have to persist (to memory or disk) all comparisons, whereas if you create the comparisons and score them as a single step, many of them can be created and immediately rejected (because their score doesn't meet the minimum threshold). Even if no threshold is set in your workflow (i.e. you're keeping all pairwise comparisons irrespective of score), it means the value comparisons get persisted, which the single-step approach avoids. For smaller workflows none of this is really a problem because things are fast. But I'm not sure the benefits of parallelization outweigh the costs: creating the pairwise comparisons is relatively cheap vs scoring. Ultimately a lot of the above is a hunch rather than something I've rigorously tested, so we should probably do some experiments. I think there should probably be an option to split up anyway (because it might be something the user wants for reasons other than performance). Just to add a bit of clarity, the difference is this:
Combined steps algorithm: candidate pairs are generated and scored in a single SQL pipeline, so rejected pairs never have to be written out.
Two step algorithm: the full table of candidate pairs is persisted first, then scored in a second pass.
The latter results in potentially much higher disk and memory usage (and more work in outputting and reading back in).
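A toy illustration of the two shapes in plain DuckDB (invented data and a made-up "score"; this shows only the query structure, not Splink's actual SQL):

import duckdb

con = duckdb.connect()
con.execute(
    "CREATE TABLE df AS SELECT range AS id, range % 500 AS blk FROM range(50000)"
)

# combined: candidate pairs are generated and scored inside one query, so
# pairs that fail the threshold are never materialised anywhere
combined = con.execute("""
    SELECT count(*)
    FROM (
        SELECT abs(l.id - r.id) AS score
        FROM df l JOIN df r ON l.blk = r.blk AND l.id < r.id
    ) t
    WHERE score < 5
""").fetchone()

# two-step: persist every candidate pair first (reusable across models),
# then score in a second pass; the full pairs table hits memory/disk
con.execute("""
    CREATE TABLE pairs AS
    SELECT l.id AS id_l, r.id AS id_r
    FROM df l JOIN df r ON l.blk = r.blk AND l.id < r.id
""")
two_step = con.execute(
    "SELECT count(*) FROM pairs WHERE abs(id_l - id_r) < 5"
).fetchone()

assert combined == two_step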
@RobinL That makes sense. I think both solutions make sense depending on the context. Some other thoughts that you've probably considered:
For my use case, I'll be disambiguating around 50M records, with around 5-10 complex comparisons per record pair. Since I'm working with labeled data for evaluation, I can quickly get feedback on the accuracy of a model, and I'll want to try many models in a short period of time. I can work with an r6a.metal instance on AWS that has 192 cores and 1.5 TB of RAM. The cost is not prohibitive ($10 an hour), as long as the compute resources are fully utilized to get results fast.
splink/estimate_u.py (Outdated)

@@ -117,7 +117,23 @@ def estimate_u_values(linker: Linker, max_pairs, seed=None):
    training_linker._enqueue_sql(sql, "__splink__df_concat_with_tf_sample")
    df_sample = training_linker._execute_sql_pipeline([nodes_with_tf])

    settings_obj._blocking_rules_to_generate_predictions = []
    if linker._sql_dialect == "duckdb" and max_pairs > 1e5:
        if max_pairs < 1e6:
This is just a heuristic to make duckdb parallelise more when the user is asking for a bigger computation
I've been doing some more formal benchmarking using AWS EC2 instances. Good news: for estimating u, runtime seems to scale inversely with the number of CPUs (i.e. a roughly linear speedup). Here we're comparing the latest PyPI release with this PR. I've tested on c6gd instances (https://instances.vantage.sh/aws/ec2/c6gd.2xlarge): going from 2xlarge -> 4xlarge halved the runtime. Here's the code:
splink/estimate_u.py (Outdated)

@@ -51,6 +52,12 @@ def _proportion_sample_size_link_only(
    return proportion, sample_size


def _get_duckdb_salting(max_pairs):
    logged = math.log(max_pairs, 10)
this is a heuristic.
max_pairs, salting
1e+00 1
1e+01 1
1e+02 1
1e+03 1
1e+04 1
1e+05 3
1e+06 7
1e+07 16
1e+08 40
2e+08 52
1e+09 98
1e+10 245
1e+11 611
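The excerpt doesn't show the function body beyond the log, but a formula consistent with every row of the table above is the following (a reconstruction for illustration; the merged code may use different constants):

import math


def _get_duckdb_salting(max_pairs):
    # roughly 2.5x more salting partitions per order of magnitude of
    # max_pairs, with a floor of 1 for small jobs
    logged = math.log(max_pairs, 10)
    return math.ceil(2.5 ** (logged - 4))


# e.g. _get_duckdb_salting(1e8) -> 40, matching the table above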
Might be useful to have a very brief explanation as a comment, as otherwise this function is maybe a little cryptic
In terms of picking AWS EC2 instance types for benchmarking, it looks like memory isn't very important. Here's 100 million comparisons (this is estimate u only, so we're not persisting large datasets to memory or disk):
This looks great, amazing results!
Also would be good to add this to the changelog.
See here for investigation into best salting settings
At the moment, DuckDB parallelises little, if any, of the Splink workflow. This PR causes DuckDB to fully parallelise salted workloads, meaning operations like predict() are multiple times faster (especially on machines with many cores). See duckdb/duckdb#9710
time it example
In the example above I've hacked line 367 of blocking.py to allow __apply_sort
Speedup seems to be even greater for a single salted blocking rule. I would expect it also to be greater the more cores the machine has.
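A rough illustration of the mechanism in plain DuckDB (toy data; thread usage and timings will vary by machine): salting turns one join into several UNION ALL branches that DuckDB can schedule as independent pipelines.

import duckdb

con = duckdb.connect()
con.execute("PRAGMA threads=8")
con.execute(
    "CREATE TABLE t AS SELECT range AS id, random() AS salt FROM range(200000)"
)

# unsalted: a single join, which (per this PR and duckdb#9710) was not being
# fully parallelised for Splink-style workloads
con.execute("SELECT count(*) FROM t l JOIN t r ON l.id % 2000 = r.id % 2000")

# the same join salted into two partitions: two branches DuckDB can run in
# parallel (Splink generates one branch per salting partition)
con.execute("""
    SELECT sum(c) FROM (
        SELECT count(*) AS c FROM t l JOIN t r
        ON l.id % 2000 = r.id % 2000 AND ceiling(l.salt * 2) = 1
        UNION ALL
        SELECT count(*) FROM t l JOIN t r
        ON l.id % 2000 = r.id % 2000 AND ceiling(l.salt * 2) = 2
    ) sub
""")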
Edit:
This seems to make training u slower, despite resulting in all cores being used. Definitely solvable, but converting to draft for now
Edit2: salting for u: