Skip to content

Commit

Permalink
Allow requester pays for S3 buckets (#5027)
Browse files Browse the repository at this point in the history

Signed-off-by: Nathan Thorpe <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Ben Sherman <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Ben Sherman <[email protected]>
  • Loading branch information
3 people authored Jun 10, 2024
1 parent 4daab01 commit 0070c1b
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 28 deletions.
7 changes: 6 additions & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ The following settings are available:
`aws.client.proxyPassword`
: The password to use when connecting through a proxy.

`aws.client.requesterPays`
: :::{versionadded} 24.05.0-edge
:::
: Enable the requester pays feature when accessing S3 buckets, i.e. the requester (rather than the bucket owner) is charged for the request.

`aws.client.s3PathStyleAccess`
: Enable the use of the path-based access model to specify the address of an object in S3-compatible storage systems.

Expand Down Expand Up @@ -2136,4 +2141,4 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`

: *Experimental: may change in a future release.*

: When `true`, enables {ref}`topic channels <channel-topic>` feature.
: When `true`, enables {ref}`topic channels <channel-topic>` feature.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
final debug = opts.debug ? ' --debug' : ''
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}"
final requesterPays = opts.requesterPays ? ' --request-payer requester' : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}${requesterPays}"
final cmd = "trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $aws ${workDir}/${TaskRun.CMD_RUN} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
return cmd
}
Expand All @@ -365,7 +366,8 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
final cli = opts.getS5cmdPath()
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final cmd = "trap \"{ ret=\$?; $cli cp${sse}${kms} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
final requesterPays = opts.requesterPays ? ' --request-payer requester' : ''
final cmd = "trap \"{ ret=\$?; $cli cp${sse}${kms}${requesterPays} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
return cmd
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ class AwsOptions implements CloudTransferOptions {
return awsConfig.s3Config.getDebug()
}

/**
 * @return the requester pays setting as reported by the S3 section of the AWS config
 */
Boolean getRequesterPays() {
    final s3 = awsConfig.s3Config
    return s3.getRequesterPays()
}

String getAwsCli() {
def result = getCliPath()
if( !result ) result = 'aws'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class AwsS3Config {

private Boolean anonymous

private Boolean requesterPays

AwsS3Config(Map opts) {
this.debug = opts.debug as Boolean
this.endpoint = opts.endpoint ?: SysEnv.get('AWS_S3_ENDPOINT')
Expand All @@ -60,6 +62,7 @@ class AwsS3Config {
this.pathStyleAccess = opts.s3PathStyleAccess as Boolean
this.anonymous = opts.anonymous as Boolean
this.s3Acl = parseS3Acl(opts.s3Acl as String)
this.requesterPays = opts.requesterPays as Boolean
}

private String parseStorageClass(String value) {
Expand Down Expand Up @@ -115,6 +118,10 @@ class AwsS3Config {
return anonymous
}

/**
 * @return the value of the {@code requesterPays} field of this config
 */
Boolean getRequesterPays() {
    return this.requesterPays
}

/**
 * Tells whether an endpoint is set and it does not contain {@code .amazonaws.com}.
 *
 * @return {@code false} when no endpoint is set or the endpoint contains
 *         {@code .amazonaws.com}, {@code true} otherwise
 */
boolean isCustomEndpoint() {
    // a missing/empty endpoint can never be a custom one
    if( !endpoint )
        return false
    return !endpoint.contains(".amazonaws.com")
}
Expand Down
17 changes: 14 additions & 3 deletions plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.CopyPartResult;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
Expand Down Expand Up @@ -94,7 +95,7 @@
public class S3Client {

private static final Logger log = LoggerFactory.getLogger(S3Client.class);

private AmazonS3 client;

private CannedAccessControlList cannedAcl;
Expand All @@ -111,6 +112,8 @@ public class S3Client {

private Integer uploadMaxThreads = 10;

private Boolean isRequesterPaysEnabled = false;

public S3Client(AmazonS3 client) {
this.client = client;
}
Expand Down Expand Up @@ -140,7 +143,8 @@ public ObjectListing listObjects(ListObjectsRequest request) {
* @see com.amazonaws.services.s3.AmazonS3Client#getObject(String, String)
*/
public S3Object getObject(String bucketName, String key) {
    // build an explicit request so the requester pays flag held by this client is sent along
    return client.getObject(new GetObjectRequest(bucketName, key, isRequesterPaysEnabled));
}
/**
* @see com.amazonaws.services.s3.AmazonS3Client#putObject(String, String, File)
Expand Down Expand Up @@ -282,6 +286,13 @@ public void setStorageEncryption(String alg) {
log.debug("Setting S3 SSE storage encryption algorithm={}", alg);
}

/**
 * Sets the requester pays flag from its textual form.
 * A {@code null} argument leaves the current setting untouched.
 *
 * @param requesterPaysEnabled textual boolean, converted with {@link Boolean#valueOf(String)}
 */
public void setRequesterPaysEnabled(String requesterPaysEnabled) {
    if( requesterPaysEnabled != null ) {
        this.isRequesterPaysEnabled = Boolean.valueOf(requesterPaysEnabled);
        log.debug("Setting S3 requester pays enabled={}", isRequesterPaysEnabled);
    }
}

public void setUploadChunkSize(String value) {
if( value==null )
return;
Expand Down Expand Up @@ -527,7 +538,7 @@ public void downloadDirectory(S3Path source, File targetFile) throws IOException
// see https://github.com/aws/aws-sdk-java/issues/1321
//
// just traverse the source path and copy all files
//
//
final Path target = targetFile.toPath();
final List<Download> allDownloads = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,29 @@

/**
* Spec:
*
*
* URI: s3://[endpoint]/{bucket}/{key} If endpoint is missing, it's assumed to
* be the default S3 endpoint (s3.amazonaws.com)
*
*
* FileSystem roots: /{bucket}/
*
*
* Treatment of S3 objects: - If a key ends in "/" it's considered a directory
* *and* a regular file. Otherwise, it's just a regular file. - It is legal for
* a key "xyz" and "xyz/" to exist at the same time. The latter is treated as a
* directory. - If a file "a/b/c" exists but there's no "a" or "a/b/", these are
* considered "implicit" directories. They can be listed, traversed and deleted.
*
*
* Deviations from FileSystem provider API: - Deleting a file or directory
* always succeeds, regardless of whether the file/directory existed before the
* operation was issued i.e. Files.delete() and Files.deleteIfExists() are
* equivalent.
*
*
*
*
* Future versions of this provider might allow for a strict mode that mimics
* the semantics of the FileSystem provider API on a best effort basis, at an
* increased processing cost.
*
*
*
*
*/
public class S3FileSystemProvider extends FileSystemProvider implements FileSystemTransferAware {

Expand Down Expand Up @@ -468,9 +468,9 @@ public long position() throws IOException {
@Override
public void createDirectory(Path dir, FileAttribute<?>... attrs)
throws IOException {

// FIXME: throw exception if the same key already exists at amazon s3

S3Path s3Path = (S3Path) dir;

Preconditions.checkArgument(attrs.length == 0,
Expand Down Expand Up @@ -547,7 +547,7 @@ public void copy(Path source, Path target, CopyOption... options)

S3Client client = s3Source.getFileSystem() .getClient();
Properties props = s3Target.getFileSystem().properties();

final ObjectMetadata sourceObjMetadata = s3Source.getFileSystem().getClient().getObjectMetadata(s3Source.getBucket(), s3Source.getKey());
final S3MultipartOptions opts = props != null ? new S3MultipartOptions(props) : new S3MultipartOptions();
final long maxSize = opts.getMaxCopySize();
Expand Down Expand Up @@ -845,6 +845,7 @@ protected S3FileSystem createFileSystem(URI uri, AwsConfig awsConfig) {
client.setKmsKeyId(props.getProperty("storage_kms_key_id"));
client.setUploadChunkSize(props.getProperty("upload_chunk_size"));
client.setUploadMaxThreads(props.getProperty("upload_max_threads"));
client.setRequesterPaysEnabled(props.getProperty("requester_pays_enabled"));

if( props.getProperty("glacier_auto_retrieval") != null )
log.warn("Glacier auto-retrieval is no longer supported, config option `aws.client.glacierAutoRetrieval` will be ignored");
Expand All @@ -866,7 +867,7 @@ protected String getProp(Properties props, String... keys) {
}
return null;
}

/**
* find /amazon.properties in the classpath
* @return Properties amazon.properties
Expand All @@ -879,12 +880,12 @@ protected Properties loadAmazonProperties() {
if (in != null){
props.load(in);
}

} catch (IOException e) {}

return props;
}

// ~~~

private <T> void verifySupportedOptions(Set<? extends T> allowedOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class S3BashLib extends BashFunLib<S3BashLib> {
private String retryMode
private String s5cmdPath
private String acl = ''
private String requesterPays = ''

S3BashLib withCliPath(String cliPath) {
if( cliPath )
Expand Down Expand Up @@ -77,13 +78,18 @@ class S3BashLib extends BashFunLib<S3BashLib> {
this.s5cmdPath = value
return this
}

/**
 * Sets the {@code --acl} command line fragment from the given canned ACL.
 * A null (falsy) value leaves the current fragment unchanged.
 *
 * @param value the canned ACL to interpolate into the fragment
 * @return this object, to allow call chaining
 */
S3BashLib withAcl(CannedAccessControlList value) {
    if( !value )
        return this
    this.acl = "--acl $value "
    return this
}

/**
 * Sets the {@code --request-payer requester} command line fragment.
 * A truthy value enables it; a false or null value resets it to the empty string.
 *
 * @param value whether requester pays should be enabled
 * @return this object, to allow call chaining
 */
S3BashLib withRequesterPays(Boolean value) {
    if( value )
        this.requesterPays = "--request-payer requester "
    else
        this.requesterPays = ''
    return this
}

protected String retryEnv() {
if( !retryMode )
return ''
Expand All @@ -106,11 +112,11 @@ class S3BashLib extends BashFunLib<S3BashLib> {
local name=\$1
local s3path=\$2
if [[ "\$name" == - ]]; then
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}--storage-class $storageClass - "\$s3path"
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass - "\$s3path"
elif [[ -d "\$name" ]]; then
$cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}--storage-class $storageClass "\$name" "\$s3path/\$name"
$cli s3 cp --only-show-errors --recursive ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
else
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}--storage-class $storageClass "\$name" "\$s3path/\$name"
$cli s3 cp --only-show-errors ${debug}${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
fi
}
Expand Down Expand Up @@ -144,11 +150,11 @@ class S3BashLib extends BashFunLib<S3BashLib> {
if [[ "\$name" == - ]]; then
local tmp=\$(nxf_mktemp)
cp /dev/stdin \$tmp/\$name
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}--storage-class $storageClass \$tmp/\$name "\$s3path"
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass \$tmp/\$name "\$s3path"
elif [[ -d "\$name" ]]; then
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}--storage-class $storageClass "\$name/" "\$s3path/\$name/"
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name/" "\$s3path/\$name/"
else
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}--storage-class $storageClass "\$name" "\$s3path/\$name"
$cli cp ${acl}${storageEncryption}${storageKmsKeyId}${requesterPays}--storage-class $storageClass "\$name" "\$s3path/\$name"
fi
}
Expand Down Expand Up @@ -187,6 +193,7 @@ class S3BashLib extends BashFunLib<S3BashLib> {
.withDebug( opts.debug )
.withS5cmdPath( opts.s5cmdPath )
.withAcl( opts.s3Acl )
.withRequesterPays( opts.requesterPays )
}

static String script(AwsOptions opts) {
Expand Down

0 comments on commit 0070c1b

Please sign in to comment.