Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try the best to fix s3 options. #61

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import nextflow.exception.AbortOperationException
import nextflow.io.BucketParser
import org.apache.commons.lang.StringUtils

import java.nio.file.Path

/**
* @author Cedric Zhuang <[email protected]>
*/
Expand Down Expand Up @@ -55,8 +53,6 @@ class FloatConf {
String extraOptions
String commonExtra

Path floatBin

float timeFactor = 1

/**
Expand All @@ -72,8 +68,8 @@ class FloatConf {
}
FloatConf ret = new FloatConf()

ret.initFloatConf(config.float as Map)
ret.initAwsConf(config)
ret.initFloatConf(config.float as Map)

return ret
}
Expand Down Expand Up @@ -115,6 +111,12 @@ class FloatConf {
return "$nfs:${workDir.path}"
}

private String parseNfs(String nfsOption) {
def vol = new DataVolume(nfsOption)
vol.setS3Credentials(s3accessKey, s3secretKey)
return vol.toString()
}

private def initFloatConf(Map floatNode) {
if (!floatNode) {
return
Expand All @@ -130,7 +132,7 @@ class FloatConf {
.collect { it.trim() }
.findAll { it.size() > 0 }
}
this.nfs = floatNode.nfs
this.nfs = parseNfs(floatNode.nfs as String)

if (floatNode.vmPolicy) {
this.vmPolicy = collapseMapToString(floatNode.vmPolicy as Map)
Expand Down Expand Up @@ -217,3 +219,64 @@ class FloatConf {
return getCliPrefix(address).join(" ")
}
}

class DataVolume {
private Map<String, String> options
private URI uri

DataVolume(String s) {
options = [:]
if (!s) {
uri = new URI("")
return
}
def opStart = s.indexOf("[")
def opEnd = s.indexOf("]")
if (opStart == 0 && opEnd != -1) {
def opStr = s.substring(1, opEnd)
for (String op : opStr.split(",")) {
def tokens = op.split("=")
if (tokens.size() < 2) {
continue
}
options[tokens[0]] = tokens[1]
}
uri = new URI(s.substring(opEnd + 1))
} else {
uri = new URI(s)
}
}

def setS3Credentials(String key, String secret) {
if (scheme != "s3") {
return
}
final accessKey = "accessKey"
final secretKey = "secret"
if (!options.containsKey(accessKey) || !options.containsKey(secretKey)) {
if (key != null) {
options[accessKey] = key
}
if (secret != null) {
options[secretKey] = secret
}
}
if (!options.containsKey('mode')) {
options['mode'] = "rw"
}
}

def getScheme() {
return uri.scheme
}

String toString() {
List<String> ops = []
options.forEach { k, v -> ops.add("$k=$v") }
if (ops.size() == 0) {
return uri.toString()
}
Collections.sort(ops)
return "[${ops.join(",")}]${uri}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ class FloatGridExecutor extends AbstractGridExecutor {

validate(task)

final jobName = floatJobs.getNfJobID(task.id)
final String tag = "${FloatConf.NF_JOB_ID}:${jobName}"
final container = task.getContainer()
if (!container) {
throw new AbortOperationException("container is empty. " +
Expand Down Expand Up @@ -395,6 +393,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
- exit status : $ret
- output :
""".stripIndent()
log.warn m
}
return ret
}.collect()
Expand Down
17 changes: 5 additions & 12 deletions plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,13 @@ import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.SysEnv
import nextflow.util.IniFile
import org.eclipse.jgit.internal.storage.file.Pack

import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSession
import javax.net.ssl.TrustManager
import javax.net.ssl.X509TrustManager

import javax.net.ssl.*
import java.nio.file.Path
import java.nio.file.Paths
import java.security.SecureRandom
import java.security.cert.X509Certificate


class CmdRes {
CmdRes(int exit, String out) {
this.exit = exit
Expand Down Expand Up @@ -79,10 +72,10 @@ class Global {
}

// Install the all-trusting trust manager
SSLContext sc = SSLContext.getInstance("SSL");
SSLContext sc = SSLContext.getInstance("SSL")
sc.init(null, trustAllCerts, new SecureRandom())
def dftFactory = HttpsURLConnection.getDefaultSSLSocketFactory()
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory())

// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
Expand All @@ -91,7 +84,7 @@ class Global {

// Install the all-trusting host verifier
def dftVerifier = HttpsURLConnection.getDefaultHostnameVerifier()
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid)

url.openConnection().with { conn ->
filename.toFile().withOutputStream { out ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,85 @@ class FloatConfTest extends BaseTest {
[cpu:[lowerBoundRatio:30],mem:[upperBoundRatio:90]] | '[cpu.lowerBoundRatio=30,mem.upperBoundRatio=90]'
[cpu:[step:50]] | '[cpu.step=50]'
}

def "update s3 credentials"() {
given:
setEnv('AWS_ACCESS_KEY_ID', 'x')
setEnv('AWS_SECRET_ACCESS_KEY', 'y')
def fConf = FloatConf.getConf(
[float: [nfs: 's3://1.2.3.4/work/dir:/local']])

when:
def workDir = new URI('file:///local/here')
def volume = fConf.getWorkDirVol(workDir)

then:
volume == '[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/work/dir:/local'
}
}


class DataVolumeTest extends BaseTest {
def "parse nfs volume" (){
given:
def nfs = "nfs://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(nfs)

then:
vol.scheme == "nfs"
vol.toString() == nfs
}

def "parse s3 without credentials" () {
given:
def s3 = "[mode=rw]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)

then:
vol.scheme == "s3"
vol.toString() == s3
}

def "existing s3 credentials" () {
given:
def s3 = "[accessKey=a,mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)
vol.setS3Credentials("x", "y")

then:
vol.scheme == "s3"
vol.toString() == s3
}

def "update s3 credentials" () {
given:
def s3 = "[secret=s]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)
vol.setS3Credentials("x", "y")

then:
vol.scheme == "s3"
vol.toString() == "[accessKey=x,mode=rw,secret=y]s3://1.2.3.4/my/dir:/mnt/point"
}

def "update s3 credentials" () {
given:
def s3 = "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point"

when:
def vol = new DataVolume(s3)
vol.setS3Credentials(null, null)

then:
vol.scheme == "s3"
vol.toString() == "[mode=rw,secret=s]s3://1.2.3.4/my/dir:/mnt/point"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,4 @@ class FloatGridExecutorTest extends FloatBaseTest {
then:
str == "-p *** key1=*** key2=***"
}
}
}