diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index 92d58da..7c9f2b3 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -21,8 +21,6 @@ import nextflow.exception.AbortOperationException import nextflow.io.BucketParser import org.apache.commons.lang.StringUtils -import java.nio.file.Path - /** * @author Cedric Zhuang */ @@ -55,8 +53,6 @@ class FloatConf { String extraOptions String commonExtra - Path floatBin - float timeFactor = 1 /** @@ -72,8 +68,8 @@ class FloatConf { } FloatConf ret = new FloatConf() - ret.initFloatConf(config.float as Map) ret.initAwsConf(config) + ret.initFloatConf(config.float as Map) return ret } @@ -115,6 +111,12 @@ class FloatConf { return "$nfs:${workDir.path}" } + private String parseNfs(String nfsOption) { + def vol = new DataVolume(nfsOption) + vol.setS3Credentials(s3accessKey, s3secretKey) + return vol.toString() + } + private def initFloatConf(Map floatNode) { if (!floatNode) { return @@ -130,7 +132,7 @@ class FloatConf { .collect { it.trim() } .findAll { it.size() > 0 } } - this.nfs = floatNode.nfs + this.nfs = parseNfs(floatNode.nfs as String) if (floatNode.vmPolicy) { this.vmPolicy = collapseMapToString(floatNode.vmPolicy as Map) @@ -217,3 +219,64 @@ class FloatConf { return getCliPrefix(address).join(" ") } } + +class DataVolume { + private Map options + private URI uri + + DataVolume(String s) { + options = [:] + if (!s) { + uri = new URI("") + return + } + def opStart = s.indexOf("[") + def opEnd = s.indexOf("]") + if (opStart == 0 && opEnd != -1) { + def opStr = s.substring(1, opEnd) + for (String op : opStr.split(",")) { + def tokens = op.split("=") + if (tokens.size() < 2) { + continue + } + options[tokens[0]] = tokens[1] + } + uri = new URI(s.substring(opEnd + 1)) + } else { + uri = new URI(s) + } + } + + def setS3Credentials(String key, String secret) { + if (scheme != "s3") { + return + } + final accessKey = "accessKey" + final secretKey = "secret" + if (!options.containsKey(accessKey) || !options.containsKey(secretKey)) { + if (key != null) { + options[accessKey] = key + } + if (secret != null) { + options[secretKey] = secret + } + } + if (!options.containsKey('mode')) { + options['mode'] = "rw" + } + } + + def getScheme() { + return uri.scheme + } + + String toString() { + List ops = [] + options.forEach { k, v -> ops.add("$k=$v") } + if (ops.size() == 0) { + return uri.toString() + } + Collections.sort(ops) + return "[${ops.join(",")}]${uri}" + } +} diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index e2c2e47..99e55d9 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -270,8 +270,6 @@ class FloatGridExecutor extends AbstractGridExecutor { validate(task) - final jobName = floatJobs.getNfJobID(task.id) - final String tag = "${FloatConf.NF_JOB_ID}:${jobName}" final container = task.getContainer() if (!container) { throw new AbortOperationException("container is empty. " + @@ -395,6 +393,7 @@ class FloatGridExecutor extends AbstractGridExecutor { - exit status : $ret - output : """.stripIndent() + log.warn m } return ret }.collect() diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy index 7bf8c6e..e8f48b8 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy @@ -19,20 +19,13 @@ import groovy.transform.PackageScope import groovy.util.logging.Slf4j import nextflow.SysEnv import nextflow.util.IniFile -import org.eclipse.jgit.internal.storage.file.Pack - -import javax.net.ssl.HostnameVerifier -import javax.net.ssl.HttpsURLConnection -import javax.net.ssl.SSLContext -import javax.net.ssl.SSLSession -import javax.net.ssl.TrustManager -import javax.net.ssl.X509TrustManager + +import javax.net.ssl.* import java.nio.file.Path import java.nio.file.Paths import java.security.SecureRandom import java.security.cert.X509Certificate - class CmdRes { CmdRes(int exit, String out) { this.exit = exit @@ -79,10 +72,10 @@ class Global { } // Install the all-trusting trust manager - SSLContext sc = SSLContext.getInstance("SSL"); + SSLContext sc = SSLContext.getInstance("SSL") sc.init(null, trustAllCerts, new SecureRandom()) def dftFactory = HttpsURLConnection.getDefaultSSLSocketFactory() - HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()) // Create all-trusting host name verifier HostnameVerifier allHostsValid = new HostnameVerifier() { @@ -91,7 +84,7 @@ class Global { // Install the all-trusting host verifier def dftVerifier = HttpsURLConnection.getDefaultHostnameVerifier() - HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); + HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid) url.openConnection().with { conn -> filename.toFile().withOutputStream { out -> diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy index 0305a35..2041d21 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatConfTest.groovy @@ -210,4 +210,85 @@ class FloatConfTest extends BaseTest { [cpu:[lowerBoundRatio:30],mem:[upperBoundRatio:90]] | '[cpu.lowerBoundRatio=30,mem.upperBoundRatio=90]' [cpu:[step:50]] | '[cpu.step=50]' } + + def "update s3 credentials"() { + given: + setEnv('AWS_ACCESS_KEY_ID', 'x') + setEnv('AWS_SECRET_ACCESS_KEY', 'y') + def fConf = FloatConf.getConf( + [float: [nfs: 's3://1.2.3.4/work/dir:/local']]) + + when: + def workDir = new URI('file:///local/here') + def volume = fConf.getWorkDirVol(workDir) + + then: + volume == '[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/work/dir:/local' + } +} + + +class DataVolumeTest extends BaseTest { + def "parse nfs volume" (){ + given: + def nfs = "nfs://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(nfs) + + then: + vol.scheme == "nfs" + vol.toString() == nfs + } + + def "parse s3 without credentials" () { + given: + def s3 = "[mode=rw]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + + then: + vol.scheme == "s3" + vol.toString() == s3 + } + + def "existing s3 credentials" () { + given: + def s3 = "[accessKey=a,mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + vol.setS3Credentials("x", "y") + + then: + vol.scheme == "s3" + vol.toString() == s3 + } + + def "update s3 credentials" () { + given: + def s3 = "[secret=s]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + vol.setS3Credentials("x", "y") + + then: + vol.scheme == "s3" + vol.toString() == "[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/my/dir:/mnt/point" + } + + def "update s3 credentials" () { + given: + def s3 = "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point" + + when: + def vol = new DataVolume(s3) + vol.setS3Credentials(null, null) + + then: + vol.scheme == "s3" + vol.toString() == "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point" + } } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 928ae6e..2de823c 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -507,4 +507,4 @@ class FloatGridExecutorTest extends FloatBaseTest { then: str == "-p *** key1=*** key2=***" } -} \ No newline at end of file +}