Skip to content

Commit

Permalink
Try the best to fix s3 options.
Browse files Browse the repository at this point in the history
Try our best in the plugin to fill the s3 options in the nfs field.
Use the aws credentials in Nextflow.
Fill the read/write mode option if it's not available.
  • Loading branch information
jealous committed Aug 28, 2023
1 parent 00b5836 commit 565f07a
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 6 deletions.
75 changes: 69 additions & 6 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import nextflow.exception.AbortOperationException
import nextflow.io.BucketParser
import org.apache.commons.lang.StringUtils

import java.nio.file.Path

/**
* @author Cedric Zhuang <[email protected]>
*/
Expand Down Expand Up @@ -55,8 +53,6 @@ class FloatConf {
String extraOptions
String commonExtra

Path floatBin

float timeFactor = 1

/**
Expand All @@ -72,8 +68,8 @@ class FloatConf {
}
FloatConf ret = new FloatConf()

ret.initFloatConf(config.float as Map)
ret.initAwsConf(config)
ret.initFloatConf(config.float as Map)

return ret
}
Expand Down Expand Up @@ -115,6 +111,12 @@ class FloatConf {
return "$nfs:${workDir.path}"
}

private String parseNfs(String nfsOption) {
def vol = new DataVolume(nfsOption)
vol.setS3Credentials(s3accessKey, s3secretKey)
return vol.toString()
}

private def initFloatConf(Map floatNode) {
if (!floatNode) {
return
Expand All @@ -130,7 +132,7 @@ class FloatConf {
.collect { it.trim() }
.findAll { it.size() > 0 }
}
this.nfs = floatNode.nfs
this.nfs = parseNfs(floatNode.nfs as String)

if (floatNode.vmPolicy) {
this.vmPolicy = collapseMapToString(floatNode.vmPolicy as Map)
Expand Down Expand Up @@ -217,3 +219,64 @@ class FloatConf {
return getCliPrefix(address).join(" ")
}
}

class DataVolume {
private Map<String, String> options
private URI uri

DataVolume(String s) {
options = [:]
if (!s) {
uri = new URI("")
return
}
def opStart = s.indexOf("[")
def opEnd = s.indexOf("]")
if (opStart == 0 && opEnd != -1) {
def opStr = s.substring(1, opEnd)
for (String op : opStr.split(",")) {
def tokens = op.split("=")
if (tokens.size() < 2) {
continue
}
options[tokens[0]] = tokens[1]
}
uri = new URI(s.substring(opEnd + 1))
} else {
uri = new URI(s)
}
}

def setS3Credentials(String key, String secret) {
if (scheme != "s3") {
return
}
final accessKey = "accessKey"
final secretKey = "secret"
if (!options.containsKey(accessKey) || !options.containsKey(secretKey)) {
if (key != null) {
options[accessKey] = key
}
if (secret != null) {
options[secretKey] = secret
}
}
if (!options.containsKey('mode')) {
options['mode'] = "rw"
}
}

def getScheme() {
return uri.scheme
}

String toString() {
List<String> ops = []
options.forEach { k, v -> ops.add("$k=$v") }
if (ops.size() == 0) {
return uri.toString()
}
Collections.sort(ops)
return "[${ops.join(",")}]${uri}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,85 @@ class FloatConfTest extends BaseTest {
[cpu:[lowerBoundRatio:30],mem:[upperBoundRatio:90]] | '[cpu.lowerBoundRatio=30,mem.upperBoundRatio=90]'
[cpu:[step:50]] | '[cpu.step=50]'
}

def "update s3 credentials"() {
given:
setEnv('AWS_ACCESS_KEY_ID', 'x')
setEnv('AWS_SECRET_ACCESS_KEY', 'y')
def fConf = FloatConf.getConf(
[float: [nfs: 's3://1.2.3.4/work/dir:/local']])

when:
def workDir = new URI('file:///local/here')
def volume = fConf.getWorkDirVol(workDir)

then:
volume == '[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/work/dir:/local'
}
}


class DataVolumeTest extends BaseTest {
def "parse nfs volume" (){
given:
def nfs = "nfs://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(nfs)

then:
vol.scheme == "nfs"
vol.toString() == nfs
}

def "parse s3 without credentials" () {
given:
def s3 = "[mode=rw]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)

then:
vol.scheme == "s3"
vol.toString() == s3
}

def "existing s3 credentials" () {
given:
def s3 = "[accessKey=a,mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)
vol.setS3Credentials("x", "y")

then:
vol.scheme == "s3"
vol.toString() == s3
}

def "update s3 credentials" () {
given:
def s3 = "[secret=s]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)
vol.setS3Credentials("x", "y")

then:
vol.scheme == "s3"
vol.toString() == "[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/my/dir:/mnt/point"
}

def "update s3 credentials" () {
given:
def s3 = "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)
vol.setS3Credentials(null, null)

then:
vol.scheme == "s3"
vol.toString() == "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point"
}
}

0 comments on commit 565f07a

Please sign in to comment.