From 3bf5a29104ae16b12e2a91b331950025305bd004 Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Thu, 13 Jul 2023 15:50:22 +0800 Subject: [PATCH] [GH-38] Track job with tags For now, we use the name of the job to track the ownership of the job. But many user want to give their own job names. It's better to use the tag to track the source of the job. And user could always add more tags for their own usage. Add `nf-job-id` tag to track the nf task info. --- .../com/memverge/nextflow/CmdResult.groovy | 5 +- .../com/memverge/nextflow/FloatConf.groovy | 1 + .../nextflow/FloatGridExecutor.groovy | 5 +- .../com/memverge/nextflow/FloatJobs.groovy | 2 +- .../memverge/nextflow/CmdResultTest.groovy | 16 +++- .../FloatGridExecutorMultiOCTest.groovy | 8 +- .../nextflow/FloatGridExecutorTest.groovy | 78 +++++++++++++------ 7 files changed, 78 insertions(+), 37 deletions(-) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/CmdResult.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/CmdResult.groovy index b7dc391..004825b 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/CmdResult.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/CmdResult.groovy @@ -94,8 +94,9 @@ class CmdResult { for (i in obj) { def id = i.id as String def status = i.status as String - def taskID = i.name as String - if (id && status) { + def tags = i.customTags as Map + String taskID = tags ? tags[FloatConf.NF_JOB_ID] : "" + if (id && status && taskID) { ret[id] = new JobStatus(taskID, status) } } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index 086f370..aa08a79 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -29,6 +29,7 @@ class FloatConf { static String MMC_USERNAME = "MMC_USERNAME" static String MMC_PASSWORD = "MMC_PASSWORD" static String ADDR_SEP = "," + static String NF_JOB_ID = "nf-job-id" /** credentials for op center */ String username diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index bf24910..083c599 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -200,6 +200,7 @@ class FloatGridExecutor extends AbstractGridExecutor { @Override List getSubmitCommandLine(TaskRun task, Path scriptFile) { validateTaskConf(task.config) + String tag = "${FloatConf.NF_JOB_ID}:${floatJobs.getJobName(task.id)}" def cmd = getSubmitCmdPrefix() cmd << 'sbatch' cmd << '--dataVolume' @@ -212,8 +213,8 @@ class FloatGridExecutor extends AbstractGridExecutor { cmd << getMem(task) cmd << '--job' cmd << scriptFile.toString() - cmd << '--name' - cmd << floatJobs.getJobName(task.id) + cmd << '--customTag' + cmd << tag cmd.addAll(getExtra(task)) log.info "[float] submit job: ${toCmdString(cmd)}" return cmd diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy index 24e1a88..fbb797d 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy @@ -42,7 +42,7 @@ class FloatJobs { job2oc = new ConcurrentHashMap<>() task2workDir = new ConcurrentHashMap<>() ocs = ocAddresses - def charset = (('A'..'Z')+('0'..'9')).join('') + def charset = (('a'..'z')+('0'..'9')).join('') taskPrefix = RandomStringUtils.random( 6, charset.toCharArray()) } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/CmdResultTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/CmdResultTest.groovy index 8d2ba06..2c6eefe 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/CmdResultTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/CmdResultTest.groovy @@ -116,7 +116,11 @@ class CmdResultTest extends Specification { "name": "cactus-c5d.large", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "FailToExecute" + "status": "FailToExecute", + "customTags": { + "nf-job-id": "job-a", + "a": "apple" + } }, { "id": "u5x3sSLe0p3OznGavmYu3", @@ -124,7 +128,11 @@ class CmdResultTest extends Specification { "workingHost": "3.143.251.235 (2Core4GB/Spot)", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Executing" + "status": "Executing", + "customTags": { + "b": "banana", + "nf-job-id": "job-b" + } } ]""" def stMap = res.getQStatus() @@ -133,9 +141,9 @@ class CmdResultTest extends Specification { then: st1.status == 'FailToExecute' - st1.taskID == 'cactus-c5d.large' + st1.taskID == 'job-a' st2.status == 'Executing' - st2.taskID == 'cactus-t3a.medium' + st2.taskID == 'job-b' } def "get queue empty"() { diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy index fc17cc9..b01fab4 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy @@ -69,11 +69,11 @@ class FloatGridExecutorMultiOCTest extends Specification { cmd1.join(' ') == "float -a fb -u admin -p password sbatch " + "--dataVolume ${dataVol} --image ${image} " + "--cpu ${cpu} --mem ${mem} --job ${script}" + - " --name tJob-${taskID}" + " --customTag nf-job-id:tJob-${taskID}" cmd2.join(' ') == "float -a fa -u admin -p password sbatch " + "--dataVolume ${dataVol} --image ${image} " + "--cpu ${cpu} --mem ${mem} --job ${script}" + - " --name tJob-${taskID}" + " --customTag nf-job-id:tJob-${taskID}" } def "get queue status commands"() { @@ -116,10 +116,10 @@ class FloatGridExecutorMultiOCTest extends Specification { cmd1.join(' ') == "float -a fb -u admin -p password sbatch " + "--dataVolume ${dataVol} --image ${image} " + "--cpu ${cpu} --mem ${mem} --job ${script}" + - " --name tJob-${taskID}" + " --customTag nf-job-id:tJob-${taskID}" cmd2.join(' ') == "float -a fa -u admin -p password sbatch " + "--dataVolume ${dataVol} --image ${image} " + "--cpu ${cpu} --mem ${mem} --job ${script}" + - " --name tJob-${taskID}" + " --customTag nf-job-id:tJob-${taskID}" } } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index b80e73a..4bb6f17 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -53,7 +53,7 @@ class FloatGridExecutorTest extends Specification { } def jobID(TaskId id) { - return "$tJob-$id" + return "${FloatConf.NF_JOB_ID}:$tJob-$id" } def "get the prefix of kill command"() { @@ -133,7 +133,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', cpu.toString(), '--mem', mem.toString(), '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') } def "add default local mount point"() { @@ -160,7 +160,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', cpu.toString(), '--mem', mem.toString(), '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') } def "use cpus, memory and container"() { @@ -187,7 +187,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', '8', '--mem', '16', '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') } def "add common extras"() { @@ -216,7 +216,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', '2', '--mem', '4', '--job', script, - '--name', jobID(taskID), + '--customTag', jobID(taskID), '-t', 'small', '-f'].join(" ") } @@ -242,7 +242,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', '2', '--mem', '4', '--job', script, - '--name', jobID(taskID), + '--customTag', jobID(taskID), '-f', '-t', 'small'].join(' ') } @@ -259,7 +259,7 @@ class FloatGridExecutorTest extends Specification { when: task.config = [nfs : nfs, - extra: '--name hello',] as TaskConfig + extra: '--customTag hello',] as TaskConfig task.id = taskID def cmd = exec.getSubmitCommandLine(task, Paths.get(script)) @@ -273,9 +273,9 @@ class FloatGridExecutorTest extends Specification { '--cpu', '2', '--mem', '4', '--job', script, - '--name', jobID(taskID), + '--customTag', jobID(taskID), '-t', 'small', '-f', - '--name', 'hello'].join(' ') + '--customTag', 'hello'].join(' ') } def "config level cpu and memory"() { @@ -307,7 +307,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', cpu.toString(), '--mem', mem.toString(), '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') } def "use default cpu, memory and image"() { @@ -337,7 +337,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', '2', '--mem', '4', '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') } def "use default nfs and work dir"() { @@ -367,7 +367,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', cpu.toString(), '--mem', mem.toString(), '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') } def "parse data volume"() { @@ -399,7 +399,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', cpu.toString(), '--mem', mem.toString(), '--job', script, - '--name', jobID(taskID), + '--customTag', jobID(taskID), '-M', 'cpu.upperBoundDuration=5s', '--dataVolume', '"[size=50]":/BWA_BASE'].join(' ') } @@ -414,70 +414,100 @@ class FloatGridExecutorTest extends Specification { "name": "tJob-0", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Submitted" + "status": "Submitted", + "customTags": { + "nf-job-id": "tJob-0" + } }, { "id": "task1", "name": "tJob-1", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Initializing" + "status": "Initializing", + "customTags": { + "nf-job-id": "tJob-1" + } }, { "id": "task2", "name": "tJob-2", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Executing" + "status": "Executing", + "customTags": { + "nf-job-id": "tJob-2" + } }, { "id": "task3", "name": "tJob-3", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Floating" + "status": "Floating", + "customTags": { + "nf-job-id": "tJob-3" + } }, { "id": "task4", "name": "tJob-4", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Completed" + "status": "Completed", + "customTags": { + "nf-job-id": "tJob-4" + } }, { "id": "task5", "name": "tJob-5", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Cancelled" + "status": "Cancelled", + "customTags": { + "nf-job-id": "tJob-5" + } }, { "id": "task6", "name": "tJob-6", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "FailToComplete" + "status": "FailToComplete", + "customTags": { + "nf-job-id": "tJob-6" + } }, { "id": "task7", "name": "tJob-7", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "FailToExecute" + "status": "FailToExecute", + "customTags": { + "nf-job-id": "tJob-7" + } }, { "id": "task8", "name": "tJob-8", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Completed" + "status": "Completed", + "customTags": { + "nf-job-id": "tJob-8" + } }, { "id": "task9", "name": "tJob-9", "user": "admin", "imageID": "docker.io/memverge/cactus:latest", - "status": "Starting" + "status": "Starting", + "customTags": { + "nf-job-id": "tJob-9" + } } ] """.stripIndent() @@ -555,7 +585,7 @@ class FloatGridExecutorTest extends Specification { '--cpu', cpu.toString(), '--mem', mem.toString(), '--job', script, - '--name', jobID(taskID)].join(' ') + '--customTag', jobID(taskID)].join(' ') cleanup: setEnv('MMC_ADDRESS', '')