WDL to extract Avro files for Hail import [VS-579] #7981

Merged: 24 commits, Aug 10, 2022
.dockstore.yml: 8 additions & 0 deletions
@@ -233,6 +233,14 @@ workflows:
        - master
        - ah_var_store
        - cost_robustitude
  - name: GvsExtractAvroFilesForHail
    subclass: WDL
    primaryDescriptorPath: /scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
    filters:
      branches:
        - master
        - ah_var_store
        - vs_579_vds_avro_wdl
  - name: MitochondriaPipeline
    subclass: WDL
    primaryDescriptorPath: /scripts/mitochondria_m2_wdl/MitochondriaPipeline.wdl
scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl: 200 additions & 0 deletions (new file)
@@ -0,0 +1,200 @@
version 1.0

workflow GvsExtractAvroFilesForHail {
input {
String project_id
String dataset
String filter_set_name
}

call OutputPath { input: go = true }
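# OutputPath.out is the stdout file of the OutputPath call; the extract tasks use it as 'avro_sibling',
# so the Avro files are written under an 'avro' directory created next to that file.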

call ExtractFromNonSuperpartitionedTables {
input:
project_id = project_id,
dataset = dataset,
filter_set_name = filter_set_name,
avro_sibling = OutputPath.out
}

call CountSamples {
input:
project_id = project_id,
dataset = dataset
}

# Superpartitions have a maximum size of 4000 samples. The inner '- 1' is so the 4000th sample (and multiples of 4000) lands in the
# appropriate partition; the outer '+ 1' is to iterate over the correct number of partitions.
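# For example, with WDL integer division: 4000 samples -> range(((4000 - 1) / 4000) + 1) = range(1), i.e. one superpartition;
# 4001 samples -> range(2), i.e. two superpartitions.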
Contributor comment: This is the only thing I'd want to see tested (before the tie-out), and I'm not sure of the best way to do it... maybe this can wait until we run this with 10k samples.

scatter (i in range(((CountSamples.num_samples - 1) / 4000) + 1)) {
call ExtractFromSuperpartitionedTables {
input:
project_id = project_id,
dataset = dataset,
filter_set_name = filter_set_name,
avro_sibling = OutputPath.out,
table_index = i + 1 # 'i' is 0-based so add 1.
}
}
output {
String output_prefix = ExtractFromNonSuperpartitionedTables.output_prefix
}
}


task OutputPath {
meta {
description: "Does nothing but produce the cloud path to its stdout."
}
input {
Boolean go = true
}
command <<<
>>>
output {
File out = stdout()
}
runtime {
docker: "ubuntu:latest"
}
}


task CountSamples {
meta {
description: "Counts the number of samples in the sample_info table efficiently."
}
input {
String project_id
String dataset
}
command <<<
python3 <<FIN

from google.cloud import bigquery

client = bigquery.Client(project="~{project_id}")
sample_info_table_id = f'~{project_id}.~{dataset}.sample_info'
sample_info_table = client.get_table(sample_info_table_id)
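# num_rows comes from the table's metadata, so this avoids the cost of running a COUNT(*) query over the table.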

print(str(sample_info_table.num_rows))

FIN
>>>

output {
Int num_samples = read_int(stdout())
}
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_2022_08_01"
}
}


task ExtractFromNonSuperpartitionedTables {
meta {
description: "Extracts from the non-superpartitioned tables: sample_info, filter_set_info, filter_set_sites"
}
input {
String project_id
String dataset
String filter_set_name
String avro_sibling
}
parameter_meta {
avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
}
command <<<
set -o errexit -o nounset -o xtrace -o pipefail
echo "project_id = ~{project_id}" > ~/.bigqueryrc

avro_prefix="$(dirname ~{avro_sibling})/avro"
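# Example (hypothetical path): if avro_sibling is gs://bucket/execution/call-OutputPath/stdout,
# the exports below land under gs://bucket/execution/call-OutputPath/avro/.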
echo "$avro_prefix" > avro_prefix.out

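# EXPORT DATA writes the query results directly to GCS as Snappy-compressed Avro; the '*' in each URI lets BigQuery shard the export across multiple files.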
bq query --nouse_legacy_sql --project_id=~{project_id} "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/sample_mapping/sample_mapping_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT sample_id, sample_name, '40',
'gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list' as intervals_file
FROM \`~{project_id}.~{dataset}.sample_info\`
ORDER BY sample_id
"

bq query --nouse_legacy_sql --project_id=~{project_id} "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/vqsr_filtering/vqsr_filtering_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, type as model, ref, alt, vqslod, yng_status
FROM \`~{project_id}.~{dataset}.filter_set_info\`
WHERE filter_set_name = '~{filter_set_name}'
ORDER BY location
"

bq query --nouse_legacy_sql --project_id=~{project_id} "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/site_filtering/site_filtering_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, filters
FROM \`~{project_id}.~{dataset}.filter_set_sites\`
WHERE filter_set_name = '~{filter_set_name}'
ORDER BY location
"
>>>

output {
Boolean done = true
String output_prefix = read_string("avro_prefix.out")
}

runtime {
docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:latest"
}
}


task ExtractFromSuperpartitionedTables {
meta {
description: "Extracts from the superpartitioned tables: vet_<table index>, ref_ranges_<table index>"
}
input {
String project_id
String dataset
String filter_set_name
String avro_sibling
Int table_index
}
parameter_meta {
avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
table_index: "1-based index for the superpartitioned ref_ranges and vet tables to be extracted by this call."
}
command <<<
set -o errexit -o nounset -o xtrace -o pipefail
echo "project_id = ~{project_id}" > ~/.bigqueryrc

avro_prefix="$(dirname ~{avro_sibling})/avro"
str_table_index=$(printf "%03d" ~{table_index})
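# Zero-padding to three digits matches the superpartitioned table naming, e.g. table_index 1 -> vet_001 / ref_ranges_001.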

# These bq exports error out if there are any objects at the sibling level to where the output files would be written,
# so an extra layer of `vet_${str_table_index}` is inserted here.
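# Note: the vet SELECT below strips the trailing ',<NON_REF>' from alt and takes the first call_PL entry as RGQ.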
bq query --nouse_legacy_sql --project_id=~{project_id} "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/vet/vet_${str_table_index}/vet_${str_table_index}_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, sample_id, ref, REPLACE(alt,',<NON_REF>','') alt, call_GT as GT, call_AD as AD, call_GQ as GQ, cast(SPLIT(call_pl,',')[OFFSET(0)] as int64) as RGQ
FROM \`~{project_id}.~{dataset}.vet_${str_table_index}\`
ORDER BY location
"

bq query --nouse_legacy_sql --project_id=~{project_id} "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/ref_ranges/ref_ranges_${str_table_index}/ref_ranges_${str_table_index}_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT location, sample_id, length, state
FROM \`~{project_id}.~{dataset}.ref_ranges_${str_table_index}\`
ORDER BY location
"
>>>

output {
Boolean done = true
}

runtime {
docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:latest"
}
}