add workflow "unpack_archive_to_bucket"; add task "unpack_archive_to_…
Browse files Browse the repository at this point in the history
…bucket_path"

This adds a new WDL task, "unpack_archive_to_bucket_path", which unpacks an input tarball to a specified bucket path, with options to set the target destination, the number of wrapping directories to strip from around the tarball contents, and whether the extracted files are uploaded directly via pipe or staged in a temporary location on disk. The task is intended to run on Terra, but it can run elsewhere if a Google Cloud auth token is passed to it. A corresponding workflow, unpack_archive_to_bucket, is also added.
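
For reference, the direct-to-bucket mode is built on GNU tar's --to-command option, which runs a command for each extracted file and streams the file's contents to it on stdin, combined with gcloud storage cp reading from stdin. A minimal sketch of that pattern, assuming GNU tar and an authenticated gcloud CLI, with a hypothetical archive name and destination path:

tar -v --ignore-zeros -x \
    --to-command='gcloud storage cp - "gs://example-bucket/dest/${TAR_REALNAME}"' \
    -f example.tar.gz

Each regular file in the archive is uploaded by its own gcloud storage cp invocation; the new task wraps this pattern with clobber handling, optional stripping of wrapper directories, and an optional output sub-directory.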
tomkinsc committed Dec 11, 2024
1 parent b8bb70b commit 7a79db3
Showing 3 changed files with 210 additions and 22 deletions.
39 changes: 22 additions & 17 deletions .dockstore.yml
@@ -65,6 +65,16 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/beast_gpu.wdl
testParameterFiles:
- /empty.json
- name: blastoff
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/blastoff.wdl
testParameterFiles:
- /empty.json
- name: chunk_blast
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/megablast_chunk.wdl
testParameterFiles:
- /empty.json
- name: classify_kaiju
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/classify_kaiju.wdl
@@ -100,6 +110,16 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/coverage_table.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz_general
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl
testParameterFiles:
- /empty.json
- name: demux_metag
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/demux_metag.wdl
@@ -404,23 +424,8 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/bam_to_qiime.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz_general
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl
testParameterFiles:
- /empty.json
- name: blastoff
- name: unpack_archive_to_bucket
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/blastoff.wdl
testParameterFiles:
- /empty.json
- name: chunk_blast
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/megablast_chunk.wdl
primaryDescriptorPath: /pipes/WDL/workflows/unpack_archive_to_bucket.wdl
testParameterFiles:
- /empty.json
169 changes: 164 additions & 5 deletions pipes/WDL/tasks/tasks_utils.wdl
@@ -27,6 +27,165 @@ task concatenate {
}
}

task unpack_archive_to_bucket_path {
meta {
description: "Unpack archive(s) to a target location within a Google Storage bucket"
}
input {
# input archive(s)
Array[File] input_archive_files

# destination for extracted files
String bucket_path_prefix
String? out_dir_name

# gcloud storage options
Boolean clobber_existing = false
String? gcloud_access_token

# tar options
Boolean bypass_disk_and_unpack_directly_to_bucket = false
Int? archive_wrapper_directories_to_strip
String tar_opts = "-v --ignore-zeros --no-ignore-command-error"

# resource requirements
Int disk_size = 500
Int machine_mem_gb = 128
String docker = "quay.io/broadinstitute/viral-core:2.4.0"
}

parameter_meta {
input_archive_files: {
description: "List of input archive files to unpack.",
patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"]
}
bucket_path_prefix: {
description: "The path prefix to the Google Storage bucket location where the archive contents will be unpacked. This must begin with the bucket name, should start with 'gs://', and can include as many sub-directories as desired."
}
out_dir_name: {
description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix."
}
gcloud_access_token: {
description: "Access token for the Google Cloud Storage bucket, if needed to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be used (service/pet account on Terra)."
}
archive_wrapper_directories_to_strip: {
description: "If specified, tar extraction excludes this many top-level directories. (i.e. if all files of a tarball are containined within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files files will be extracted without being placed into a corresponding output sub-directory. Equivalent to the parameter '--strip-components' of GNU tar."
}
clobber_existing: {
description: "If true, overwrite files in the target directory of the bucket if they already exist."
}
bypass_disk_and_unpack_directly_to_bucket: {
description: "If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation."
}
tar_opts: {
description: "Options to pass to the tar command during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'"
}
disk_size: {
description: "Size of the disk to allocate for the task, in GB. Note that if multiple files are provided to 'input_archive_files', and extracted data is written to the disk (bypass_disk_and_unpack_directly_to_bucket=false), the extracted data from one archive will be removed before extracting and uploading data from the next input archive."
}
machine_mem_gb: {
description: "Memory to allocate for the task, in GB."
}
}

command <<<
if ~{if(defined(gcloud_access_token)) then 'true' else 'false'}; then
# set access token env var expected by gcloud,
# if provided by the user
export CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}"
fi

# check whether the bucket path prefix begins with "gs://" and if not,
# prepend the 'protocol'; also strip leading or trailing slash if present
# (for flexibility—the user can specify the bucket path prefix with or without the protocol)
bucket_path_prefix=$(echo "~{bucket_path_prefix}" | sed -e 's|^gs://||' -e 's|/$||' -e 's|^/*||' -e 's|^|gs://|')
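# for reference, hypothetical inputs and their normalized results:
#   "my-bucket/outputs/"  -> "gs://my-bucket/outputs"
#   "/my-bucket"          -> "gs://my-bucket"
#   "gs://my-bucket/data" -> "gs://my-bucket/data"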

# check that, excluding the gs:// 'protocol' prefix, the bucket path prefix is not empty
if [ -z "${bucket_path_prefix/#gs:\/\//}" ]; then
echo "ERROR: bucket path prefix is empty" >&2
exit 1
fi

# check whether the user can write to the target bucket
# by trying a simple write action, since we cannot rely on
# the user having the permissions needed to view the IAM policies that
# determine their (write) access to the bucket
if ! echo "write_test" | gcloud storage cp - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then
echo "ERROR: user does not have write access to the target bucket: ~{bucket_path_prefix}" >&2
exit 1
else
# clean up the test file if the write test was successful
gcloud storage rm "${bucket_path_prefix}/.tmp/test-write-access.txt"
fi

# for each of the input archives provided, extract the contents to the target bucket
# either directly via pipe, or from an intermediate location on disk
for input_archive in ~{sep=' ' input_archive_files}; do
echo "Processing archive: $(basename "${input_archive}")"

# if the user has requested to bypass writing to disk between extraction and upload
if ~{if(bypass_disk_and_unpack_directly_to_bucket) then 'true' else 'false'}; then
echo "Unpacking archive(s) and piping directly to gcloud storage upload processes (bypassing the disk)..."

# TODO: parallelize if needed and if the increased memory usage is acceptable
# either via GNU parallel ( https://www.gnu.org/software/parallel/parallel_examples.html )
# or by simply pushing the tar processes to the background

# pipe each file to a command via stdout, relying on GNU tar to pass file information
# out of band via special environment variables set for each file when using the --to-command option
#
# documentation here:
# https://www.gnu.org/software/tar/manual/html_section/extract-options.html#Writing-to-an-External-Program
tar ~{tar_opts} -x \
~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \
--to-command='gcloud storage cp ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \
-f "${input_archive}"

# otherwise extract to disk and then upload to the bucket
else
echo 'Extracting archive '$(basename "${input_archive}")' to disk before upload...'

# create a temporary directory to extract the archive contents to
mkdir -p extracted_tmp

# extract the archive to the temporary directory
tar ~{tar_opts} -x \
--directory "./extracted_tmp" \
~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \
-f "${input_archive}"

pushd extracted_tmp

echo "Uploading extracted files to the target bucket..."

# gcloud storage rsync the extracted files to the target bucket in the target directory
gcloud storage rsync \
--recursive \
~{if clobber_existing then "" else "--no-clobber"} \
--verbosity warning \
./ "${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}"

popd
rm -r ./extracted_tmp
fi
done
>>>

runtime {
docker: docker
memory: machine_mem_gb + " GB"
cpu: 16
disks: "local-disk " + disk_size + " LOCAL"
disk: disk_size + " GB" # TES
dx_instance_type: "mem3_ssd1_v2_x16"
preemptible: 0
maxRetries: 1
}

output {
}
}

task zcat {
meta {
description: "Glue together a bunch of text files that may or may not be compressed (autodetect among gz,xz,bz2,lz4,zst or uncompressed inputs). Optionally compress the output (depending on requested file extension)"
@@ -256,7 +415,7 @@ task download_from_url {
# ----
# get the name of the downloaded file
downloaded_file_name="$(basename $(ls -1 | head -n1))"
downloaded_file_name="$(basename "$(ls -1 | head -n1)")"
if [ ! -f "$downloaded_file_name" ]; then
echo "Could not locate downloaded file \"$downloaded_file_name\""
@@ -274,12 +433,12 @@
# since wget stores both in a single file separated by a couple newlines
if [[ "~{url_to_download}" =~ ^https?:// ]] && ~{if save_response_header_to_file then "true" else "false"}; then
echo "Saving response headers separately..."
csplit -f response -s tmp/${downloaded_file_name} $'/^\r$/+1' && \
mv response00 ../${downloaded_file_name}.headers && \
mv response01 ${downloaded_file_name} && \
csplit -f response -s "tmp/${downloaded_file_name}" $'/^\r$/+1' && \
mv response00 "../${downloaded_file_name}.headers" && \
mv response01 "${downloaded_file_name}" && \
rm "tmp/$downloaded_file_name"
else
mv tmp/${downloaded_file_name} ${downloaded_file_name}
mv "tmp/${downloaded_file_name} ${downloaded_file_name}"
fi
# alternative python implementation to split response headers from body
# via https://stackoverflow.com/a/75483099
24 changes: 24 additions & 0 deletions pipes/WDL/workflows/unpack_archive_to_bucket.wdl
@@ -0,0 +1,24 @@
version 1.0

import "../tasks/tasks_utils.wdl" as tasks_utils
import "../tasks/tasks_terra.wdl" as tasks_terra

workflow unpack_archive_to_bucket {
meta {
description: "Unpack archive(s) to a target location within a Google Storage bucket"
author: "Broad Viral Genomics"
email: "[email protected]"

allowNestedInputs: true
}

input {
String? gcloud_auth_token
}

call tasks_terra.check_terra_env

if( (check_terra_env.is_running_on_terra && check_terra_env.is_backed_by_gcp) || defined(gcloud_auth_token) ) {
call tasks_utils.unpack_archive_to_bucket_path
}
}
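
As a usage note, when running this workflow outside Terra the optional gcloud_auth_token input can be populated from an existing gcloud login. A minimal sketch, assuming the gcloud CLI is installed and authenticated as an account with write access to the destination bucket:

# print a short-lived OAuth 2.0 access token for the active gcloud account;
# the resulting string can be passed as the workflow input "gcloud_auth_token"
gcloud auth print-access-token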
