Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add different types of authentication for azure blob storage #845

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions config/config.kafka.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,16 @@
# Optional. Azure blob storage client configuration.
# The Azure Blob Storage (ABS) client is not enabled unless this section is provided.
"azureStorage": {
# Azure blob storage account names
"storageAccountNames": ["storageAccount1", "storageAccount2"]
"accounts": [
# Example public account with no auth
{ "name": "storageAccount1"},

# Example private account using default auth chain -> https://learn.microsoft.com/en-us/java/api/com.azure.identity.defaultazurecredential?view=azure-java-stable
{ "name": "storageAccount2", "auth": { "type": "default"} },

# Example private account using SAS token auth
{ "name": "storageAccount3", "auth": { "type": "sas", "value": "tokenValue"}}
]
}
}

Expand Down
12 changes: 10 additions & 2 deletions config/config.nsq.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,16 @@
# Optional. Azure blob storage client configuration.
# The Azure Blob Storage (ABS) client is not enabled unless this section is provided.
"azureStorage": {
# Azure blob storage account names
"storageAccountNames": ["storageAccount1", "storageAccount2"]
"accounts": [
# Example public account with no auth
{ "name": "storageAccount1"},

# Example private account using default auth chain -> https://learn.microsoft.com/en-us/java/api/com.azure.identity.defaultazurecredential?view=azure-java-stable
{ "name": "storageAccount2", "auth": { "type": "default"} },

# Example private account using SAS token auth
{ "name": "storageAccount3", "auth": { "type": "sas", "value": "tokenValue"}}
]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,27 @@ import cats.data.Validated.{Invalid, Valid}
import cats.effect._
import cats.implicits._
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.{BlobServiceClientBuilder, BlobUrlParts}
import com.azure.storage.blob.{BlobServiceAsyncClient, BlobServiceClientBuilder, BlobUrlParts}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BlobStorageClients.AzureStorage
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
import fs2.Stream

import java.net.URI
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

object AzureStorageClient {

def mk[F[_]: ConcurrentEffect](storageAccountNames: List[String]): Resource[F, Client[F]] =
def mk[F[_]: ConcurrentEffect](config: AzureStorage): Resource[F, Client[F]] =
for {
stores <- createStores(storageAccountNames)
stores <- createStores(config)
} yield new Client[F] {
def canDownload(uri: URI): Boolean =
uri.toString.contains("core.windows.net")

def download(uri: URI): Stream[F, Byte] = {
val inputParts = BlobUrlParts.parse(uri.toString)
stores.get(inputParts.getAccountName) match {
case None => Stream.raiseError[F](new Exception(s"AzureStore for storage account name '${inputParts.getAccountName}' isn't found"))
case None =>
Stream.raiseError[F](new Exception(s"AzureStore for storage account name '${inputParts.getAccountName}' isn't found"))
case Some(store) =>
Authority
.parse(inputParts.getBlobContainerName)
Expand All @@ -48,27 +51,47 @@ object AzureStorageClient {
}
}

private def createStores[F[_]: ConcurrentEffect: Async](storageAccountNames: List[String]): Resource[F, Map[String, AzureStore[F]]] =
storageAccountNames.map(a => createStore(a).map(b => (a, b))).sequence.map(_.toMap)
/**
 * Creates one [[AzureStore]] for every account listed in the configuration.
 *
 * @param config Azure storage configuration holding the accounts to connect to
 * @return a resource yielding the stores keyed by storage account name; when two
 *         accounts share a name, the entry built last replaces the earlier one
 */
private def createStores[F[_]: ConcurrentEffect: Async](config: AzureStorage): Resource[F, Map[String, AzureStore[F]]] =
  config.accounts
    .traverse(account => createStore(account).map(account.name -> _))
    .map(_.toMap)

private def createStore[F[_]: ConcurrentEffect: Async](storageAccountName: String): Resource[F, AzureStore[F]] =
private def createStore[F[_]: ConcurrentEffect: Async](account: AzureStorage.Account): Resource[F, AzureStore[F]] =
for {
client <- Resource.eval {
ConcurrentEffect[F].delay {
val builder = new BlobServiceClientBuilder().credential(new DefaultAzureCredentialBuilder().build)
val storageEndpoint = createStorageEndpoint(storageAccountName)
builder.endpoint(storageEndpoint).buildAsyncClient()
}
}
client <- createClient(account)
store <- AzureStore
.builder[F](client)
.build
.fold(
errors => Resource.eval(ConcurrentEffect[F].raiseError(errors.reduce(Throwables.collapsingSemigroup))),
s => Resource.pure[F, AzureStore[F]](s)
)
.builder[F](client)
.build
.fold(
errors => Resource.eval(ConcurrentEffect[F].raiseError(errors.reduce(Throwables.collapsingSemigroup))),
s => Resource.pure[F, AzureStore[F]](s)
)
} yield store

/**
 * Builds the asynchronous blob service client for a single storage account.
 *
 * Construction is suspended in `F`, so nothing is built until the resource is used.
 * The client is lifted with `Resource.eval`, i.e. no release action is registered for it.
 *
 * @param account storage account whose name determines the endpoint and whose
 *                optional `auth` determines the credentials
 * @return a resource yielding the configured [[BlobServiceAsyncClient]]
 */
private def createClient[F[_]: ConcurrentEffect: Async](account: AzureStorage.Account): Resource[F, BlobServiceAsyncClient] = {
  val buildClient = ConcurrentEffect[F].delay {
    // Credentials are applied first, then the endpoint, then the client is built.
    val configuredBuilder = createClientBuilder(account)
    val endpoint = createStorageEndpoint(account.name)
    configuredBuilder.endpoint(endpoint).buildAsyncClient()
  }
  Resource.eval(buildClient)
}

/**
 * Creates a [[BlobServiceClientBuilder]] carrying the credentials configured for the account.
 *
 *  - no `auth` configured: the builder is returned without any credentials
 *  - `DefaultCredentialsChain`: a credential built by `DefaultAzureCredentialBuilder` is attached
 *  - `SasToken`: the configured token value is attached via `sasToken`
 *
 * @param account storage account whose `auth` setting selects the credentials
 * @return the builder, not yet given an endpoint
 */
private def createClientBuilder(account: AzureStorage.Account): BlobServiceClientBuilder = {
  val builder = new BlobServiceClientBuilder()
  account.auth.fold(builder) {
    case AzureStorage.Account.Auth.DefaultCredentialsChain =>
      builder.credential(new DefaultAzureCredentialBuilder().build)
    case AzureStorage.Account.Auth.SasToken(tokenValue) =>
      builder.sasToken(tokenValue)
  }
}

/**
 * Derives the blob service endpoint URL for a storage account.
 *
 * @param storageAccountName name of the storage account, used as the host prefix
 * @return `https://<storageAccountName>.blob.core.windows.net`
 */
private def createStorageEndpoint(storageAccountName: String): String =
  "https://" + storageAccountName + ".blob.core.windows.net"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import java.util.UUID
import cats.syntax.either._

import scala.concurrent.duration.{Duration, FiniteDuration}

import _root_.io.circe.{Decoder, DecodingFailure, Encoder}
import _root_.io.circe.{Codec, Decoder, DecodingFailure, Encoder}
import _root_.io.circe.generic.extras.semiauto._
import _root_.io.circe.config.syntax._

Expand Down Expand Up @@ -492,8 +491,47 @@ object io {
azureStorage: Option[BlobStorageClients.AzureStorage]
)
object BlobStorageClients {
case class AzureStorage(storageAccountNames: List[String])

/** Azure blob storage client configuration: the list of storage accounts to connect to. */
case class AzureStorage(accounts: List[AzureStorage.Account])
object AzureStorage {

/**
 * A single storage account.
 *
 * @param name storage account name
 * @param auth how to authenticate against the account; `None` means no credentials are configured
 */
final case class Account(name: String, auth: Option[Account.Auth])
object Account {

/** Supported authentication methods for a storage account. */
sealed trait Auth
object Auth {
/** Authenticate with the default Azure credentials chain (config `type` = `default`). */
final case object DefaultCredentialsChain extends Auth
/** Authenticate with a SAS token (config `type` = `sas`); `value` holds the token. */
final case class SasToken(value: String) extends Auth
}
}
}

// Derived codec for the SAS-token auth variant. Despite the `Decoder` suffix in its name this is a
// full Codec (decoder and encoder). The exact field naming follows the implicit derivation
// configuration in scope, which is not visible from this spot.
implicit val sasTokenDecoder: Codec[AzureStorage.Account.Auth.SasToken] =
deriveConfiguredCodec

/**
 * Decodes a storage account authentication section by dispatching on its `type` field.
 *
 * The `type` value is compared case-insensitively:
 *  - `default` yields `DefaultCredentialsChain`
 *  - `sas` decodes the same object as a `SasToken`
 *  - anything else fails with a message listing the supported types
 *
 * A missing or non-string `type` propagates the underlying decoding failure unchanged.
 */
implicit val accountAuthDecoder: Decoder[AzureStorage.Account.Auth] =
  Decoder.instance { cursor =>
    val typeCursor = cursor.downField("type")
    typeCursor.as[String].flatMap { rawType =>
      val decoded: Either[DecodingFailure, AzureStorage.Account.Auth] =
        rawType.toLowerCase match {
          case "default" =>
            Right(AzureStorage.Account.Auth.DefaultCredentialsChain)
          case "sas" =>
            cursor.as[AzureStorage.Account.Auth.SasToken]
          case other =>
            Left(
              DecodingFailure(
                s"Storage account authentication type '$other' is not supported yet. Supported types: 'default', 'sas'",
                typeCursor.history
              )
            )
        }
      decoded
    }
  }

// Derived encoder for the auth ADT.
// NOTE(review): the matching decoder is hand-written and keyed on a `type` field, while this
// encoder is derived; confirm the encoded shape round-trips if configs are ever re-read.
implicit val accountAuthEncoder: Encoder[AzureStorage.Account.Auth] =
deriveConfiguredEncoder
// Derived codec for a single storage account entry (`name` plus optional `auth`).
implicit val storageAccountCodec: Codec[AzureStorage.Account] =
deriveConfiguredCodec
// Derived decoder for the top-level Azure storage section (the `accounts` list).
implicit val azureStorageDecoder: Decoder[AzureStorage] =
deriveConfiguredDecoder[AzureStorage]
implicit val azureStorageEncoder: Encoder[AzureStorage] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object Main extends IOApp.WithContext {
private def createBlobStorageClient(conf: BlobStorageClientsConfig): List[Blocker => Resource[IO, Client[IO]]] = {
val gcs = if (conf.gcs) Some((b: Blocker) => Resource.eval(GcsClient.mk[IO](b))) else None
val aws = if (conf.s3) Some((_: Blocker) => S3Client.mk[IO]) else None
val azure = conf.azureStorage.map(s => (_: Blocker) => AzureStorageClient.mk[IO](s.storageAccountNames))
val azure = conf.azureStorage.map(s => (_: Blocker) => AzureStorageClient.mk[IO](s))
List(gcs, aws, azure).flatten
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import cats.effect.testing.specs2.CatsIO
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, Sentry}
import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BlobStorageClients.AzureStorage
import org.specs2.mutable.Specification

class ConfigSpec extends Specification with CatsIO {
Expand Down Expand Up @@ -124,7 +124,15 @@ class ConfigSpec extends Specification with CatsIO {
io.BlobStorageClients(
gcs = true,
s3 = true,
azureStorage = Some(io.BlobStorageClients.AzureStorage(List("storageAccount1", "storageAccount2")))
azureStorage = Some(
io.BlobStorageClients.AzureStorage(
List(
AzureStorage.Account(name = "storageAccount1", auth = None),
AzureStorage.Account(name = "storageAccount2", auth = Some(AzureStorage.Account.Auth.DefaultCredentialsChain)),
AzureStorage.Account(name = "storageAccount3", auth = Some(AzureStorage.Account.Auth.SasToken("tokenValue")))
)
)
)
)
)
ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import com.snowplowanalytics.snowplow.enrich.nsq.generated.BuildInfo

object Containers {

// TODO: NSQ images are pinned to the version below because the tests fail with the latest 1.3.0 release; unpin once that is resolved.
private val nsqVersion = "v1.2.1"

case class NetworkInfo(
networkAlias: String,
broadcastAddress: String,
Expand Down Expand Up @@ -135,7 +138,7 @@ object Containers {
Resource.make (
Sync[F].delay {
val container = FixedHostPortGenericContainer(
imageName = "nsqio/nsq:latest",
imageName = s"nsqio/nsq:$nsqVersion",
command = Seq(
"/nsqlookupd",
s"--broadcast-address=${networkInfo.broadcastAddress}",
Expand Down Expand Up @@ -163,7 +166,7 @@ object Containers {
Resource.make(
Sync[F].delay {
val container = FixedHostPortGenericContainer(
imageName = "nsqio/nsq:latest",
imageName = s"nsqio/nsq:$nsqVersion",
command = Seq(
"/nsqd",
s"--broadcast-address=${networkInfo.broadcastAddress}",
Expand Down Expand Up @@ -196,7 +199,7 @@ object Containers {
Resource.make(
Sync[F].delay {
val container = GenericContainer(
dockerImage = "nsqio/nsq:latest",
dockerImage = s"nsqio/nsq:$nsqVersion",
command = Seq(
"/nsq_to_nsq",
s"--nsqd-tcp-address=$sourceAddress",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object Main extends IOApp.WithContext {
private def createBlobStorageClient(conf: BlobStorageClientsConfig): List[Blocker => Resource[IO, Client[IO]]] = {
val gcs = if (conf.gcs) Some((b: Blocker) => Resource.eval(GcsClient.mk[IO](b))) else None
val aws = if (conf.s3) Some((_: Blocker) => S3Client.mk[IO]) else None
val azure = conf.azureStorage.map(s => (_: Blocker) => AzureStorageClient.mk[IO](s.storageAccountNames))
val azure = conf.azureStorage.map(s => (_: Blocker) => AzureStorageClient.mk[IO](s))
List(gcs, aws, azure).flatten
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BackoffPolicy
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, Sentry}
import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.BlobStorageClients.AzureStorage
import org.specs2.mutable.Specification

class ConfigSpec extends Specification with CatsIO {
Expand Down Expand Up @@ -136,7 +136,15 @@ class ConfigSpec extends Specification with CatsIO {
io.BlobStorageClients(
gcs = true,
s3 = true,
azureStorage = Some(io.BlobStorageClients.AzureStorage(List("storageAccount1", "storageAccount2")))
azureStorage = Some(
io.BlobStorageClients.AzureStorage(
List(
AzureStorage.Account(name = "storageAccount1", auth = None),
AzureStorage.Account(name = "storageAccount2", auth = Some(AzureStorage.Account.Auth.DefaultCredentialsChain)),
AzureStorage.Account(name = "storageAccount3", auth = Some(AzureStorage.Account.Auth.SasToken("tokenValue")))
)
)
)
)
)
ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected))
Expand Down
Loading