Skip to content

Commit

Permalink
Download float cli from op-center.
Browse files Browse the repository at this point in the history
When the plugin starts, it checks if float cli is available in path.
If not, try to download it from the op-center.

The binary is kept in the plugin's directory.
  • Loading branch information
jealous committed Aug 28, 2023
1 parent a80cfa8 commit 00b5836
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 46 deletions.
60 changes: 60 additions & 0 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatBin.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.memverge.nextflow

import groovy.util.logging.Slf4j
import org.apache.commons.lang.SystemUtils

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.regex.Pattern

@Slf4j
class FloatBin {
private static final binName = 'float'

static Path get(String opCenterAddr) {
if (!opCenterAddr) {
return Paths.get(binName)
}
def ret = getFloatBinPath()
if (ret == null) {
final URL src = getDownloadUrl(opCenterAddr)
final Path pluginsDir = Global.getPluginsDir()
ret = pluginsDir.resolve(binName)
try {
Global.download(src, ret)
ret.setExecutable(true)
} catch (Exception ex) {
log.warn("download ${binName} failed: ${ex.message}")
return Paths.get(binName)
}
}
return ret
}

private static URL getDownloadUrl(String opCenter) {
if (SystemUtils.IS_OS_WINDOWS) {
return new URL("https://${opCenter}/float.windows_amd64")
} else if (SystemUtils.IS_OS_LINUX) {
return new URL("https://${opCenter}/float")
} else if (SystemUtils.IS_OS_MAC) {
return new URL("https://${opCenter}/float.darwin_amd64")
}
throw new UnsupportedOperationException("OS not supported")
}

private static Path getFloatBinPath() {
final sep = Pattern.quote(File.pathSeparator)
def paths = Arrays.asList(System.getenv("PATH").split(sep))
paths = new ArrayList<String>(paths)
paths.add(Global.getPluginsDir().toString())
for (String path : paths) {
def floatPath = Paths.get(path).resolve(binName)
if (Files.exists(floatPath)) {
return floatPath
}
}
log.info "${binName} binary not found"
return null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import nextflow.exception.AbortOperationException
import nextflow.io.BucketParser
import org.apache.commons.lang.StringUtils

import java.nio.file.Path

/**
* @author Cedric Zhuang <[email protected]>
*/
Expand Down Expand Up @@ -53,6 +55,8 @@ class FloatConf {
String extraOptions
String commonExtra

Path floatBin

float timeFactor = 1

/**
Expand Down Expand Up @@ -196,8 +200,9 @@ class FloatConf {
if (StringUtils.length(address) == 0) {
address = addresses[0]
}
def bin = FloatBin.get(address)
List<String> ret = [
"float",
bin.toString(),
"-a",
address,
"-u",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,18 +387,14 @@ class FloatGridExecutor extends AbstractGridExecutor {
void killTask(def floatJobID) {
def cmdList = killTaskCommands(floatJobID)
cmdList.parallelStream().map { cmd ->
def proc = new ProcessBuilder(cmd).redirectErrorStream(true).start()
proc.waitForOrKill(10_000)
def ret = proc.exitValue()
def ret = Global.execute(cmd).exit
if (ret != 0) {
def m = """\
Unable to kill pending jobs
- cmd executed: ${toLogStr(cmd)}}
- exit status : $ret
- output :
""".stripIndent()
m += proc.text.indent(' ')
log.debug(m)
}
return ret
}.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FloatTaskHandler extends GridTaskHandler {
// both exit status and job rc code are empty
if (task.exitStatus == null) {
if (st.isError()) {
task.exitStatus = Integer.MAX_VALUE
task.exitStatus = 1
} else {
task.exitStatus = 0
}
Expand Down
121 changes: 100 additions & 21 deletions plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ package com.memverge.nextflow

import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.SysEnv
import nextflow.util.IniFile

import org.eclipse.jgit.internal.storage.file.Pack

import javax.net.ssl.HostnameVerifier
import javax.net.ssl.HttpsURLConnection
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLSession
import javax.net.ssl.TrustManager
import javax.net.ssl.X509TrustManager
import java.nio.file.Path
import java.nio.file.Paths
import java.security.SecureRandom
import java.security.cert.X509Certificate


class CmdRes {
Expand Down Expand Up @@ -55,13 +65,83 @@ class Global {
return new CmdRes(exit, out)
}

@PackageScope
static def download(URL url, Path filename) {
// Create a trust manager that does not validate certificate chains
TrustManager[] trustAllCerts = new TrustManager[]{
new X509TrustManager() {
X509Certificate[] getAcceptedIssuers() { return null }

void checkClientTrusted(X509Certificate[] certs, String authType) {}

void checkServerTrusted(X509Certificate[] certs, String authType) {}
}
}

// Install the all-trusting trust manager
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, trustAllCerts, new SecureRandom())
def dftFactory = HttpsURLConnection.getDefaultSSLSocketFactory()
HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());

// Create all-trusting host name verifier
HostnameVerifier allHostsValid = new HostnameVerifier() {
boolean verify(String hostname, SSLSession session) { return true }
}

// Install the all-trusting host verifier
def dftVerifier = HttpsURLConnection.getDefaultHostnameVerifier()
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);

url.openConnection().with { conn ->
filename.toFile().withOutputStream { out ->
conn.inputStream.with { inp ->
out << inp
inp.close()
}
}
}
log.info "download ${url} to ${filename}"
// restore the factory and verifier
HttpsURLConnection.setDefaultSSLSocketFactory(dftFactory)
HttpsURLConnection.setDefaultHostnameVerifier(dftVerifier)
}

@PackageScope
static Path getPluginsDir() {
Map<String, String> env = SysEnv.get()
final dir = env.get('NXF_PLUGINS_DIR')
if (dir) {
log.trace "Detected NXF_PLUGINS_DIR=$dir"
return Paths.get(dir)
} else if (env.containsKey('NXF_HOME')) {
log.trace "Detected NXF_HOME - Using ${env.NXF_HOME}/plugins"
return Paths.get(env.NXF_HOME, 'plugins')
} else {
log.trace "Using local plugins directory"
return Paths.get('plugins')
}
}

@PackageScope
static List<String> getAwsCredentials(Map env, Map config) {

def home = Paths.get(System.properties.get('user.home') as String)
def files = [ home.resolve('.aws/credentials'), home.resolve('.aws/config') ]
def files = [home.resolve('.aws/credentials'), home.resolve('.aws/config')]
getAwsCredentials0(env, config, files)
}

static def setEnv(String key, String value) {
try {
def env = System.getenv()
def cl = env.getClass()
def field = cl.getDeclaredField("m")
field.setAccessible(true)
def writableEnv = (Map<String, String>) field.get(env)
writableEnv.put(key, value)
} catch (Exception e) {
throw new IllegalStateException("Failed to set environment variable", e)
}
}

/**
Expand All @@ -74,19 +154,19 @@ class Global {
* @param env The system environment map
* @param config The nextflow config object map
* @return A pair where the first element is the access key and the second the secret key or
* {@code null} if the credentials are missing
* {@code null} if the credentials are missing
*/
@PackageScope
static List<String> getAwsCredentials0( Map env, Map config, List<Path> files = []) {
static List<String> getAwsCredentials0(Map env, Map config, List<Path> files = []) {

String a
String b

if( config && config.aws instanceof Map ) {
a = ((Map)config.aws).accessKey
b = ((Map)config.aws).secretKey
if (config && config.aws instanceof Map) {
a = ((Map) config.aws).accessKey
b = ((Map) config.aws).secretKey

if( a && b ) {
if (a && b) {
log.debug "Using AWS credentials defined in nextflow config file"
return [a, b]
}
Expand All @@ -95,46 +175,45 @@ class Global {

// as define by amazon doc
// http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html
if( env && (a=env.AWS_ACCESS_KEY_ID) && (b=env.AWS_SECRET_ACCESS_KEY) ) {
if (env && (a = env.AWS_ACCESS_KEY_ID) && (b = env.AWS_SECRET_ACCESS_KEY)) {
log.debug "Using AWS credentials defined by environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY"
return [a, b]
}

if( env && (a=env.AWS_ACCESS_KEY) && (b=env.AWS_SECRET_KEY) ) {
if (env && (a = env.AWS_ACCESS_KEY) && (b = env.AWS_SECRET_KEY)) {
log.debug "Using AWS credentials defined by environment variables AWS_ACCESS_KEY and AWS_SECRET_KEY"
return [a, b]
}

for( Path it : files ) {
for (Path it : files) {
final conf = new IniFile(it)
final profile = getAwsProfile0(env, config)
final section = conf.section(profile)
if( (a=section.aws_access_key_id) && (b=section.aws_secret_access_key) ) {
if ((a = section.aws_access_key_id) && (b = section.aws_secret_access_key)) {
final token = section.aws_session_token
if( token ) {
if (token) {
log.debug "Using AWS temporary session credentials defined in `$profile` section in file: ${conf.file}"
return [a,b,token]
}
else {
return [a, b, token]
} else {
log.debug "Using AWS credential defined in `$profile` section in file: ${conf.file}"
return [a,b]
return [a, b]
}
}
}

return null
}

static protected String getAwsProfile0(Map env, Map<String,Object> config) {
static protected String getAwsProfile0(Map env, Map<String, Object> config) {

final profile = config?.navigate('aws.profile')
if( profile )
if (profile)
return profile

if( env?.containsKey('AWS_PROFILE'))
if (env?.containsKey('AWS_PROFILE'))
return env.get('AWS_PROFILE')

if( env?.containsKey('AWS_DEFAULT_PROFILE'))
if (env?.containsKey('AWS_DEFAULT_PROFILE'))
return env.get('AWS_DEFAULT_PROFILE')

return 'default'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger

class BaseTest extends Specification {
def setEnv(String key, String value) {
try {
def env = System.getenv()
def cl = env.getClass()
def field = cl.getDeclaredField("m")
field.setAccessible(true)
def writableEnv = (Map<String, String>) field.get(env)
writableEnv.put(key, value)
} catch (Exception e) {
throw new IllegalStateException("Failed to set environment variable", e)
}
Global.setEnv(key, value)
}
}

Expand All @@ -54,6 +45,7 @@ class FloatBaseTest extends BaseTest {
def workDir = '/mnt/nfs/shared'
def taskID = new TaskId(55)
def uuid = UUID.fromString("00000000-0000-0000-0000-000000000000")
def bin = FloatBin.get("").toString()
private AtomicInteger taskSerial = new AtomicInteger()

class FloatTestExecutor extends FloatGridExecutor {
Expand Down Expand Up @@ -108,7 +100,7 @@ class FloatBaseTest extends BaseTest {

def submitCmd(Map param = [:]) {
def taskIndex = param.taskIndex?:'1'
return ['float', '-a', param.addr ?: addr,
return [bin, '-a', param.addr ?: addr,
'-u', user,
'-p', pass,
'submit',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.memverge.nextflow
import nextflow.exception.AbortOperationException

class FloatConfTest extends BaseTest {
private def bin = FloatBin.get("").toString()

def "one op-center in the address"() {
given:
def conf = [
Expand Down Expand Up @@ -68,7 +70,7 @@ class FloatConfTest extends BaseTest {
def fConf = FloatConf.getConf(conf)

then:
fConf.getCli() == "float -a 1.2.3.4 " +
fConf.getCli() == "${bin} -a 1.2.3.4 " +
"-u admin -p password"

}
Expand All @@ -84,7 +86,7 @@ class FloatConfTest extends BaseTest {
def fConf = FloatConf.getConf(conf)

then:
fConf.getCli("1.1.1.1") == "float -a 1.1.1.1 " +
fConf.getCli("1.1.1.1") == "${bin} -a 1.1.1.1 " +
"-u admin -p password"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest {
def cmd2 = cmdMap['fb']

then:
cmd1.join(' ') == "float -a fa -u ${user} -p ${pass} " +
cmd1.join(' ') == "${bin} -a fa -u ${user} -p ${pass} " +
"list --format json"
cmd2.join(' ') == "float -a fb -u ${user} -p ${pass} " +
cmd2.join(' ') == "${bin} -a fb -u ${user} -p ${pass} " +
"list --format json"
}

Expand Down
Loading

0 comments on commit 00b5836

Please sign in to comment.