diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index 92d58da..95f8c17 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -21,8 +21,6 @@ import nextflow.exception.AbortOperationException import nextflow.io.BucketParser import org.apache.commons.lang.StringUtils -import java.nio.file.Path - /** * @author Cedric Zhuang */ @@ -55,8 +53,6 @@ class FloatConf { String extraOptions String commonExtra - Path floatBin - float timeFactor = 1 /** @@ -72,8 +68,8 @@ class FloatConf { } FloatConf ret = new FloatConf() - ret.initFloatConf(config.float as Map) ret.initAwsConf(config) + ret.initFloatConf(config.float as Map) return ret } @@ -115,6 +111,12 @@ class FloatConf { return "$nfs:${workDir.path}" } + private String parseNfs(String nfsOption) { + def vol = new DataVolume(nfsOption) + vol.setS3Credentials(s3accessKey, s3secretKey) + return vol.toString() + } + private def initFloatConf(Map floatNode) { if (!floatNode) { return @@ -130,7 +132,7 @@ class FloatConf { .collect { it.trim() } .findAll { it.size() > 0 } } - this.nfs = floatNode.nfs + this.nfs = parseNfs(floatNode.nfs as String) if (floatNode.vmPolicy) { this.vmPolicy = collapseMapToString(floatNode.vmPolicy as Map) @@ -217,3 +219,64 @@ class FloatConf { return getCliPrefix(address).join(" ") } } + +class DataVolume { + private Map options + private URI uri + + DataVolume(String s) { + options = [:] + if (!s) { + uri = new URI("") + return + } + def opStart = s.indexOf("[") + def opEnd = s.indexOf("]") + if (opStart == 0 && opEnd != -1) { + def opStr = s.substring(1, opEnd) + for (String op : opStr.split(",")) { + def tokens = op.split("=") + if (tokens.size() < 2) { + continue + } + options[tokens[0]] = tokens[1] + } + uri = new URI(s.substring(opEnd + 1)) + } else { + uri = new URI(s) + } + } + + def setS3Credentials(String key, String secret) { + if (scheme != "s3") { + return + } + final accessKey = "accessKey" + final secretKey = "secret" + if (!options.containsKey(accessKey) || !options.containsKey(secretKey)) { + if (key != null) { + options[accessKey] = key + } + if (secret != null) { + options[secretKey] = secret + } + } + if (!options.containsKey('mode')) { + options['mode'] = "rw" + } + } + + def getScheme() { + return uri.scheme + } + + String toString() { + List ops = [] + options.forEach { k, v -> ops.add("$k=$v") } + if (ops.size() == 0) { + return uri.toString() + } + Collections.sort(ops) + return "[${ops.join(",")}]${uri}" + } +} \ No newline at end of file diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy index 0305a35..81423cc 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy @@ -210,4 +210,85 @@ class FloatConfTest extends BaseTest { [cpu:[lowerBoundRatio:30],mem:[upperBoundRatio:90]] | '[cpu.lowerBoundRatio=30,mem.upperBoundRatio=90]' [cpu:[step:50]] | '[cpu.step=50]' } + + def "update s3 credentials"() { + given: + setEnv('AWS_ACCESS_KEY_ID', 'x') + setEnv('AWS_SECRET_ACCESS_KEY', 'y') + def fConf = FloatConf.getConf( + [float: [nfs: 's3://1.2.3.4/work/dir:/local']]) + + when: + def workDir = new URI('file:///local/here') + def volume = fConf.getWorkDirVol(workDir) + + then: + volume == '[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/work/dir:/local' + } } + + +class DataVolumeTest extends BaseTest { + def "parse nfs volume" (){ + given: + def nfs = "nfs://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(nfs) + + then: + vol.scheme == "nfs" + vol.toString() == nfs + } + + def "parse s3 without credentials" () { + given: + def s3 = "[mode=rw]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + + then: + vol.scheme == "s3" + vol.toString() == s3 + } + + def "existing s3 credentials" () { + given: + def s3 = "[accessKey=a,mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + vol.setS3Credentials("x", "y") + + then: + vol.scheme == "s3" + vol.toString() == s3 + } + + def "update s3 credentials" () { + given: + def s3 = "[secret=s]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + vol.setS3Credentials("x", "y") + + then: + vol.scheme == "s3" + vol.toString() == "[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/my/dir:/mnt/point" + } + + def "update s3 credentials" () { + given: + def s3 = "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + vol.setS3Credentials(null, null) + + then: + vol.scheme == "s3" + vol.toString() == "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point" + } +} \ No newline at end of file