unpack_archive_to_bucket_path: allow additional opts to be passed to `gcloud storage cp`; reorder of inputs and their descriptions
tomkinsc committed Dec 12, 2024
1 parent 4cf59d4 commit cfa65c3
Showing 4 changed files with 69 additions and 36 deletions.
13 changes: 3 additions & 10 deletions pipes/WDL/tasks/tasks_terra.wdl
@@ -48,6 +48,7 @@ task check_terra_env {

# create Terra-related output files
touch user_email.txt
touch workspace_id.txt
touch workspace_name.txt
touch workspace_namespace.txt
touch workspace_bucket_path.txt
@@ -56,6 +57,7 @@ task check_terra_env {
touch method_version.txt
touch method_source.txt
touch method_path.txt
touch top_level_submission_id.txt

# disable the version update alert messages gcloud sometimes emits when executing any command
gcloud config set component_manager/disable_update_check true
@@ -134,7 +136,7 @@ task check_terra_env {
WORKSPACE_NAME="$(jq -cr '.workspace.name | select (.!=null)' workspace_info.json | tee workspace_name.txt)"
WORKSPACE_NAME_URL_ENCODED="$(jq -rn --arg x "${WORKSPACE_NAME}" '$x|@uri')"
WORKSPACE_NAMESPACE="$(jq -cr '.workspace.namespace | select (.!=null)' workspace_info.json | tee workspace_namespace.txt)"
WORKSPACE_BUCKET="$(echo gs://${WORKSPACE_ID} | tee workspace_bucket_path.txt)"
WORKSPACE_BUCKET="$(echo "gs://${WORKSPACE_ID}" | tee workspace_bucket_path.txt)"

echo "WORKSPACE_NAME: ${WORKSPACE_NAME}"
echo "WORKSPACE_NAMESPACE: ${WORKSPACE_NAMESPACE}"
@@ -194,15 +196,6 @@ task check_terra_env {
else
echo "Not running on Terra+GCP"
fi
ls -1 /sys
echo "--"
ls -1 /sys/fs
echo "--"
ls -1 /sys/fs/cgroup
echo "-- memory.peak:"
cat /sys/fs/cgroup/memory.peak
echo "--"
#ls -1 /sys/fs/cgroup/memory
echo -n'' "MEM_BYTES: "; { if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } | tee MEM_BYTES
>>>
output {
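The MEM_BYTES one-liner retained above replaces the ad-hoc /sys listing deleted by this commit: it reads the task's peak memory usage from whichever cgroup interface the host kernel exposes. A more readable sketch of the same probe (a sketch only, assuming the task runs under either cgroup v2 or v1):

    # cgroup v2 publishes the memory high-water mark at memory.peak;
    # cgroup v1 publishes it at memory/memory.max_usage_in_bytes.
    if [ -f /sys/fs/cgroup/memory.peak ]; then
        peak_bytes="$(cat /sys/fs/cgroup/memory.peak)"
    elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then
        peak_bytes="$(cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes)"
    else
        peak_bytes=0   # neither interface mounted; report zero
    fi
    echo "MEM_BYTES: ${peak_bytes}"
    echo "${peak_bytes}" > MEM_BYTES   # bare value written for the task output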
59 changes: 38 additions & 21 deletions pipes/WDL/tasks/tasks_utils.wdl
@@ -39,22 +39,24 @@ task unpack_archive_to_bucket_path {
String bucket_path_prefix
String? out_dir_name

# gcloud storage options
Boolean clobber_existing = false
String? gcloud_access_token

# tar options
Boolean bypass_disk_and_unpack_directly_to_bucket = false
Int? archive_wrapper_directories_to_strip
String tar_opts = "-v --ignore-zeros --no-ignore-command-error"

# gcloud storage options
Boolean clobber_existing = false
String? gcloud_access_token
String gcloud_storage_cp_opts = ""

# resource requirements
# execution and resource requirements
Int disk_size = 500
Int machine_mem_gb = 128
String docker = "quay.io/broadinstitute/viral-core:2.4.0"
}

parameter_meta {
# data I/O inputs
input_archive_files: {
description: "List of input archive files to unpack.",
patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"]
@@ -65,27 +67,40 @@ task unpack_archive_to_bucket_path {
out_dir_name: {
description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix."
}
gcloud_access_token: {
description: "Access token for the Google Cloud Storage bucket, if needed to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be used (service/pet account on Terra)."

# tar params
bypass_disk_and_unpack_directly_to_bucket: {
description: "(tar) If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation."
}
archive_wrapper_directories_to_strip: {
description: "If specified, tar extraction excludes this many top-level directories. (i.e. if all files of a tarball are containined within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files files will be extracted without being placed into a corresponding output sub-directory. Equivalent to the parameter '--strip-components' of GNU tar."
description: "(tar) If specified, tar extraction excludes this many top-level directories. (i.e. if all files of a tarball are containined within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files files will be extracted without being placed into a corresponding output sub-directory. Equivalent to the parameter '--strip-components' of GNU tar."
}
tar_opts: {
description: "(tar) Options to pass to GNU tar during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'"
}

# 'gcloud storage cp' params
clobber_existing: {
description: "If true, overwrite files in the target directory of the bucket if they already exist."
description: "(gcloud storage cp) If true, overwrite files in the target directory of the bucket if they already exist."
}
bypass_disk_and_unpack_directly_to_bucket: {
description: "If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation."
gcloud_access_token: {
description: "(gcloud storage cp) Access token for the Google Cloud Storage bucket, for account authorized to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be obtained via 'gcloud auth print-access-token' for the active authenticated user (on Terra, the service worker/'pet' account)."
}
tar_opts: {
description: "Options to pass to the tar command during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'"
gcloud_storage_cp_opts: {
description: "(gcloud storage cp) Additional options to pass to the 'gcloud storage cp' command at the time of upload."
}


# execution and resource requirements
disk_size: {
description: "Size of the disk to allocate for the task, in GB. Note that if multiple files are provided to 'input_archive_files', and extracted data is written to the disk (bypass_disk_and_unpack_directly_to_bucket=false), the extracted data from one archive will be removed before extracting and uploading data from the next input archive."
}
machine_mem_gb: {
description: "Memory to allocate for the task, in GB."
}
docker: {
description: "Docker image to use for the task. For this task, the image must provide GNU tar and the google-cloud-cli ('gcloud' command)"
}
}

command <<<
@@ -98,10 +113,11 @@ task unpack_archive_to_bucket_path {
if ~{if(defined(gcloud_access_token)) then 'true' else 'false'}; then
# set access token env var expected by gcloud,
# if provided by the user
export CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}"
CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}"
else
export CLOUDSDK_AUTH_ACCESS_TOKEN="$(gcloud auth print-access-token)"
CLOUDSDK_AUTH_ACCESS_TOKEN="$(gcloud auth print-access-token)"
fi
export CLOUDSDK_AUTH_ACCESS_TOKEN

# check that the gcloud access token is populated
if [ -z "${CLOUDSDK_AUTH_ACCESS_TOKEN}" ]; then
@@ -124,7 +140,7 @@ task unpack_archive_to_bucket_path {
# by trying a simple write action, since we cannot rely on
# the user having the permissions needed to view the IAM policies
# that determine their (write) access to the bucket
if ! echo "write_test" | gcloud storage cp - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then
if ! echo "write_test" | gcloud storage cp --verbosity error - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then
echo "ERROR: user does not have write access to the target bucket: ~{bucket_path_prefix}" >&2
exit 1
else
@@ -152,12 +168,12 @@ task unpack_archive_to_bucket_path {
# https://www.gnu.org/software/tar/manual/html_section/extract-options.html#Writing-to-an-External-Program
tar ~{tar_opts} -x \
~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \
--to-command='gcloud storage cp ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \
--to-command='gcloud storage cp ~{gcloud_storage_cp_opts} ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \
-f "${input_archive}"

# otherwise extract to disk and then upload to the bucket
else
echo 'Extracting archive '$(basename "${input_archive}")' to disk before upload...'
echo 'Extracting archive '"$(basename "${input_archive}")"' to disk before upload...'

# create a temporary directory to extract the archive contents to
mkdir -p extracted_tmp
Expand All @@ -177,6 +193,7 @@ task unpack_archive_to_bucket_path {
--recursive \
~{if clobber_existing then "" else "--no-clobber"} \
--verbosity warning \
~{gcloud_storage_cp_opts} \
./ "${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}"

popd
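In the streaming branch above, GNU tar's --to-command option runs the given command once per archive member, with the member's bytes on stdin and its path exported in the TAR_REALNAME environment variable; each file is therefore uploaded by its own 'gcloud storage cp - <destination>' process without touching the disk. A stripped-down sketch of the pattern (bucket and archive names are placeholders):

    # Each extracted member is piped to a separate 'gcloud storage cp' process;
    # tar invokes the command via sh, so ${TAR_REALNAME} expands once per member.
    tar -x --ignore-zeros \
        --to-command='gcloud storage cp - "gs://example-bucket/prefix/${TAR_REALNAME}"' \
        -f example.tar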
@@ -452,7 +469,7 @@ task download_from_url {
mv response01 "${downloaded_file_name}" && \
rm "tmp/$downloaded_file_name"
else
mv "tmp/${downloaded_file_name} ${downloaded_file_name}"
mv "tmp/${downloaded_file_name}" "${downloaded_file_name}"
fi
# alternative python implementation to split response headers from body
# via https://stackoverflow.com/a/75483099
@@ -490,11 +507,11 @@ task download_from_url {
if ~{if defined(md5_hash_expected) then 'true' else 'false'}; then
md5_hash_expected="~{md5_hash_expected}"
check_md5_sum $md5_hash_expected $md5sum_of_downloaded
check_md5_sum "$md5_hash_expected" "$md5sum_of_downloaded"
fi
if ~{if defined(md5_hash_expected_file_url) then 'true' else 'false'}; then
md5_hash_expected="$(curl --silent ~{md5_hash_expected_file_url} | cut -f1 -d' ')"
check_md5_sum $md5_hash_expected $md5sum_of_downloaded
check_md5_sum "$md5_hash_expected" "$md5sum_of_downloaded"
fi
# report the file size, in bytes
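check_md5_sum is a helper defined elsewhere in this task, outside the hunks shown here; the quoting fix above matters when either hash is empty. A hypothetical definition consistent with how it is called (not the task's actual code):

    # Hypothetical helper (the real definition is not part of this diff):
    check_md5_sum() {
        local expected="$1" actual="$2"
        if [ "${expected}" != "${actual}" ]; then
            echo "ERROR: md5 mismatch: expected ${expected}, got ${actual}" >&2
            exit 1
        fi
    }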
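The new gcloud_storage_cp_opts input is appended verbatim to both upload paths: the per-file --to-command invocation and the recursive bulk copy. A hypothetical caller might pass extra flags like this (the call context and the --gzip-in-flight-all flag are illustrative assumptions, not part of this commit):

    call tasks_utils.unpack_archive_to_bucket_path {
      input:
        input_archive_files    = [archive_tarball],      # hypothetical File input
        bucket_path_prefix     = "gs://example-bucket",
        out_dir_name           = "unpacked",
        clobber_existing       = true,
        gcloud_storage_cp_opts = "--gzip-in-flight-all"  # example extra flag
    }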
2 changes: 0 additions & 2 deletions pipes/WDL/workflows/sarscov2_illumina_full.wdl
@@ -32,12 +32,10 @@ workflow sarscov2_illumina_full {
description: "amplicon primers to trim in reference coordinate space (0-based BED format)",
patterns: ["*.bed"]
}

biosample_attributes: {
description: "A post-submission attributes file from NCBI BioSample, which is available at https://submit.ncbi.nlm.nih.gov/subs/ and clicking on 'Download attributes file with BioSample accessions'. The 'sample_name' column must match the external_ids used in sample_rename_map (or internal ids if sample_rename_map is omitted).",
patterns: ["*.txt", "*.tsv"]
}

}

input {
31 changes: 28 additions & 3 deletions pipes/WDL/workflows/unpack_archive_to_bucket.wdl
@@ -13,15 +13,40 @@ workflow unpack_archive_to_bucket {
}

input {
String? gcloud_auth_token
Array[File] input_archive_files
String? bucket_path_prefix
String? out_dir_name

String? gcloud_access_token
}

parameter_meta {
input_archive_files: {
description: "List of input archive files to unpack.",
patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"]
}
bucket_path_prefix: {
description: "Path within the Google Storage bucket to unpack the archive files. If not provided, the root of the bucket will be used."
}
out_dir_name: {
description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix."
}
gcloud_access_token: {
description: "Access token for the Google Cloud Storage bucket, for account authorized to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be obtained via 'gcloud auth print-access-token' for the active authenticated user (on Terra, the service worker/'pet' account)."
}
}

call tasks_terra.check_terra_env

if( (check_terra_env.is_running_on_terra && check_terra_env.is_backed_by_gcp) || defined(gcloud_auth_token) ) {
# only run the task if we are running on GCP or the user provides an auth token to interact with GCP
# if needed, we can also inspect 'check_terra_env.is_running_on_terra'
if( check_terra_env.is_backed_by_gcp || defined(gcloud_access_token) ) {
call tasks_utils.unpack_archive_to_bucket_path {
input:
gcloud_access_token = gcloud_auth_token
input_archive_files = input_archive_files,
gcloud_access_token = gcloud_access_token,
bucket_path_prefix = if (check_terra_env.is_running_on_terra && check_terra_env.is_backed_by_gcp) then select_first([bucket_path_prefix,check_terra_env.workspace_bucket_path]) else select_first([bucket_path_prefix]),
out_dir_name = out_dir_name
}
}
}
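Since the guard now keys on is_backed_by_gcp or a user-supplied token rather than requiring Terra, the workflow can also be exercised outside Terra when a token and explicit bucket prefix are provided. A hypothetical local run (miniwdl and all names/paths are illustrative assumptions, not part of this commit):

    # Hypothetical invocation; archive and bucket names are placeholders.
    miniwdl run pipes/WDL/workflows/unpack_archive_to_bucket.wdl \
        input_archive_files=my_data.tar.gz \
        bucket_path_prefix=gs://example-bucket/uploads \
        out_dir_name=unpacked \
        gcloud_access_token="$(gcloud auth print-access-token)"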
