Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Storage: Azure storage support #3253 #3254

Merged
merged 81 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 72 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
06eac15
Setting up module for Azure storage #3253
sfali Aug 23, 2024
42b8505
Azure storage common settings #3253
sfali Aug 23, 2024
4412dec
Some common functions #3253
sfali Aug 23, 2024
dc26cea
Supporting model classes #3253
sfali Aug 23, 2024
882afcd
Exception representing returned error #3253
sfali Aug 23, 2024
5a8728e
Helper class to sign and populate Authorization header #3253
sfali Aug 23, 2024
3616c5f
Implementation of Azure storage functions #3253
sfali Aug 23, 2024
7626390
Scala and Java facade to implementation #3253
sfali Aug 23, 2024
f1de209
Introducing Akka stream attributes in order to configure http request…
sfali Aug 24, 2024
426b800
Resolve settings from Akka stream attributes #3253
sfali Aug 24, 2024
e41869e
Added support for SharedKeyLite authorization #3253
sfali Aug 24, 2024
5d3b55d
Added support for SAS authorization #3253
sfali Aug 24, 2024
dcc9c21
use option for null String #3253
sfali Aug 24, 2024
582edb3
function to get default settings #3253
sfali Aug 24, 2024
7dd77c6
Support for SAS authorization #3253
sfali Aug 24, 2024
d29f3f9
Splitting service operations based on service type #3253
sfali Aug 24, 2024
997746a
Splitting service operations based on service type #3253
sfali Aug 24, 2024
34832f6
StorageException test #3253
sfali Aug 25, 2024
6cc3254
creating constants for authorization types #3253
sfali Aug 25, 2024
d51c06a
implicit class to handle validations #3253
sfali Aug 25, 2024
7b3ed57
validations of configs and falling back to default values #3253
sfali Aug 25, 2024
07e64a5
StorageSettings tests #3253
sfali Aug 25, 2024
44c2f84
support for create container operation #3253
sfali Aug 25, 2024
93c2aab
support for endpoint URL for local testing #3253
sfali Aug 25, 2024
7ac0aa4
using end point if defined to create URI #3253
sfali Aug 25, 2024
9f013cb
move signer out of retry flow, we don't need to sign every time #3253
sfali Aug 27, 2024
9b81f84
copy right header #3253
sfali Aug 27, 2024
3ad11ea
pass implicit ActorSystem #3253
sfali Aug 27, 2024
790fd5a
parse content MD5 as well #3253
sfali Aug 27, 2024
d140558
populate environment variables to run test #3253
sfali Aug 27, 2024
db681c8
Integration tests #3253
sfali Aug 27, 2024
a46d27c
Added documentation for storage connector #3253
sfali Aug 27, 2024
a9f31ea
setting mimaPreviousArtifacts to make build work #3253
sfali Aug 27, 2024
9c1dd26
Added missing endPointUrl #3253
sfali Aug 28, 2024
5576940
minor return types fix #3253
sfali Aug 28, 2024
b281abb
base class for wire mock #3253
sfali Aug 28, 2024
e0afb81
Scala and Java mock tests for documentation #3253
sfali Aug 28, 2024
6a5f80c
fix java file paths #3253
sfali Aug 28, 2024
b96ed37
fix compilation #3253
sfali Aug 28, 2024
bc45086
fix compilations #3253
sfali Aug 28, 2024
5006117
fix documentation #3253
sfali Aug 28, 2024
9be3bf8
minor typo #3253
sfali Aug 28, 2024
b8600fc
extracted some constants to be used in sub classes #3253
sfali Aug 28, 2024
be29661
FileService test #3253
sfali Aug 28, 2024
49deaae
introducing factory class to create headers #3253
sfali Aug 28, 2024
cf48d55
updated implementation to get headers from dsl #3253
sfali Aug 28, 2024
c10b3d1
rename "putBlob" to "putBlockBlob" #3253
sfali Aug 28, 2024
96601a8
functions to put (create) page and append block #3253
sfali Aug 28, 2024
a4e0687
unit tests for put (create) page and append block #3253
sfali Aug 29, 2024
dc22e08
fix return types #3253
sfali Aug 29, 2024
562950e
rename in order to accommodate page blob header #3253
sfali Aug 29, 2024
e90a717
fix compilation #3253
sfali Aug 29, 2024
70466ce
Headers for page blob update #3253
sfali Aug 29, 2024
f167907
updated rename field #3253
sfali Aug 29, 2024
c750818
extracted common function and delegate all functions to this common f…
sfali Aug 29, 2024
c8afd96
clean up, headers move to separate class #3253
sfali Aug 29, 2024
ba27834
fix compiler warning #3253
sfali Aug 29, 2024
31bdf55
using ScalaIgnore #3253
sfali Aug 29, 2024
d1b5410
fix link #3253
sfali Aug 29, 2024
9882881
introduction of request builder #3253
sfali Aug 31, 2024
9358d5e
test for loading settings from custom path #3253
sfali Aug 31, 2024
2a0220b
remove unused import #3253
sfali Aug 31, 2024
9329318
Signer spec #3253
sfali Sep 2, 2024
e770b53
Fix for multiple content length header #3253
sfali Sep 4, 2024
1cbb77e
Clean up duplicate code #3253
sfali Sep 5, 2024
da8d82f
Add more test #3253
sfali Sep 7, 2024
9fb81d1
remove support for SharedKeyLite #3253
sfali Sep 7, 2024
b18fd50
Update azure-storage/src/test/resources/logback-test.xml
sfali Sep 9, 2024
67c2eaa
Update azure-storage/src/main/resources/reference.conf
sfali Sep 9, 2024
4cfe27a
Update azure-storage/src/main/scala/akka/stream/alpakka/azure/storage…
sfali Sep 9, 2024
f5971a1
Update azure-storage/src/main/scala/akka/stream/alpakka/azure/storage…
sfali Sep 9, 2024
1eaf5df
Update azure-storage/src/main/scala/akka/stream/alpakka/azure/storage…
sfali Sep 9, 2024
18f4336
Update azure-storage/src/main/scala/akka/stream/alpakka/azure/storage…
sfali Sep 10, 2024
fcd4326
Update azure-storage/src/main/scala/akka/stream/alpakka/azure/storage…
sfali Sep 10, 2024
948db31
Fail safe error message #3253
sfali Sep 10, 2024
60f9ae9
Merge remote-tracking branch 'origin/3253_azure_storage' into 3253_az…
sfali Sep 10, 2024
689cd53
Delete Container support #3253
sfali Sep 11, 2024
d6d9ed0
Create & Delete directory support #3253
sfali Sep 11, 2024
1c9c3cf
Merge branch 'master' into 3253_azure_storage
sfali Sep 30, 2024
43f347f
fix import #3253
sfali Sep 30, 2024
1e07711
Merge branch 'master' into 3253_azure_storage
sfali Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions azure-storage/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
alpakka {
azure-storage {
api-version = "2024-11-04"
api-version = ${?AZURE_STORAGE_API_VERSION}
signing-algorithm = "HmacSHA256"

# for local testing via emulator
# endpoint-url = ""

#azure-credentials
credentials {
# valid values are anon (annonymous), SharedKey, and sas
authorization-type = anon
authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE}

# required for all authorization types
account-name = ""
account-name = ${?AZURE_STORAGE_ACCOUNT_NAME}

# Account key is required for SharedKey or SharedKeyLite authorization
account-key = none
account-key = ${?AZURE_STORAGE_ACCOUNT_KEY}

# SAS token for sas authorization
sas-token = ""
sas-token = ${?AZURE_STORAGE_SAS_TOKEN}
}
#azure-credentials

# Default settings corresponding to automatic retry of requests in an Azure Blob Storage stream.
retry-settings {
# The maximum number of additional attempts (following transient errors) that will be made to process a given
# request before giving up.
max-retries = 3

# The minimum delay between request retries.
min-backoff = 200ms

# The maximum delay between request retries.
max-backoff = 10s

# Random jitter factor applied to retry delay calculation.
random-factor = 0.0
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage

import akka.stream.Attributes
import akka.stream.Attributes.Attribute

/**
* Akka Stream attributes that are used when materializing AzureStorage stream blueprints.
*/
object StorageAttributes {

/**
* Settings to use for the Azure Blob Storage stream
*/
def settings(settings: StorageSettings): Attributes = Attributes(StorageSettingsValue(settings))

/**
* Config path which will be used to resolve required AzureStorage settings
*/
def settingsPath(path: String): Attributes = Attributes(StorageSettingsPath(path))

/**
* Default settings
*/
def defaultSettings: Attributes = Attributes(StorageSettingsPath.Default)
}

final class StorageSettingsPath private (val path: String) extends Attribute
object StorageSettingsPath {
sfali marked this conversation as resolved.
Show resolved Hide resolved
val Default: StorageSettingsPath = StorageSettingsPath(StorageSettings.ConfigPath)

def apply(path: String) = new StorageSettingsPath(path)
}

final class StorageSettingsValue private (val settings: StorageSettings) extends Attribute
object StorageSettingsValue {
sfali marked this conversation as resolved.
Show resolved Hide resolved
def apply(settings: StorageSettings) = new StorageSettingsValue(settings)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage

import akka.http.scaladsl.model.StatusCode

import scala.util.{Failure, Success, Try}
import scala.xml.{Elem, NodeSeq, XML}

final case class StorageException(statusCode: StatusCode,
errorCode: String,
errorMessage: String,
resourceName: Option[String],
resourceValue: Option[String],
reason: Option[String])
extends RuntimeException(errorMessage) {

override def toString: String =
s"""StorageException(
|statusCode=$statusCode,
| errorCode=$errorCode,
| errorMessage=$errorMessage,
| resourceName=$resourceName,
| resourceValue=$resourceValue,
| reason=$reason
|)""".stripMargin.replaceAll(System.lineSeparator(), "")
}

object StorageException {
def apply(statusCode: StatusCode,
errorCode: String,
errorMessage: String,
resourceName: Option[String],
resourceValue: Option[String],
reason: Option[String]): StorageException =
new StorageException(statusCode, errorCode, errorMessage, resourceName, resourceValue, reason)

def apply(response: String, statusCode: StatusCode): StorageException = {
def getOptionalValue(xmlResponse: Elem, elementName: String, fallBackElementName: Option[String]) = {
val element = xmlResponse \ elementName
val node =
if (element.nonEmpty) element
else if (fallBackElementName.nonEmpty) xmlResponse \ fallBackElementName.get
else NodeSeq.Empty

emptyStringToOption(node.text)
}

Try {
val utf8_bom = "\uFEFF"
val sanitizedResponse = if (response.startsWith(utf8_bom)) response.substring(1) else response
val xmlResponse = XML.loadString(sanitizedResponse)
StorageException(
statusCode = statusCode,
errorCode = (xmlResponse \ "Code").text,
errorMessage = (xmlResponse \ "Message").text,
resourceName = getOptionalValue(xmlResponse, "QueryParameterName", Some("HeaderName")),
resourceValue = getOptionalValue(xmlResponse, "QueryParameterValue", Some("HeaderValue")),
reason = getOptionalValue(xmlResponse, "Reason", Some("AuthenticationErrorDetail"))
)
} match {
case Failure(ex) =>
StorageException(
statusCode = statusCode,
errorCode = Option(ex.getMessage).getOrElse("null"),
errorMessage = Option(response).getOrElse("null"),
resourceName = None,
resourceValue = None,
reason = None
)
case Success(value) => value
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage

import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}

/**
* Manages one [[StorageSettings]] per `ActorSystem`.
*/
final class StorageExt private (sys: ExtendedActorSystem) extends Extension {
val settings: StorageSettings = settings(StorageSettings.ConfigPath)

def settings(prefix: String): StorageSettings = StorageSettings(sys.settings.config.getConfig(prefix))
}

object StorageExt extends ExtensionId[StorageExt] with ExtensionIdProvider {
override def lookup: StorageExt.type = StorageExt
override def createExtension(system: ExtendedActorSystem) = new StorageExt(system)

/**
* Java API.
* Get the Storage extension with the classic actors API.
*/
override def get(system: ActorSystem): StorageExt = super.apply(system)

/**
* Java API.
* Get the Storage extension with the new actors API.
*/
override def get(system: ClassicActorSystemProvider): StorageExt = super.apply(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage
package headers

import akka.annotation.InternalApi
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.RawHeader

import java.security.MessageDigest
import java.util.{Base64, Objects}

sealed abstract class ServerSideEncryption {
@InternalApi private[storage] def headers: Seq[HttpHeader]
}

object ServerSideEncryption {
def customerKey(key: String, hash: Option[String]): ServerSideEncryption = new CustomerKey(key, hash)
def customerKey(key: String): ServerSideEncryption = customerKey(key, None)
}

final class CustomerKey private[headers] (val key: String, val hash: Option[String] = None)
extends ServerSideEncryption {
override private[storage] def headers: Seq[HttpHeader] = Seq(
RawHeader("x-ms-encryption-algorithm", "AES256"),
RawHeader("x-ms-encryption-key", key),
RawHeader("x-ms-encryption-key-sha256", hash.getOrElse(createHash))
)

override def equals(obj: Any): Boolean =
obj match {
case other: CustomerKey => key == other.key && hash == other.hash
case _ => false
}

override def hashCode(): Int = Objects.hash(key, hash)

override def toString: String =
s"""ServerSideEncryption.CustomerKeys(
|key=$key,
| hash=$hash
|)
|""".stripMargin.replaceAll(System.lineSeparator(), "")

private def createHash = {
val messageDigest = MessageDigest.getInstance("SHA-256")
val decodedKey = messageDigest.digest(Base64.getDecoder.decode(key))
Base64.getEncoder.encodeToString(decodedKey)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage
package headers

import akka.annotation.InternalApi
import akka.http.scaladsl.model.{ContentType, HttpHeader}
import akka.http.scaladsl.model.headers.{CustomHeader, RawHeader}

private[storage] case class CustomContentTypeHeader(contentType: ContentType) extends CustomHeader {
override def name(): String = "Content-Type"

override def value(): String = contentType.value

override def renderInRequests(): Boolean = true

override def renderInResponses(): Boolean = true
}

private[storage] case class CustomContentLengthHeader(contentLength: Long) extends CustomHeader {
override def name(): String = "Content-Length"

override def value(): String = contentLength.toString

override def renderInRequests(): Boolean = true

override def renderInResponses(): Boolean = true
}

private[storage] case class BlobTypeHeader(blobType: String) {
@InternalApi private[storage] def header: HttpHeader = RawHeader(BlobTypeHeaderKey, blobType)
}

object BlobTypeHeader {
private[storage] val BlockBlobHeader = new BlobTypeHeader(BlockBlobType)
private[storage] val PageBlobHeader = new BlobTypeHeader(PageBlobType)
private[storage] val AppendBlobHeader = new BlobTypeHeader(AppendBlobType)
}

private[storage] case class RangeWriteTypeHeader(headerName: String, writeType: String) {
@InternalApi private[storage] def header: HttpHeader = RawHeader(headerName, writeType)
}

object RangeWriteTypeHeader {
private[storage] val UpdateFileHeader = new RangeWriteTypeHeader(FileWriteTypeHeaderKey, "update")
private[storage] val ClearFileHeader = new RangeWriteTypeHeader(FileWriteTypeHeaderKey, "clear")
private[storage] val UpdatePageHeader = new RangeWriteTypeHeader(PageWriteTypeHeaderKey, "update")
private[storage] val ClearPageHeader = new RangeWriteTypeHeader(PageWriteTypeHeaderKey, "clear")
}
Loading
Loading