Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DRS publishing #5

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/nf-ga4gh/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ dependencies {
api 'com.google.code.gson:gson:2.10.1'
api 'io.gsonfire:gson-fire:1.8.3'
api 'org.threeten:threetenbp:1.3.5'
implementation 'com.joyent.util:fast-md5:2.7.1'
implementation "io.nextflow:nf-amazon:2.1.4"
implementation "org.apache.httpcomponents:httpmime:4.5.14"
implementation "org.apache.httpcomponents:httpclient:4.5.14"

// test configuration
testImplementation "org.apache.groovy:groovy:4.0.21"
Expand Down
218 changes: 218 additions & 0 deletions plugins/nf-ga4gh/src/main/nextflow/ga4gh/drs/client/DrsClient.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* MIT License
*
* Copyright (c) 2024 Nicolas Vannieuwkerke
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice (including the next
* paragraph) shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
nvnieuwk marked this conversation as resolved.
Show resolved Hide resolved
package nextflow.ga4gh.drs.client

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

import java.time.Instant
import java.lang.Long
import java.io.File
import java.io.IOException
import java.io.OutputStream
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse
import java.net.http.HttpResponse.BodyHandlers
import java.nio.file.Path
import java.nio.channels.Channels
import java.nio.channels.Pipe
import java.nio.charset.StandardCharsets

import org.apache.http.HttpEntity
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.StringBody

import nextflow.ga4gh.drs.exceptions.DrsObjectPublishingException
import nextflow.ga4gh.drs.exceptions.DrsAuthenticationException
import nextflow.ga4gh.drs.config.DrsConfig
import nextflow.ga4gh.drs.utils.DrsUtils

/**
* Define the DRS client
*
* @author Nicolas Vannieuwkerke <[email protected]>
*/
@Slf4j
@CompileStatic
class DrsClient {

String authHeader
long authExpiration

String user
String password
String endpoint

/**
* Create a DRS client based on a DrsConfig object
*
* @param config The DrsConfig object.
*/
DrsClient(DrsConfig config) {
this.user = config.user
this.password = config.password
this.endpoint = config.endpoint
refreshToken()
}

/**
* Check if the token is expired or expires soon. If so, the token should be refreshed
*
*/
private void checkToken() {
long epoch = Instant.now().toEpochMilli()
// Refresh the token if it expires in less than one minute
if(authExpiration - epoch < 1*60*100) {
refreshToken()
log.debug("Refreshed DRS token")
}
}

/**
* Refresh the DRS token
*
*/
private void refreshToken() {
// Create the form with the username and password
HttpEntity httpEntity = MultipartEntityBuilder.create()
.addPart(
"username",
new StringBody(
this.user,
ContentType.create("application/x-www-form-urlencoded", StandardCharsets.UTF_8)
)
)
.addPart(
"password",
new StringBody(
this.password,
ContentType.create("application/x-www-form-urlencoded", StandardCharsets.UTF_8)
)
)
.build()

// Efficiently stream the form
Pipe pipe = Pipe.open()
new Thread(() -> {
try (OutputStream outputStream = Channels.newOutputStream(pipe.sink())) {
httpEntity.writeTo(outputStream)
}
}).start()

// Do a POST request to get the bearer token and add it the a header value
HttpClient httpClient = HttpClient.newHttpClient()

HttpRequest request = HttpRequest.newBuilder(new URI("${this.endpoint}/token".toString()))
.header("Content-Type", httpEntity.getContentType().getValue())
.version(HttpClient.Version.HTTP_1_1)
.POST(BodyPublishers.ofInputStream(() -> Channels.newInputStream(pipe.source()))).build()

HttpResponse<String> responseBody = httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8))

Map responseMap = (Map) new JsonSlurper().parseText(responseBody.body())
if(responseMap.containsKey("status_code") && responseMap.status_code != 200) {
throw new DrsAuthenticationException(responseMap.msg.toString())
}
this.authHeader = "Bearer ${responseMap.access_token}"
this.authExpiration = Instant.now().toEpochMilli() + Long.parseLong(responseMap.expires_in.toString()) * 60 * 100
}

/**
* Upload a DRS object
*
* @param obj A Groovy map with the structure of a DRS object
*/
public String uploadObject(Map obj) {
checkToken()

// Upload the DRS Object using a POST request
HttpClient client = HttpClient.newHttpClient()
String objBody = new JsonBuilder(obj).toPrettyString()
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("${this.endpoint}/ga4gh/drs/v1/objects"))
.header("Authorization", this.authHeader)
.header("Content-Type", "application/json")
.version(HttpClient.Version.HTTP_1_1)
.POST(HttpRequest.BodyPublishers.ofString(objBody))
.build()

HttpResponse<String> responseBody = client.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8))

Map responseMap = (Map) new JsonSlurper().parseText(responseBody.body())
Integer statusCode = responseBody.statusCode()

// Return the DRS id if the object has been created or when the object exists
switch(statusCode) {
case 201:
return responseMap.object_id
case 409:
List accessMethods = obj.access_methods as List
String url = accessMethods[0]['access_url']['url']
log.debug("DRS object for '${url}' already exists. Skipping the upload of this object")
return getIdFromUrl(url)
default:
throw new DrsObjectPublishingException("Received an error when publishing a DRS object (Status code: ${statusCode}): ${responseBody.body()}")
}
}

/**
* Get a DRS id based on a publish URL
*
* @param url The URL to get the DRS id from
*/
public String getIdFromUrl(String url) {
checkToken()
String sample = new DrsUtils().getSampleName(url as Path)

HttpClient client = HttpClient.newHttpClient()
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("${this.endpoint}/ga4gh/drs/v1/objects?alias=${sample}"))
.header("Authorization", this.authHeader)
.version(HttpClient.Version.HTTP_1_1)
.GET()
.build()

HttpResponse<String> responseBody = client.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8))

List responseObjects = (List) new JsonSlurper().parseText(responseBody.body())
Integer statusCode = responseBody.statusCode()

Map correctObj = (Map) responseObjects.find { obj ->
List urls = obj['access_methods']['access_url']['url'] as List
return urls.contains(url)
}
if(correctObj) {
return correctObj.id
}
throw new DrsObjectPublishingException("Creating a DRS object for ${url} returned a 'object already exists' response, but the object couldn't be found with an alias search on '${sample}'")
}

}
111 changes: 111 additions & 0 deletions plugins/nf-ga4gh/src/main/nextflow/ga4gh/drs/config/DrsConfig.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* MIT License
*
* Copyright (c) 2024 Nicolas Vannieuwkerke
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice (including the next
* paragraph) shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package nextflow.ga4gh.drs.config

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import java.util.regex.Matcher
import java.util.regex.Pattern

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.ClientConfiguration

import nextflow.ga4gh.drs.exceptions.DrsConfigException
import nextflow.cloud.aws.AwsClientFactory
import nextflow.cloud.aws.config.AwsConfig

/**
* Define the plugin configuration values.
*
* The configuration values can be extracted from the map and will be stored as
* on the instance.
*
*
* TODO: Describe the configuration of your actual implementation.
*
* @author Nicolas Vannieuwkerke <[email protected]>
*/
@Slf4j
@CompileStatic
class DrsConfig {

Boolean enabled
String endpoint
String endpointNoProtocol
String user
String password
AmazonS3 s3Client
List<String> allowedExtensions
String run
String summary

/**
* Construct a configuration instance.
*
* @param map A nextflow plugin wrapper instance.
*/
DrsConfig(Map map = [:]) {
this.enabled = map.navigate("drs.enabled") ?: false
this.endpoint = map.navigate("drs.endpoint") ?: System.getenv("DRS_URL") ?: ""
this.endpointNoProtocol = this.endpoint.split("://")[-1]
this.user = map.navigate("drs.user") ?: System.getenv("DRS_USERNAME") ?: ""
this.password = map.navigate("drs.password") ?: System.getenv("DRS_PASSWORD") ?: ""
Map awsConfig = (Map) map.navigate("aws") ?: [:]
awsConfig.region = awsConfig.region ?: "uz"
this.s3Client = new AwsClientFactory(new AwsConfig(awsConfig)).getS3Client()
this.allowedExtensions = map.navigate("drs.allowedExtensions") as List<String> ?: []
this.run = map.navigate("drs.run") ?: "" // TODO implement a more dynamic way for pipeline runs with more than one sequencer run
this.summary = map.navigate("drs.summary") ?: ""

// Some failsafe options to prevent weird errors
if(!this.enabled) { return }

if(!this.endpoint) {
throw new DrsConfigException("Please provide a DRS endpoint with the drs.endpoint configuration option or with the DRS_URL environment variable")
}

if(!this.user) {
throw new DrsConfigException("Unable to get the DRS username. Make sure the drs.user configuration option or the DRS_USERNAME environment variable is set")
}

if(!this.password) {
throw new DrsConfigException("Unable to get the DRS password. Make sure the drs.password configuration option or the DRS_PASSWORD environment variable is set")
}

if(this.summary) {
String summaryExtension = this.summary.tokenize(".").last() ?: ""
List allowedSummaryExtensions = ["csv", "tsv", "json", "yaml", "yml"]
if(!allowedSummaryExtensions.contains(summaryExtension)) {
throw new DrsConfigException("Unrecognized extension used for the DRS summary file. The extension (${summaryExtension}) should be on of these: ${allowedSummaryExtensions.join(',')}")
}
}

if(!this.run) {
throw new DrsConfigException("Please provide a run with the `drs.run` configuration option")
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package nextflow.ga4gh.drs.exceptions

import groovy.transform.CompileStatic
import nextflow.exception.AbortOperationException
/**
* Exception thrown for errors while creating a DrsConfig object
*
* @author Nicolas Vannieuwkerke <[email protected]>
*/
@CompileStatic
class DrsAuthenticationException extends AbortOperationException {

DrsAuthenticationException(String message) {
super("Something went wrong during the DRS authentication: " + message)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package nextflow.ga4gh.drs.exceptions

import groovy.transform.CompileStatic
import nextflow.exception.AbortOperationException
/**
* Exception thrown for errors while creating a DrsConfig object
*
* @author Nicolas Vannieuwkerke <[email protected]>
*/
@CompileStatic
class DrsConfigException extends AbortOperationException {

DrsConfigException(String message) {
super(message)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package nextflow.ga4gh.drs.exceptions

import groovy.transform.CompileStatic
import nextflow.exception.AbortOperationException
/**
* Exception thrown for errors while creating a DRS object
*
* @author Nicolas Vannieuwkerke <[email protected]>
*/
@CompileStatic
class DrsObjectCreationException extends AbortOperationException {

DrsObjectCreationException(String message) {
super(message)
}
}
Loading