Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Access Google Cloud Storage via NIO #6775

Merged
merged 35 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c02b285
[WIP] Access Google Cloud Storage via NIO
fm3 Jan 23, 2023
3ff126c
info request
fm3 Jan 23, 2023
38e6f9e
storage options
fm3 Jan 26, 2023
e772e06
use credentials, try without gzip
fm3 Jan 26, 2023
583ac2e
WIP: streamline credential passing
fm3 Jan 27, 2023
9b20a1c
wip: create gs file systems on demand
fm3 Jan 30, 2023
36570e3
compile
fm3 Jan 30, 2023
ff81f85
bucket name
fm3 Jan 30, 2023
3b4b400
handling of path vs uri for gcs case
fm3 Jan 30, 2023
0a562be
remove unused code
fm3 Jan 30, 2023
07f7d28
cleanup file system instantiation
fm3 Jan 31, 2023
0863ba7
Merge branch 'master' into google-cloud
fm3 Jan 31, 2023
c94a4c1
allow legacy credentials, rename things
fm3 Feb 1, 2023
f5417ef
Merge branch 'master' into google-cloud
fm3 Feb 1, 2023
caa7f1f
cache credentials, use alfu cache for file systems
fm3 Feb 1, 2023
847c3ff
re-add gunzip
fm3 Feb 1, 2023
4959e1d
rename credential type enum values
fm3 Feb 2, 2023
9d1abea
Add create route for google service account credentials
fm3 Feb 2, 2023
7856982
Add create route for google service account credentials
fm3 Feb 2, 2023
65b3959
Merge branch 'google-cloud' of github.com:scalableminds/webknossos in…
philippotto Feb 2, 2023
4ca0295
fix enum names
fm3 Feb 2, 2023
e27e750
hide Optional hint in add-zarr view; also reset original input url wh…
philippotto Feb 3, 2023
9dd28c0
format backend
philippotto Feb 3, 2023
2def32b
pr feedback part 2: messages
fm3 Feb 6, 2023
3004be0
Merge branch 'google-cloud' of github.com:scalableminds/webknossos in…
fm3 Feb 6, 2023
ae88223
changelog
fm3 Feb 6, 2023
6f2553b
Merge branch 'master' into google-cloud
fm3 Feb 6, 2023
795d07c
Update CHANGELOG.unreleased.md
philippotto Feb 6, 2023
82e4af2
catch parsing of invalid json
philippotto Feb 6, 2023
3945a9c
Merge branch 'google-cloud' of github.com:scalableminds/webknossos in…
philippotto Feb 6, 2023
cdab9ba
lint
philippotto Feb 6, 2023
0fdf55b
fix typo
philippotto Feb 6, 2023
163f824
adapt docs
philippotto Feb 6, 2023
11706e5
update and mention supported protocols in UI
philippotto Feb 6, 2023
e12bfca
Merge branch 'master' into google-cloud
fm3 Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions app/models/binary/credential/CredentialDAO.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package models.binary.credential

import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.storage.{AnyCredential, HttpBasicAuthCredential, S3AccessKeyCredential}
import com.scalableminds.webknossos.datastore.storage.{
FileSystemCredential,
GoogleServiceAccountCredential,
HttpBasicAuthCredential,
S3AccessKeyCredential
}
import com.scalableminds.webknossos.schema.Tables.{Credentials, CredentialsRow}
import net.liftweb.util.Helpers.tryo
import play.api.libs.json.Json
import utils.sql.{SecuredSQLDAO, SqlClient, SqlToken}
import utils.ObjectId

Expand Down Expand Up @@ -42,6 +49,18 @@ class CredentialDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContex
r._Organization
)

private def parseAsGoogleServiceAccountCredential(r: CredentialsRow): Fox[GoogleServiceAccountCredential] =
for {
secret <- r.secret.toFox
secretJson <- tryo(Json.parse(secret)).toFox
} yield
GoogleServiceAccountCredential(
r.name,
secretJson,
r._User,
r._Organization
)

def insertOne(_id: ObjectId, credential: HttpBasicAuthCredential): Fox[Unit] =
for {
_ <- run(q"""insert into webknossos.credentials(_id, type, name, identifier, secret, _user, _organization)
Expand All @@ -51,19 +70,30 @@ class CredentialDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContex
def insertOne(_id: ObjectId, credential: S3AccessKeyCredential): Fox[Unit] =
for {
_ <- run(q"""insert into webknossos.credentials(_id, type, name, identifier, secret, _user, _organization)
values(${_id}, ${CredentialType.S3_Access_Key}, ${credential.name}, ${credential.keyId}, ${credential.key}, ${credential.user}, ${credential.organization})""".asUpdate)
values(${_id}, ${CredentialType.S3_Access_Key}, ${credential.name}, ${credential.accessKeyId}, ${credential.secretAccessKey}, ${credential.user}, ${credential.organization})""".asUpdate)
} yield ()

def insertOne(_id: ObjectId, credential: GoogleServiceAccountCredential): Fox[Unit] =
for {
_ <- run(q"""insert into webknossos.credentials(_id, type, name, secret, _user, _organization)
values(${_id}, ${CredentialType.GCS}, ${credential.name}, ${credential.secretJson.toString}, ${credential.user}, ${credential.organization})""".asUpdate)
} yield ()

def findOne(id: ObjectId): Fox[AnyCredential] =
def findOne(id: ObjectId): Fox[FileSystemCredential] =
for {
r <- run(q"select $columns from webknossos.credentials_ where _id = $id".as[CredentialsRow])
firstRow <- r.headOption.toFox
parsed <- parseAnyCredential(firstRow)
} yield parsed

private def parseAnyCredential(r: CredentialsRow): Fox[AnyCredential] =
r.`type` match {
case "HTTP_Basic_Auth" => parseAsHttpBasicAuthCredential(r)
case "S3_Access_Key" => parseAsS3AccessKeyCredential(r)
}
private def parseAnyCredential(r: CredentialsRow): Fox[FileSystemCredential] =
for {
typeParsed <- CredentialType.fromString(r.`type`).toFox
parsed <- typeParsed match {
case CredentialType.HTTP_Basic_Auth => parseAsHttpBasicAuthCredential(r)
case CredentialType.S3_Access_Key => parseAsS3AccessKeyCredential(r)
case CredentialType.GCS => parseAsGoogleServiceAccountCredential(r)
case _ => Fox.failure(s"Unknown credential type: ${r.`type`}")
}
} yield parsed
}
69 changes: 38 additions & 31 deletions app/models/binary/credential/CredentialService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,59 @@ package models.binary.credential

import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.storage.{
FileSystemCredential,
FileSystemsHolder,
GoogleServiceAccountCredential,
HttpBasicAuthCredential,
S3AccessKeyCredential
}
import net.liftweb.util.Helpers.tryo
import play.api.libs.json.Json
import utils.ObjectId

import java.net.URI
import javax.inject.Inject
import scala.concurrent.ExecutionContext

class CredentialService @Inject()(credentialDao: CredentialDAO) {
class CredentialService @Inject()(credentialDAO: CredentialDAO) {

def createCredential(uri: URI,
username: Option[String],
password: Option[String],
user: String,
organization: String)(implicit ec: ExecutionContext): Fox[Option[ObjectId]] = {
val scheme = uri.getScheme
scheme match {
def createCredentialOpt(uri: URI,
credentialIdentifier: Option[String],
credentialSecret: Option[String],
userId: ObjectId,
organizationId: ObjectId): Option[FileSystemCredential] =
uri.getScheme match {
case FileSystemsHolder.schemeHttps =>
username match {
case Some(u) =>
val _id = ObjectId.generate
for {
_ <- credentialDao.insertOne(
_id,
HttpBasicAuthCredential(uri.toString, u, password.getOrElse(""), user, organization))
} yield Some(_id)
case None => Fox.successful(None)
}
credentialIdentifier.map(
username =>
HttpBasicAuthCredential(uri.toString,
username,
credentialSecret.getOrElse(""),
userId.toString,
organizationId.toString))
case FileSystemsHolder.schemeS3 =>
username match {
case Some(keyId) =>
password match {
case Some(secretKey) =>
val _id = ObjectId.generate
for {
_ <- credentialDao.insertOne(
_id,
S3AccessKeyCredential(uri.toString, keyId, secretKey, user, organization))
} yield Some(_id)
case None => Fox.successful(None)
}
case None => Fox.successful(None)
(credentialIdentifier, credentialSecret) match {
case (Some(keyId), Some(secretKey)) =>
Some(S3AccessKeyCredential(uri.toString, keyId, secretKey, userId.toString, organizationId.toString))
case _ => None
}
case FileSystemsHolder.schemeGS =>
for {
secret <- credentialSecret
secretJson <- tryo(Json.parse(secret)).toOption
} yield GoogleServiceAccountCredential(uri.toString, secretJson, userId.toString, organizationId.toString)
}

def insertOne(credential: FileSystemCredential)(implicit ec: ExecutionContext): Fox[ObjectId] = {
val _id = ObjectId.generate
for {
_ <- credential match {
case c: HttpBasicAuthCredential => credentialDAO.insertOne(_id, c)
case c: S3AccessKeyCredential => credentialDAO.insertOne(_id, c)
case c: GoogleServiceAccountCredential => credentialDAO.insertOne(_id, c)
case _ => Fox.failure("Unknown credential type")
}
} yield _id
}

}
36 changes: 20 additions & 16 deletions app/models/binary/explore/ExploreRemoteLayerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.scalableminds.webknossos.datastore.dataformats.zarr._
import com.scalableminds.webknossos.datastore.datareaders.n5.N5Header
import com.scalableminds.webknossos.datastore.datareaders.zarr._
import com.scalableminds.webknossos.datastore.models.datasource._
import com.scalableminds.webknossos.datastore.storage.FileSystemsHolder
import com.scalableminds.webknossos.datastore.storage.{FileSystemsHolder, RemoteSourceDescriptor}
import com.typesafe.scalalogging.LazyLogging
import models.binary.credential.CredentialService
import models.user.User
Expand All @@ -23,7 +23,9 @@ import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Try

case class ExploreRemoteDatasetParameters(remoteUri: String, user: Option[String], password: Option[String])
case class ExploreRemoteDatasetParameters(remoteUri: String,
credentialIdentifier: Option[String],
credentialSecret: Option[String])

object ExploreRemoteDatasetParameters {
implicit val jsonFormat: OFormat[ExploreRemoteDatasetParameters] = Json.format[ExploreRemoteDatasetParameters]
Expand All @@ -39,8 +41,8 @@ class ExploreRemoteLayerService @Inject()(credentialService: CredentialService)
exploredLayersNested <- Fox.serialCombined(urisWithCredentials)(
parameters =>
exploreRemoteLayersForUri(parameters.remoteUri,
parameters.user,
parameters.password,
parameters.credentialIdentifier,
parameters.credentialSecret,
reportMutable,
requestIdentity))
layersWithVoxelSizes = exploredLayersNested.flatten
Expand Down Expand Up @@ -138,25 +140,27 @@ class ExploreRemoteLayerService @Inject()(credentialService: CredentialService)

private def exploreRemoteLayersForUri(
layerUri: String,
user: Option[String],
password: Option[String],
credentialIdentifier: Option[String],
credentialSecret: Option[String],
reportMutable: ListBuffer[String],
requestingUser: User)(implicit ec: ExecutionContext): Fox[List[(DataLayer, Vec3Double)]] =
for {
remoteSource <- tryo(RemoteSourceDescriptor(new URI(normalizeUri(layerUri)), user, password)).toFox ?~> s"Received invalid URI: $layerUri"
credentialId <- credentialService.createCredential(
new URI(normalizeUri(layerUri)),
user,
password,
requestingUser._id.toString,
requestingUser._organization.toString) ?~> "Failed to set up remote file system credentaial"
fileSystem <- FileSystemsHolder.getOrCreate(remoteSource).toFox ?~> "Failed to set up remote file system"
remotePath <- tryo(fileSystem.getPath(remoteSource.remotePath)) ?~> "Failed to get remote path"
uri <- tryo(new URI(normalizeUri(layerUri))) ?~> s"Received invalid URI: $layerUri"
credentialOpt = credentialService.createCredentialOpt(uri,
credentialIdentifier,
credentialSecret,
requestingUser._id,
requestingUser._organization)
remoteSource = RemoteSourceDescriptor(uri, credentialOpt)
credentialId <- Fox.runOptional(credentialOpt)(c => credentialService.insertOne(c)) ?~> "Failed to store file system credential"
fileSystem <- FileSystemsHolder.getOrCreate(remoteSource) ?~> "Failed to set up remote file system"
remotePath <- tryo(fileSystem.getPath(FileSystemsHolder.pathFromUri(remoteSource.uri))) ?~> "Failed to get remote path"
fm3 marked this conversation as resolved.
Show resolved Hide resolved
layersWithVoxelSizes <- exploreRemoteLayersForRemotePath(
remotePath,
credentialId.map(_.toString),
reportMutable,
List(new ZarrArrayExplorer, new NgffExplorer, new N5ArrayExplorer, new N5MultiscalesExplorer))
List(new ZarrArrayExplorer, new NgffExplorer, new N5ArrayExplorer, new N5MultiscalesExplorer)
)
} yield layersWithVoxelSizes

private def normalizeUri(uri: String): String =
Expand Down
7 changes: 6 additions & 1 deletion app/models/binary/explore/N5ArrayExplorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ class N5ArrayExplorer extends RemoteLayerExplorer {
elementClass <- n5Header.elementClass ?~> "failed to read element class from n5 header"
guessedAxisOrder = AxisOrder.asZyxFromRank(n5Header.rank)
boundingBox <- n5Header.boundingBox(guessedAxisOrder) ?~> "failed to read bounding box from zarr header. Make sure data is in (T/C)ZYX format"
magLocator = MagLocator(Vec3Int.ones, Some(remotePath.toString), None, Some(guessedAxisOrder), None, credentialId)
magLocator = MagLocator(Vec3Int.ones,
Some(remotePath.toUri.toString),
None,
Some(guessedAxisOrder),
None,
credentialId)
layer: N5Layer = if (looksLikeSegmentationLayer(name, elementClass)) {
N5SegmentationLayer(name, boundingBox, elementClass, List(magLocator), largestSegmentId = None)
} else N5DataLayer(name, Category.color, boundingBox, elementClass, List(magLocator))
Expand Down
2 changes: 1 addition & 1 deletion app/models/binary/explore/N5MultiscalesExplorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class N5MultiscalesExplorer extends RemoteLayerExplorer with FoxImplicits {
elementClass <- n5Header.elementClass ?~> s"failed to read element class from n5 header at $headerPath"
boundingBox <- n5Header.boundingBox(axisOrder) ?~> s"failed to read bounding box from n5 header at $headerPath"
} yield
MagWithAttributes(MagLocator(mag, Some(magPath.toString), None, Some(axisOrder), None, credentialId),
MagWithAttributes(MagLocator(mag, Some(magPath.toUri.toString), None, Some(axisOrder), None, credentialId),
magPath,
elementClass,
boundingBox)
Expand Down
9 changes: 5 additions & 4 deletions app/models/binary/explore/NgffExplorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ class NgffExplorer extends RemoteLayerExplorer {
elementClass <- zarrHeader.elementClass ?~> s"failed to read element class from zarr header at $zarrayPath"
boundingBox <- zarrHeader.boundingBox(axisOrder) ?~> s"failed to read bounding box from zarr header at $zarrayPath"
} yield
MagWithAttributes(MagLocator(mag, Some(magPath.toString), None, Some(axisOrder), channelIndex, credentialId),
magPath,
elementClass,
boundingBox)
MagWithAttributes(
MagLocator(mag, Some(magPath.toUri.toString), None, Some(axisOrder), channelIndex, credentialId),
magPath,
elementClass,
boundingBox)

private def extractAxisOrder(axes: List[NgffAxis]): Fox[AxisOrder] = {
def axisMatches(axis: NgffAxis, name: String) = axis.name.toLowerCase == name && axis.`type` == "space"
Expand Down
4 changes: 3 additions & 1 deletion app/models/binary/explore/RemoteLayerExplorer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models.binary.explore

import com.scalableminds.util.geometry.{BoundingBox, Vec3Double}
import com.scalableminds.util.io.ZipIO
import com.scalableminds.util.tools.{Fox, FoxImplicits, JsonHelper}
import com.scalableminds.webknossos.datastore.dataformats.MagLocator
import com.scalableminds.webknossos.datastore.models.datasource.{DataLayer, ElementClass}
Expand All @@ -24,7 +25,8 @@ trait RemoteLayerExplorer extends FoxImplicits {

protected def parseJsonFromPath[T: Reads](path: Path): Fox[T] =
for {
fileAsString <- tryo(new String(Files.readAllBytes(path), StandardCharsets.UTF_8)).toFox ?~> "Failed to read remote file"
fileBytes <- tryo(ZipIO.tryGunzip(Files.readAllBytes(path))) ?~> "Failed to read remote file"
fileAsString <- tryo(new String(fileBytes, StandardCharsets.UTF_8)).toFox ?~> "Failed to read remote file"
parsed <- JsonHelper.parseAndValidateJson[T](fileAsString) ?~> "Failed to parse or validate json against data schema"
} yield parsed

Expand Down
7 changes: 6 additions & 1 deletion app/models/binary/explore/ZarrArrayExplorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ class ZarrArrayExplorer extends RemoteLayerExplorer {
elementClass <- zarrHeader.elementClass ?~> "failed to read element class from zarr header"
guessedAxisOrder = AxisOrder.asZyxFromRank(zarrHeader.rank)
boundingBox <- zarrHeader.boundingBox(guessedAxisOrder) ?~> "failed to read bounding box from zarr header. Make sure data is in (T/C)ZYX format"
magLocator = MagLocator(Vec3Int.ones, Some(remotePath.toString), None, Some(guessedAxisOrder), None, credentialId)
magLocator = MagLocator(Vec3Int.ones,
Some(remotePath.toUri.toString),
None,
Some(guessedAxisOrder),
None,
credentialId)
layer: ZarrLayer = if (looksLikeSegmentationLayer(name, elementClass)) {
ZarrSegmentationLayer(name, boundingBox, elementClass, List(magLocator), largestSegmentId = None)
} else ZarrDataLayer(name, Category.color, boundingBox, elementClass, List(magLocator))
Expand Down
3 changes: 0 additions & 3 deletions conf/META-INF/services/java.nio.file.spi.FileSystemProvider

This file was deleted.

4 changes: 2 additions & 2 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -293,5 +293,5 @@ pidfile.path = "/dev/null"


# uncomment these lines for faster restart during local backend development (but beware the then-missing features):
#slick.checkSchemaOnStartup = false
#play.modules.disabled += "play.modules.swagger.SwaggerModule"
slick.checkSchemaOnStartup = false
play.modules.disabled += "play.modules.swagger.SwaggerModule"
4 changes: 2 additions & 2 deletions frontend/javascripts/admin/admin_rest_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1525,8 +1525,8 @@ export async function exploreRemoteDataset(
data: credentials
? remoteUris.map((uri) => ({
remoteUri: uri.trim(),
user: credentials.username,
password: credentials.pass,
credentialIdentifier: credentials.username,
credentialSecret: credentials.pass,
}))
: remoteUris.map((uri) => ({ remoteUri: uri.trim() })),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ function AddZarrLayer({
(userInput.indexOf("https://") !== 0 && userInput.indexOf("s3://") === 0)
) {
setSelectedProtocol(userInput.indexOf("https://") === 0 ? "https" : "s3");
setShowCredentialsFields(userInput.indexOf("s3://") === 0);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that all supported schemas now support their form of credentials so this is no longer needed

} else {
throw new Error("Dataset URL must employ either the https:// or s3:// protocol.");
}
Expand Down
6 changes: 5 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ object Dependencies {
private val jackson = "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.12.7"
private val commonsCompress = "org.apache.commons" % "commons-compress" % "1.21"
private val jwt = "com.github.jwt-scala" %% "jwt-play-json" % "9.1.1"
private val googleCloudStorage = "com.google.cloud" % "google-cloud-storage" % "2.13.1"
private val googleCloudStorageNio = "com.google.cloud" % "google-cloud-nio" % "0.123.28"

private val sql = Seq(
"com.typesafe.slick" %% "slick" % "3.3.3",
Expand Down Expand Up @@ -104,7 +106,9 @@ object Dependencies {
tika,
jblosc,
scalajHttp,
commonsCompress
commonsCompress,
googleCloudStorage,
googleCloudStorageNio
)

val webknossosTracingstoreDependencies: Seq[ModuleID] = Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ addSbtPlugin("com.sksamuel.scapegoat" %% "sbt-scapegoat" % "1.1.1")
addSbtPlugin("net.vonbuchholtz" % "sbt-dependency-check" % "3.1.3")

//protocol buffers
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3"
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.12"
21 changes: 21 additions & 0 deletions util/src/main/scala/com/scalableminds/util/io/ZipIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,27 @@ object ZipIO extends LazyLogging {
}
}

def tryGunzip(possiblyCompressed: Array[Byte]): Array[Byte] =
tryo(gunzip(possiblyCompressed)).toOption.getOrElse(possiblyCompressed)

def gunzip(compressed: Array[Byte]): Array[Byte] = {
val is = new GZIPInputStream(new ByteArrayInputStream(compressed))
val os = new ByteArrayOutputStream()
try {
val buffer = new Array[Byte](1024)
fm3 marked this conversation as resolved.
Show resolved Hide resolved
var len = 0
do {
len = is.read(buffer)
if (len > 0)
os.write(buffer, 0, len)
} while (len > 0)
os.toByteArray
} finally {
is.close()
os.close()
}
}

def zipToTempFile(files: List[File]): File = {
val outfile = File.createTempFile("data", System.nanoTime().toString + ".zip")
val zip: OpenZip = startZip(new FileOutputStream(outfile))
Expand Down
Loading