Check duplicate prefixes #363

Draft: wants to merge 18 commits into master
3 changes: 3 additions & 0 deletions Snakefile
@@ -29,6 +29,9 @@ rule all:
        # Build all the Parquet files.
        config['output_directory'] + '/duckdb/done',

        # Build all the DuckDB (index-wide) reports.
        config['output_directory'] + '/reports/duckdb/done',

        # Build all the exports.
        config['output_directory'] + '/kgx/done',
        config['output_directory'] + '/sapbert-training-data/done',
169 changes: 169 additions & 0 deletions src/exporters/duckdb_exporters.py
@@ -1,8 +1,11 @@
# The DuckDB exporter can be used to export particular intermediate files into the
# in-process database engine DuckDB (https://duckdb.org) for future querying.
import csv
import json
import os.path
import pathlib
import time
from collections import Counter, defaultdict

import duckdb

@@ -225,6 +228,172 @@ def check_for_duplicate_curies(parquet_root, duckdb_filename, duplicate_curies_tsv):
    """).write_csv(duplicate_curies_tsv, sep="\t")


def generate_prefix_report(parquet_root, duckdb_filename, prefix_report_json, prefix_report_tsv):
    """
    Generate a report about all the prefixes within this system.

    See thoughts at https://github.com/TranslatorSRI/Babel/issues/359

    :param parquet_root: The root directory for the Parquet files. We expect these to have subdirectories named
        e.g. `filename=AnatomicalEntity/Clique.parquet`, etc.
    :param duckdb_filename: A temporary DuckDB file to use.
    :param prefix_report_json: The prefix report as JSON.
    :param prefix_report_tsv: The prefix report as TSV.
    """

    db = setup_duckdb(duckdb_filename)
    edges = db.read_parquet(
        os.path.join(parquet_root, "**/Edge.parquet"),
        hive_partitioning=True
    )
    cliques = db.read_parquet(
        os.path.join(parquet_root, "**/Clique.parquet"),
        hive_partitioning=True
    )
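    # With hive_partitioning=True, the `filename=...` path component of each
    # Parquet subdirectory is exposed as a `filename` column on the relation,
    # which the queries below group and aggregate on.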

    # Step 1. Generate a by-prefix summary.
    curie_prefix_summary = db.sql("""
        SELECT
            split_part(curie, ':', 1) AS curie_prefix,
            COUNT(curie) AS curie_count,
            COUNT(DISTINCT curie) AS curie_distinct_count,
            COUNT(DISTINCT clique_leader) AS clique_distinct_count,
            STRING_AGG(edges.filename, '||' ORDER BY edges.filename ASC) AS filenames
        FROM
            edges
        GROUP BY
            curie_prefix
        ORDER BY
            curie_prefix ASC
    """)
    rows = curie_prefix_summary.fetchall()
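    # Each summary row is (curie_prefix, curie_count, curie_distinct_count,
    # clique_distinct_count, filenames), where `filenames` contains one
    # '||'-delimited entry per edge, e.g. 'AnatomicalEntity||Cell' (illustrative).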

    by_curie_prefix_results = {}
    for row in rows:
        curie_prefix = row[0]

        filename_counts = Counter(row[4].split('||'))

        by_curie_prefix_results[curie_prefix] = {
            'curie_count': row[1],
            'curie_distinct_count': row[2],
            'clique_distinct_count': row[3],
            'filenames': filename_counts,
        }
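    # by_curie_prefix_results now maps each CURIE prefix to its counts plus a
    # Counter of the files it appears in, e.g. (values illustrative):
    #   {'UBERON': {'curie_count': 12, 'curie_distinct_count': 10,
    #               'clique_distinct_count': 9,
    #               'filenames': Counter({'AnatomicalEntity': 12})}}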

    # Step 2. Generate a by-clique summary.
    clique_summary = db.sql("""
        SELECT
            filename,
            split_part(clique_leader, ':', 1) AS clique_leader_prefix,
            COUNT(DISTINCT clique_leader) AS clique_count,
            STRING_AGG(split_part(curie, ':', 1), '||' ORDER BY curie ASC) AS curie_prefixes
        FROM
            edges
        GROUP BY
            filename, clique_leader_prefix
        ORDER BY
            filename ASC, clique_leader_prefix ASC
    """)
    rows = clique_summary.fetchall()
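    # Each summary row is (filename, clique_leader_prefix, clique_count,
    # curie_prefixes), where `curie_prefixes` contains one '||'-delimited
    # entry per edge in the cliques led by that prefix.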

    by_clique_results = {}
    for row in rows:
        filename = row[0]
        clique_leader_prefix = row[1]
        clique_count = row[2]
        curie_prefixes = row[3].split('||')
        curie_prefix_counts = Counter(curie_prefixes)

        if clique_leader_prefix not in by_clique_results:
            by_clique_results[clique_leader_prefix] = {
                'count_cliques': 0,
                'by_file': {}
            }

        by_clique_results[clique_leader_prefix]['count_cliques'] += clique_count

        if filename not in by_clique_results[clique_leader_prefix]['by_file']:
            by_clique_results[clique_leader_prefix]['by_file'][filename] = defaultdict(int)

        for curie_prefix, count in curie_prefix_counts.items():
            by_clique_results[clique_leader_prefix]['by_file'][filename][curie_prefix] += count
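    # by_clique_results[clique_leader_prefix]['by_file'][filename] now tallies
    # how many CURIE occurrences in that file use each prefix.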

    # Generate totals.
    total_cliques = 0
    total_curies = 0
    for clique_leader_prefix in by_clique_results.keys():
        count_curies = 0
        total_cliques += by_clique_results[clique_leader_prefix]['count_cliques']
        for filename in by_clique_results[clique_leader_prefix]['by_file'].keys():
            count_curies += sum(by_clique_results[clique_leader_prefix]['by_file'][filename].values())
        by_clique_results[clique_leader_prefix]['count_curies'] = count_curies
        total_curies += count_curies
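    # Each prefix entry now carries 'count_curies' (CURIE occurrences summed
    # across files), and total_cliques/total_curies hold the index-wide totals.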

    # Step 3. Write out prefix report in JSON.
    with open(prefix_report_json, 'w') as fout:
        json.dump({
            'count_cliques': total_cliques,
            'count_curies': total_curies,
            'by_clique': by_clique_results,
            'by_curie_prefix': by_curie_prefix_results
        }, fout, indent=2, sort_keys=True)
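    # The resulting JSON is shaped roughly like (values illustrative):
    #   {"count_cliques": ..., "count_curies": ...,
    #    "by_clique": {"UBERON": {"count_cliques": ..., "count_curies": ...,
    #                             "by_file": {"AnatomicalEntity": {"UBERON": ...}}}},
    #    "by_curie_prefix": {"UBERON": {"curie_count": ..., ...}}}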

    # Step 4. Write out prefix report in TSV. This is primarily based on the by-clique information, but also
    # includes totals.
    with open(prefix_report_tsv, 'w') as fout:
        csv_writer = csv.DictWriter(fout, dialect='excel-tab', fieldnames=[
            'Clique prefix', 'Filename', 'Clique count', 'CURIEs'
        ])
        csv_writer.writeheader()

        curie_totals = defaultdict(int)

        for prefix in sorted(by_clique_results.keys()):
            by_clique_result = by_clique_results[prefix]
            by_file = by_clique_result['by_file']

            count_cliques = by_clique_result['count_cliques']
            filename_curie_counts = defaultdict(int)

            for filename in by_file.keys():
                curie_prefixes_sorted = map(
                    lambda x: f"{x[0]}: {x[1]}",
                    sorted(by_file[filename].items(), key=lambda x: x[1], reverse=True))

                filename_count_curies = 0
                for curie_prefix in by_file[filename]:
                    curie_totals[curie_prefix] += by_file[filename][curie_prefix]
                    filename_curie_counts[curie_prefix] += by_file[filename][curie_prefix]
                    filename_count_curies += by_file[filename][curie_prefix]

                csv_writer.writerow({
                    'Clique prefix': prefix,
                    'Filename': filename,
                    'Clique count': count_cliques,
                    'CURIEs': f"{filename_count_curies}: " + ', '.join(curie_prefixes_sorted)
                })

            filename_curie_sorted = map(
                lambda x: f"{x[0]}: {x[1]}",
                sorted(filename_curie_counts.items(), key=lambda x: x[1], reverse=True))
            count_curies = sum(filename_curie_counts.values())

            # Don't bother with a total for the prefix unless there are at least two files.
            if len(by_file) > 1:
                csv_writer.writerow({
                    'Clique prefix': prefix,
                    'Filename': f"Total for prefix {prefix}",
                    'Clique count': count_cliques,
                    'CURIEs': f"{count_curies}: " + ', '.join(filename_curie_sorted)
                })

        curie_totals_sorted = map(
            lambda x: f"{x[0]}: {x[1]}",
            sorted(curie_totals.items(), key=lambda x: x[1], reverse=True))
        total_curies = sum(curie_totals.values())
        csv_writer.writerow({
            'Clique prefix': 'Total cliques',
            'Filename': '',
            'Clique count': total_cliques,
            'CURIEs': f"{total_curies}: " + ', '.join(curie_totals_sorted)
        })


def get_label_distribution(duckdb_filename, output_filename):
    db = setup_duckdb(duckdb_filename)

24 changes: 24 additions & 0 deletions src/snakefiles/duckdb.snakefile
@@ -84,3 +84,27 @@ rule check_for_duplicate_curies:
        duplicate_curies = config['output_directory'] + '/reports/duckdb/duplicate_curies.tsv',
    run:
        duckdb_exporters.check_for_duplicate_curies(input.parquet_dir, output.duckdb_filename, output.duplicate_curies)

rule generate_prefix_report:
    input:
        config['output_directory'] + '/duckdb/done',
        parquet_dir = config['output_directory'] + '/duckdb/parquet/',
    output:
        duckdb_filename = config['output_directory'] + '/duckdb/prefix_report.duckdb',
        prefix_report_json = config['output_directory'] + '/reports/duckdb/prefix_report.json',
        prefix_report_tsv = config['output_directory'] + '/reports/duckdb/prefix_report.tsv',
    run:
        duckdb_exporters.generate_prefix_report(input.parquet_dir, output.duckdb_filename,
                                                output.prefix_report_json,
                                                output.prefix_report_tsv)

rule all_duckdb_reports:
    input:
        config['output_directory'] + '/duckdb/done',
        identically_labeled_cliques_tsv = config['output_directory'] + '/reports/duckdb/identically_labeled_cliques.tsv',
        duplicate_curies = config['output_directory'] + '/reports/duckdb/duplicate_curies.tsv',
        prefix_report = config['output_directory'] + '/reports/duckdb/prefix_report.json',
    output:
        x = config['output_directory'] + '/reports/duckdb/done',
    shell:
        "echo 'done' >> {output.x}"