VS-629 failure to retrieve job information during ingest #8047

Merged
30 commits
a5c5bc9
getting rid of unnecessary (and breaking) location=US references
koncheto-broad Sep 23, 2022
be66892
adding dockstore branch
koncheto-broad Sep 23, 2022
f4a3bce
tweaking utils.py to account for the job's location as well
koncheto-broad Sep 26, 2022
8980fca
adding more debug info to the python util.py to view the location (wi…
koncheto-broad Sep 26, 2022
b3d7074
adding more debug info to the python util.py to view the location (wi…
koncheto-broad Sep 26, 2022
38be38c
updating GvsPopulateAltAllele.wdl to refer to a new docker images to …
koncheto-broad Sep 26, 2022
850adb0
cleaning up debug outputs in utils.py, and updating dockstore info fo…
koncheto-broad Sep 27, 2022
244dc12
adding in user defined function median to replace the bqutil version …
koncheto-broad Sep 30, 2022
d8eaee6
updating GvsPrepareRangeCallset to use the latest docker for this bra…
koncheto-broad Oct 5, 2022
4e7fcd1
updating dockstore yaml
koncheto-broad Oct 5, 2022
55958f7
Removing dockstore entries for working branches
koncheto-broad Oct 6, 2022
b1c2567
Merge branch 'ah_var_store' into VS-629-failure-to-retrieve-job-infor…
koncheto-broad Oct 6, 2022
080a692
cleanup
koncheto-broad Oct 6, 2022
25dd596
merging in alping changes from ah_var_store
koncheto-broad Oct 6, 2022
6876f2f
updating path for depositing jars to new project bucket
koncheto-broad Oct 7, 2022
2d959a7
adding wdls back to dockstore for final testing post-merge
koncheto-broad Oct 7, 2022
ced43b4
atempting smaller docker image in PopulateAltAllele
koncheto-broad Oct 7, 2022
6a39c0d
fixing location cleanup error?
koncheto-broad Oct 7, 2022
96859b7
continuing to update and slim down images
koncheto-broad Oct 7, 2022
5c8f953
comments from PR
koncheto-broad Oct 7, 2022
5e24ad7
replace this docker image... but later
koncheto-broad Oct 7, 2022
ab5b85d
okay, at least updating it to the same version that CreateFilterSet w…
koncheto-broad Oct 7, 2022
81b4213
Updated to fully use alpine versions of libraries.
koncheto-broad Oct 7, 2022
9b0b01a
Ensuring it is using the alping image WITH the location changes
koncheto-broad Oct 11, 2022
840a5e1
Ensuring it is using the alpine image WITH the location changes
koncheto-broad Oct 11, 2022
bfa4d36
updating GvsExtractExtract with alpine docker images (and a newer but…
koncheto-broad Oct 11, 2022
011ee4f
merging in changes from ah_var_store
koncheto-broad Oct 11, 2022
9b1b981
Testing with slightly smaller docker
koncheto-broad Oct 11, 2022
38e90c3
adding branch back in to dockstore
koncheto-broad Oct 11, 2022
07c6b83
cleaning up dockstore branches
koncheto-broad Oct 11, 2022
11 changes: 11 additions & 0 deletions scripts/variantstore/utils/pushGATKtoGCS.sh
@@ -0,0 +1,11 @@
JAR=$(ls -t build/libs/*-SNAPSHOT-local.jar | head -1)
DT=$(date '+%Y%m%d')

branch=$(git symbolic-ref HEAD 2>/dev/null)
branch=${branch#refs/heads/}

DEST="gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/${branch}_${DT}/"

gsutil cp $JAR $DEST

echo "Copied to $DEST$(basename $JAR)"
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsAssignIds.wdl
@@ -198,7 +198,7 @@ task CreateCostObservabilityTable {
PARTITION_STRING="--time_partitioning_field call_start_timestamp --time_partitioning_type DAY"
echo "making table $TABLE"
echo '~{cost_observability_json}' > schema.json
bq --location=US mk ${PARTITION_STRING} --project_id=~{project_id} $TABLE schema.json
bq ${PARTITION_STRING} --project_id=~{project_id} $TABLE schema.json
fi
>>>
runtime {
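This and most of the WDL hunks that follow make the same change: the hard-coded --location=US flag is dropped from the bq invocations so that BigQuery resolves the location from the dataset being used, since pinning US is what broke these steps against datasets created in other regions. A minimal sketch of the same idea with the Python BigQuery client, using placeholder project and dataset names rather than anything from this PR:

from google.cloud import bigquery

# Hypothetical names, for illustration only.
client = bigquery.Client(project="my-project")
dataset = client.get_dataset("my-project.my_dataset")

# Derive the location from the dataset instead of hard-coding "US", so the
# same query works for datasets created in any region.
query_job = client.query(
    "SELECT COUNT(*) AS n FROM `my-project.my_dataset.sample_info`",
    location=dataset.location,
)
for row in query_job.result():
    print(row.n)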
4 changes: 2 additions & 2 deletions scripts/variantstore/wdl/GvsCallsetCost.wdl
@@ -86,7 +86,7 @@ task CoreStorageModelSizes {
local table_pattern="$1"
local output_file_name="$2"

bq query --location=US --project_id='~{project_id}' --format=csv --use_legacy_sql=false \
bq query --project_id='~{project_id}' --format=csv --use_legacy_sql=false \
"SELECT round(sum(total_billable_bytes) / (1024*1024*1024),2) \
FROM \`~{project_id}.~{dataset_name}.INFORMATION_SCHEMA.PARTITIONS\` \
WHERE table_name LIKE '${table_pattern}'" | tail -1 > ${output_file_name}
@@ -118,7 +118,7 @@ task ReadCostObservabilityTable {
String call_set_identifier
}
command <<<
bq query --location=US --project_id='~{project_id}' --format=prettyjson --use_legacy_sql=false \
bq query --project_id='~{project_id}' --format=prettyjson --use_legacy_sql=false \
"SELECT step, event_key, round(sum(event_bytes) / (1024*1024*1024), 2) AS sum_event_gibibytes \
FROM \`~{project_id}.~{dataset_name}.cost_observability\` \
WHERE call_set_identifier = '~{call_set_identifier}' GROUP BY step, event_key ORDER BY step" \
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsCreateTables.wdl
@@ -93,7 +93,7 @@ task CreateTables {
if [ $BQ_SHOW_RC -ne 0 ]; then
echo "making table $TABLE"
echo '~{schema_json}' > schema.json
bq --location=US mk ${PARTITION_STRING} ${CLUSTERING_STRING} --project_id=~{project_id} $TABLE schema.json
bq mk ${PARTITION_STRING} ${CLUSTERING_STRING} --project_id=~{project_id} $TABLE schema.json
fi
done
>>>
4 changes: 2 additions & 2 deletions scripts/variantstore/wdl/GvsExtractCallset.wdl
@@ -227,7 +227,7 @@ task ValidateFilterSetName {

echo "project_id = ~{query_project}" > ~/.bigqueryrc

OUTPUT=$(bq --location=US --project_id=~{query_project} --format=csv query --use_legacy_sql=false ~{bq_labels} "SELECT filter_set_name as available_filter_set_names FROM \`~{data_project}.~{data_dataset}.filter_set_info\` GROUP BY filter_set_name")
OUTPUT=$(bq --project_id=~{query_project} --format=csv query --use_legacy_sql=false ~{bq_labels} "SELECT filter_set_name as available_filter_set_names FROM \`~{data_project}.~{data_dataset}.filter_set_info\` GROUP BY filter_set_name")
FILTERSETS=${OUTPUT#"available_filter_set_names"}

if [[ $FILTERSETS =~ "~{filter_set_name}" ]]; then
@@ -468,7 +468,7 @@ task GenerateSampleListFile {

echo "project_id = ~{query_project}" > ~/.bigqueryrc

bq --location=US --project_id=~{query_project} --format=csv query --use_legacy_sql=false ~{bq_labels} "SELECT sample_name FROM ~{fq_samples_to_extract_table}" | sed 1d > sample-name-list.txt
bq --project_id=~{query_project} --format=csv query --use_legacy_sql=false ~{bq_labels} "SELECT sample_name FROM ~{fq_samples_to_extract_table}" | sed 1d > sample-name-list.txt

if [ -n "$OUTPUT_GCS_DIR" ]; then
gsutil cp sample-name-list.txt ${OUTPUT_GCS_DIR}/
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsImportGenomes.wdl
@@ -274,7 +274,7 @@ task SetIsLoadedColumn {
echo "project_id = ~{project_id}" > ~/.bigqueryrc

# set is_loaded to true if there is a corresponding vet table partition with rows for that sample_id
bq --location=US --project_id=~{project_id} query --format=csv --use_legacy_sql=false ~{bq_labels} \
bq --project_id=~{project_id} query --format=csv --use_legacy_sql=false ~{bq_labels} \
'UPDATE `~{dataset_name}.sample_info` SET is_loaded = true WHERE sample_id IN (SELECT CAST(partition_id AS INT64) from `~{dataset_name}.INFORMATION_SCHEMA.PARTITIONS` WHERE partition_id NOT LIKE "__%" AND total_logical_bytes > 0 AND table_name LIKE "vet_%") OR sample_id IN (SELECT sample_id FROM `~{dataset_name}.sample_load_status` GROUP BY 1 HAVING COUNT(1) = 2)'
>>>
runtime {
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsIngestTieout.wdl
@@ -75,7 +75,7 @@ task IngestTieout {
check_table() {
local table_name=$1

bq query --location=US --project_id=~{project} --format=csv --use_legacy_sql=false \
bq query --project_id=~{project} --format=csv --use_legacy_sql=false \
"select actual.sample_id, expected.sample_id from
(select sample_id, count(*) as count from \`gvs-internal.~{dataset_name}.${table_name}\` group by sample_id) actual full outer join
(select sample_id, count(*) as count from \`gvs-internal.~{reference_dataset_name}.${table_name}\` group by sample_id) expected on actual.sample_id = expected.sample_id
8 changes: 4 additions & 4 deletions scripts/variantstore/wdl/GvsPopulateAltAllele.wdl
@@ -74,7 +74,7 @@ task GetMaxSampleId {
set -o errexit -o nounset -o xtrace -o pipefail

echo "project_id = ~{project_id}" > ~/.bigqueryrc
bq query --location=US --project_id=~{project_id} --format=csv --use_legacy_sql=false \
bq query --project_id=~{project_id} --format=csv --use_legacy_sql=false \
'SELECT IFNULL(MAX(sample_id), 0) AS max_sample_id FROM `~{dataset_name}.alt_allele`' > num_rows.csv

# remove the header row from the CSV file
@@ -122,7 +122,7 @@ task GetVetTableNames {
fi

# use the number calculated from the above math to get the vet_* table names to grab data from
bq query --location=US --project_id=~{project_id} --format=csv --use_legacy_sql=false ~{bq_labels} \
bq query --project_id=~{project_id} --format=csv --use_legacy_sql=false ~{bq_labels} \
"SELECT table_name FROM \`~{project_id}.~{dataset_name}.INFORMATION_SCHEMA.TABLES\` WHERE table_name LIKE 'vet_%' AND CAST(SUBSTRING(table_name, length('vet_') + 1) AS INT64) >= ${min_vat_table_num}" > vet_tables.csv

# remove the header row from the CSV file, count the number of tables and divide them up into
@@ -168,7 +168,7 @@ task CreateAltAlleleTable {
set -e

echo "project_id = ~{project_id}" > ~/.bigqueryrc
bq query --location=US --project_id=~{project_id} --format=csv --use_legacy_sql=false ~{bq_labels} \
bq query --project_id=~{project_id} --format=csv --use_legacy_sql=false ~{bq_labels} \
'CREATE TABLE IF NOT EXISTS `~{project_id}.~{dataset_name}.alt_allele` (
location INT64,
sample_id INT64,
@@ -243,7 +243,7 @@ task PopulateAltAlleleTable {
done
>>>
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-09-28-slim"
docker: "us.gcr.io/broad-dsde-methods/variantstore:vs_629_location_failures_22_09_26"
memory: "3 GB"
disks: "local-disk 10 HDD"
cpu: 1
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsPrepareRangesCallset.wdl
@@ -105,7 +105,7 @@ task PrepareRangesCallsetTask {
}

runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-09-28-slim"
docker: "us.gcr.io/broad-dsde-methods/variantstore:vs_629_location_failures_22_09_26"
memory: "3 GB"
disks: "local-disk 100 HDD"
bootDiskSizeGb: 15
4 changes: 2 additions & 2 deletions scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
@@ -218,7 +218,7 @@ task AssertCostIsTrackedAndExpected {
set -o pipefail

echo "project_id = ~{project_id}" > ~/.bigqueryrc
bq query --location=US --project_id=~{project_id} --format=csv --use_legacy_sql=false \
bq query --project_id=~{project_id} --format=csv --use_legacy_sql=false \
"SELECT call, step, event_key, sum(event_bytes) \
FROM \`~{dataset_name}.cost_observability\` \
GROUP BY call, step, event_key \
@@ -314,7 +314,7 @@ task AssertTableSizesAreExpected {
set -o pipefail

echo "project_id = ~{project_id}" > ~/.bigqueryrc
bq query --location=US --project_id=~{project_id} --format=csv --use_legacy_sql=false \
bq query --project_id=~{project_id} --format=csv --use_legacy_sql=false \
"SELECT 'vet_total' AS total_name, sum(total_billable_bytes) AS total_bytes FROM \
\`~{dataset_name}.INFORMATION_SCHEMA.PARTITIONS\` WHERE table_name LIKE 'vet_%' \
UNION ALL \
6 changes: 3 additions & 3 deletions scripts/variantstore/wdl/GvsUtils.wdl
@@ -151,7 +151,7 @@ task GetBQTableLastModifiedDatetime {
# bq needs the project name to be separate by a colon
DATASET_TABLE_COLON=$(echo ~{fq_table} | sed 's/\./:/')

LASTMODIFIED=$(bq --location=US --project_id=~{query_project} --format=json show ${DATASET_TABLE_COLON} | python3 -c "import sys, json; print(json.load(sys.stdin)['lastModifiedTime']);")
LASTMODIFIED=$(bq --project_id=~{query_project} --format=json show ${DATASET_TABLE_COLON} | python3 -c "import sys, json; print(json.load(sys.stdin)['lastModifiedTime']);")
if [[ $LASTMODIFIED =~ ^[0-9]+$ ]]; then
echo $LASTMODIFIED
else
@@ -191,7 +191,7 @@ task GetBQTablesMaxLastModifiedTimestamp {

echo "project_id = ~{query_project}" > ~/.bigqueryrc

bq --location=US --project_id=~{query_project} query --format=csv --use_legacy_sql=false \
bq --project_id=~{query_project} query --format=csv --use_legacy_sql=false \
"SELECT UNIX_MICROS(MAX(last_modified_time)) last_modified_time FROM \`~{data_project}\`.~{data_dataset}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name like '~{sep="' OR table_name like '" table_patterns}'" > results.txt

tail -1 results.txt | cut -d, -f1 > max_last_modified_timestamp.txt
@@ -369,7 +369,7 @@ task GetNumSamplesLoaded {
set -o errexit -o nounset -o xtrace -o pipefail

echo "project_id = ~{project_id}" > ~/.bigqueryrc
bq query --location=US --project_id=~{project_id} --format=csv --use_legacy_sql=false '
bq query --project_id=~{project_id} --format=csv --use_legacy_sql=false '

SELECT COUNT(*) FROM `~{fq_sample_table}` WHERE
is_loaded = true AND
3 changes: 2 additions & 1 deletion scripts/variantstore/wdl/extract/utils.py
@@ -31,7 +31,8 @@ def execute_with_retry(client, label, sql):
query = client.query(sql, job_config=job_config)
print(f"STARTING - {label} (jobid: {query.job_id})")
results = query.result()
job = client.get_job(query.job_id)
print(f"CHECKING ON - {query.job_id} at {query.location}")
job = client.get_job(query.job_id, location=query.location)
mb_billed = int(0 if job.total_bytes_billed is None else job.total_bytes_billed) / (
1024 * 1024)
print(
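The one-line fix above is the heart of this PR: a BigQuery job can only be retrieved from the location it ran in, and without an explicit location client.get_job() falls back to the client's default, so for a job that ran against a non-US dataset the lookup can fail even though the query itself succeeded. A minimal, self-contained sketch of the corrected pattern, with a placeholder project and SQL:

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

query_job = client.query("SELECT 1 AS x")  # placeholder SQL
query_job.result()  # wait for the query to finish

# Fetch the job from the location it actually ran in, rather than relying
# on the client's default location.
job = client.get_job(query_job.job_id, location=query_job.location)
bytes_billed = job.total_bytes_billed or 0
print(f"{bytes_billed / (1024 * 1024):.2f} MiB billed in {job.location}")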
@@ -14,6 +14,9 @@ public class ExtractFeaturesBQ {
private final static String FEATURE_EXTRACT_USER_DEFINED_FUNCTIONS =
"org/broadinstitute/hellbender/tools/gvs/filtering/udf_freq_table.sql";

private final static String FEATURE_EXTRACT_USER_DEFINED_FUNCTION_MEDIAN =
"org/broadinstitute/hellbender/tools/gvs/filtering/udf_median.sql";

private final static String VQSR_TRAINING_SITES_TABLE =
"broad-dsp-spec-ops.joint_genotyping_ref.vqsr_training_sites_*";

@@ -43,7 +46,9 @@ public static String getVQSRFeatureExtractQueryString(final TableReference altAl
public static String getVQSRFeatureExtractUserDefinedFunctionsString() {
try {
File file = Resource.getResourceContentsAsFile(FEATURE_EXTRACT_USER_DEFINED_FUNCTIONS);
return FileUtils.readFileToString(file, "UTF-8");
// also grab our definition of median
File fileMedian = Resource.getResourceContentsAsFile(FEATURE_EXTRACT_USER_DEFINED_FUNCTION_MEDIAN);
return FileUtils.readFileToString(file, "UTF-8") + FileUtils.readFileToString(fileMedian, "UTF-8");
} catch (Exception ioe) {
throw new GATKException("Unable to read udf file from resources", ioe);
}
@@ -374,7 +374,7 @@ private static BigQueryResultAndStatistics submitQueryAndWaitForResults(final Bi

// Get the results.
logger.info("Retrieving query results...");
final QueryResponse response = bigQuery.getQueryResults(jobId);
final QueryResponse response = bigQuery.getQueryResults(queryJob.getJobId());
final TableResult result;
final JobStatistics.QueryStatistics queryStatistics;
try {
@@ -392,7 +392,7 @@ private static BigQueryResultAndStatistics submitQueryAndWaitForResults(final Bi
return new BigQueryResultAndStatistics(result, queryStatistics);
}

private static long getQueryCostBytesProcessedEstimate(String queryString, String projectID) {
private static long getQueryCostBytesProcessedEstimate(String queryString, String projectID, String datasetID) {
final QueryJobConfiguration dryRunQueryConfig =
QueryJobConfiguration.newBuilder( queryString )
.setUseLegacySql(false)
@@ -401,11 +401,20 @@ private static long getQueryCostBytesProcessedEstimate(String queryString, Strin
.setPriority(QueryJobConfiguration.Priority.INTERACTIVE)
.build();

Job dryRunJob = getBigQueryEndPoint(projectID).create(JobInfo.newBuilder(dryRunQueryConfig).build());
final BigQuery bigQuery = getBigQueryEndPoint(projectID);

// Get the location for the BQ dataset, because it isn't always able to be discerned by parsing the sql and will
// fail if a location was specified at the time of dataset creation
DatasetId datasetIDObject = DatasetId.of(projectID, datasetID);
Dataset dataset = bigQuery.getDataset(datasetIDObject);
String location = dataset.getLocation();
// By explicitly creating a JobId, we can set the location in which the job to run
final JobId jobId = JobId.newBuilder().setLocation(location).build();

Job dryRunJob = getBigQueryEndPoint(projectID).create(JobInfo.newBuilder(dryRunQueryConfig).setJobId(jobId).build());
long bytesProcessed = ((JobStatistics.QueryStatistics) dryRunJob.getStatistics()).getTotalBytesProcessed();
return bytesProcessed;
}

public static StorageAPIAvroReaderAndBigQueryStatistics executeQueryWithStorageAPI(final String queryString,
final List<String> fieldsToRetrieve,
final String projectID,
@@ -420,7 +429,7 @@ public static StorageAPIAvroReaderAndBigQueryStatistics executeQueryWithStorageA

logger.info(queryStringWithUDFs);

long bytesProcessed = getQueryCostBytesProcessedEstimate(queryStringWithUDFs, projectID);
long bytesProcessed = getQueryCostBytesProcessedEstimate(queryStringWithUDFs, projectID, datasetID);
logger.info(String.format("Estimated %s MB scanned", bytesProcessed/1000000));

// UDFs need to come before the CREATE TABLE clause
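The getQueryCostBytesProcessedEstimate change above pins the dry-run job to the dataset's own location via an explicitly built JobId, because the location cannot always be inferred from the SQL text. Roughly the same approach with the Python client, shown as an illustration only (the function and all names are placeholders, not code from this PR):

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

def estimate_bytes_processed(sql: str, dataset_id: str) -> int:
    # Look up where the dataset lives, e.g. dataset_id = "my-project.my_dataset".
    dataset = client.get_dataset(dataset_id)
    config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    # Running the dry run in the dataset's location mirrors the explicit JobId
    # built in the Java change; otherwise the job may be routed to the default
    # location and fail for non-US datasets.
    job = client.query(sql, job_config=config, location=dataset.location)
    return job.total_bytes_processed

print(estimate_bytes_processed(
    "SELECT * FROM `my-project.my_dataset.alt_allele`", "my-project.my_dataset"))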
@@ -84,9 +84,9 @@ WITH
ref,
allele,
IFNULL(SUM(qual),0) as RAW_QUAL,
`bqutil`.fn.median(ARRAY_AGG( raw_mqranksum_x_10 IGNORE NULLS)) / 10.0 as AS_MQRankSum,
median(ARRAY_AGG( raw_mqranksum_x_10 IGNORE NULLS)) / 10.0 as AS_MQRankSum,
freq_table(ARRAY_AGG(raw_mqranksum_x_10 IGNORE NULLS)) AS_MQRankSum_ft,
`bqutil`.fn.median(ARRAY_AGG(raw_readposranksum_x_10 IGNORE NULLS)) / 10.0 as AS_ReadPosRankSum,
median(ARRAY_AGG(raw_readposranksum_x_10 IGNORE NULLS)) / 10.0 as AS_ReadPosRankSum,
freq_table(ARRAY_AGG(raw_readposranksum_x_10 IGNORE NULLS)) as AS_ReadPosRankSum_ft,
IFNULL(SUM(RAW_MQ),0) as RAW_MQ,
IFNULL(SUM((SELECT SUM(CAST(x AS INT64)) FROM UNNEST(SPLIT(call_ad, ",")) x)),0) SUM_AD,
@@ -0,0 +1,13 @@
-- This is a local implementation of median, as defined in the open source bigquery udf library. It was previously
-- referenced at "`bqutil`.fn.median"
-- However, bqutil is not installed in locations that are not "US" so we will define it ourselves.
-- https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/median.sqlx

CREATE TEMPORARY FUNCTION median(arr ANY TYPE) AS ((
SELECT IF (
MOD(ARRAY_LENGTH(arr), 2) = 0,
(arr[OFFSET(DIV(ARRAY_LENGTH(arr), 2) - 1)] + arr[OFFSET(DIV(ARRAY_LENGTH(arr), 2))]) / 2,
arr[OFFSET(DIV(ARRAY_LENGTH(arr), 2))]
)
FROM (SELECT ARRAY_AGG(x ORDER BY x) AS arr FROM UNNEST(arr) AS x)
));
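Because this is a TEMPORARY FUNCTION, the definition has to travel with every query that calls median, which is why the Java change above concatenates udf_median.sql onto the other UDF text before the query. A small illustrative example of that wiring from Python, with a toy query standing in for the real feature-extract SQL and a placeholder project name:

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

# In the pipeline this text is read from udf_median.sql; it is inlined here
# only to keep the example self-contained.
MEDIAN_UDF = '''
CREATE TEMPORARY FUNCTION median(arr ANY TYPE) AS ((
  SELECT IF(
    MOD(ARRAY_LENGTH(arr), 2) = 0,
    (arr[OFFSET(DIV(ARRAY_LENGTH(arr), 2) - 1)] + arr[OFFSET(DIV(ARRAY_LENGTH(arr), 2))]) / 2,
    arr[OFFSET(DIV(ARRAY_LENGTH(arr), 2))])
  FROM (SELECT ARRAY_AGG(x ORDER BY x) AS arr FROM UNNEST(arr) AS x)
));
'''

# Temporary functions must precede the statement that uses them, so the UDF is
# simply prepended to the query string.
query = MEDIAN_UDF + "SELECT median([3, 1, 4, 1, 5]) AS med"
for row in client.query(query).result():
    print(row.med)  # -> 3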