Commit b8e1bf8

Merge branch 'master' of github.com:lensesio/stream-reactor

andrewstevenson committed Oct 29, 2020
2 parents 032d359 + 5c56c81
Showing 26 changed files with 293 additions and 70 deletions.
22 changes: 22 additions & 0 deletions README.md
@@ -59,6 +59,28 @@ A collection of components to build a real time ingestion pipeline.

## Release Notes

**2.2.0**

* AWS S3 Sink Connector
    * Prevent null pointer exception in converters when maps are presented with null values
    * Offset reader optimisation to reduce S3 load
    * Ensure that a commit only occurs after the preconfigured time interval when using WITH_FLUSH_INTERVAL
* AWS S3 Source Connector (New Connector)
* Cassandra Source Connector
    * Add Bucket Timeseries Mode
    * Reduce logging noise
    * Properly handle uninitialized connections on task stop()
* Elasticsearch Sink Connector
    * Update default port
* Hive Sink
    * Improve ORC format handling
    * Fix issues with partitioning by non-string keys
* Hive Source
    * Ensure newly written files can be read by the Hive connector by introducing a refresh frequency configuration option
* Redis Sink
    * Correct Redis writer initialisation


**2.1.0**

* AWS S3 Sink Connector
32 changes: 30 additions & 2 deletions kafka-connect-aws-s3/README-sink.md
@@ -11,16 +11,44 @@ An example configuration is provided:
topics=$TOPIC_NAME
tasks.max=1
connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `parquet` WITH_FLUSH_COUNT = 5000
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY
aws.auth.mode=Credentials

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.

Please read below for a detailed explanation of these and other options, including the meaning of WITH_FLUSH_COUNT and its alternatives.
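
As a sketch of one alternative, the 2.2.0 release notes mention WITH_FLUSH_INTERVAL, a time-based flush that can replace the count-based WITH_FLUSH_COUNT above; the interval value here is illustrative only:

connect.s3.kcql=insert into $BUCKET_NAME:$PREFIX_NAME select * from $TOPIC_NAME STOREAS `parquet` WITH_FLUSH_INTERVAL = 3600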


### Auth Mode configuration

Two authentication modes are available:

#### Credentials

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM; they must be supplied and configured with permission to write to the desired S3 bucket.

aws.auth.mode=Credentials
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY


#### Default

In this auth mode no credentials need to be supplied. If no auth mode is specified, this default is used.

aws.auth.mode=Default

The credentials will be discovered through the default chain, in this order:

> * **Environment Variables** - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
> * **Java System Properties** - aws.accessKeyId and aws.secretKey
> * **Web Identity Token credentials** from the environment or container
> * **Credential profiles file** at the default location (~/.aws/credentials)
> * **Container credentials** delivered through the Amazon ECS container service
> * **Instance profile credentials** delivered through the Amazon EC2 metadata service

Full details of the default chain are available in the [AWS SDK documentation](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html).
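
For example, a minimal sketch of the auth-related sink properties when relying on the default chain (assuming here that credentials arrive via environment variables rather than connector properties):

aws.auth.mode=Default
# aws.access.key and aws.secret.key are omitted; the chain resolves
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment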


### Sink Format configuration
32 changes: 30 additions & 2 deletions kafka-connect-aws-s3/README-source.md
@@ -15,7 +15,6 @@ An example configuration is provided:
connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasks.max=1
connect.s3.kcql=insert into $TOPIC_NAME select * from $BUCKET_NAME:$PREFIX_NAME STOREAS `parquet`
aws.secret.key=SECRET_KEY
aws.access.key=ACCESS_KEY
aws.auth.mode=Credentials
@@ -24,9 +23,38 @@ An example configuration is provided:

You should replace $BUCKET_NAME, $PREFIX_NAME and $TOPIC_NAME with the names of the bucket, desired prefix and topic.

Please read below for a detailed explanation of these and other options.


### Auth Mode configuration

Two authentication modes are available:

#### Credentials

ACCESS_KEY and SECRET_KEY are credentials generated within AWS IAM; they must be supplied and configured with permission to read from the desired S3 bucket.

aws.auth.mode=Credentials
aws.access.key=ACCESS_KEY
aws.secret.key=SECRET_KEY


#### Default

In this auth mode no credentials need to be supplied. If no auth mode is specified, this default is used.

aws.auth.mode=Default

The credentials will be discovered through the default chain, in this order:

> * **Environment Variables** - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
> * **Java System Properties** - aws.accessKeyId and aws.secretKey
> * **Web Identity Token credentials** from the environment or container
> * **Credential profiles file** at the default location (~/.aws/credentials)
> * **Container credentials** delivered through the Amazon ECS container service
> * **Instance profile credentials** delivered through the Amazon EC2 metadata service

Full details of the default chain are available in the [AWS SDK documentation](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html).
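
To see what the chain will resolve to on a given host, a minimal standalone sketch using the same AWS SDK class the connector delegates to (DefaultAWSCredentialsProviderChain, shipped with aws-java-sdk-core) can be run; it is illustrative only and not part of the connector:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

object CheckDefaultChain extends App {
  // Walks the lookup order listed above and throws if no credentials are found
  val credentials = new DefaultAWSCredentialsProviderChain().getCredentials
  println(s"Resolved access key id: ${credentials.getAWSAccessKeyId}")
}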


### Source Format configuration
2 changes: 1 addition & 1 deletion kafka-connect-aws-s3/build.gradle
@@ -18,7 +18,7 @@ project(":kafka-connect-aws-s3") {

    ext {
        awsSdkVersion = "1.11.762"
        jCloudsSdkVersion = "2.2.1"
        enumeratumVersion = "1.5.15"
        catsVersion = "1.2.0"
        parquetVersion = "1.11.0"
@@ -19,20 +19,31 @@ package io.lenses.streamreactor.connect.aws.s3.auth

import java.util.Properties

import com.amazonaws.auth.{AWSCredentialsProvider, AWSSessionCredentials, DefaultAWSCredentialsProviderChain}
import com.google.common.base.Supplier
import io.lenses.streamreactor.connect.aws.s3.config.{AuthMode, S3Config}
import org.jclouds.ContextBuilder
import org.jclouds.aws.domain.SessionCredentials
import org.jclouds.blobstore.BlobStoreContext
import org.jclouds.domain.Credentials

object AwsContextCreator {

  // Supplies the standard AWS SDK default provider chain, used when no
  // explicit credentials are configured
  def DefaultCredentialsFn: () => AWSCredentialsProvider = {
    () => new DefaultAWSCredentialsProviderChain()
  }

}

class AwsContextCreator(credentialsProviderFn: () => AWSCredentialsProvider) {

  private val missingCredentialsError = "Configured to use credentials however one or both of `AWS_ACCESS_KEY` or `AWS_SECRET_KEY` are missing."

  def fromConfig(awsConfig: S3Config): BlobStoreContext = {

    val contextBuilder = ContextBuilder
      .newBuilder("aws-s3")
      .credentialsSupplier(credentialsSupplier(awsConfig))

    awsConfig.customEndpoint.foreach(contextBuilder.endpoint)

@@ -44,19 +55,27 @@

  }

  private def credentialsSupplier(awsConfig: S3Config): Supplier[Credentials] = {
    awsConfig.authMode match {
      case AuthMode.Credentials => credentialsFromConfig(awsConfig)
      case _ => credentialsFromDefaultChain()
    }
  }

  private def credentialsFromConfig(awsConfig: S3Config): Supplier[Credentials] = () => new Credentials(
    awsConfig.accessKey.filter(_.trim.nonEmpty).getOrElse(throw new IllegalArgumentException(missingCredentialsError)),
    awsConfig.secretKey.filter(_.trim.nonEmpty).getOrElse(throw new IllegalArgumentException(missingCredentialsError))
  )

  private def credentialsFromDefaultChain(): Supplier[Credentials] = () => {
    val credentialsProvider = credentialsProviderFn()
    val credentials = Option(credentialsProvider.getCredentials)
      .getOrElse(throw new IllegalStateException("No credentials found on default provider chain."))
    credentials match {
      // Session credentials (e.g. STS or web identity) carry a token that must be propagated
      case credentials: AWSSessionCredentials =>
        SessionCredentials.builder()
          .accessKeyId(credentials.getAWSAccessKeyId)
          .secretAccessKey(credentials.getAWSSecretKey)
          .sessionToken(credentials.getSessionToken)
          .build()
      case _ => new Credentials(credentials.getAWSAccessKeyId, credentials.getAWSSecretKey)
    }
  }

  private def createOverride() = {
    val overrides = new Properties()

@@ -68,5 +87,4 @@

    blobStoreContext.close()
  }


}
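
A usage sketch (the key values are placeholders; the production wiring shown in S3SinkTask further down passes AwsContextCreator.DefaultCredentialsFn):

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import io.lenses.streamreactor.connect.aws.s3.config.{AuthMode, S3Config}

// Static credentials: the supplier reads both keys from config and fails fast
// with missingCredentialsError if either is absent or blank
val creator = new AwsContextCreator(AwsContextCreator.DefaultCredentialsFn)
val context = creator.fromConfig(S3Config(Some("ACCESS_KEY"), Some("SECRET_KEY"), AuthMode.Credentials))

// Tests can inject a canned provider in place of the real default chain
val testCreator = new AwsContextCreator(() =>
  new AWSStaticCredentialsProvider(new BasicAWSCredentials("test-access", "test-secret"))
)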
@@ -29,11 +29,7 @@ object AuthMode extends Enum[AuthMode] {

  case object Credentials extends AuthMode

  case object Default extends AuthMode

}

@@ -120,21 +116,19 @@ object Format extends Enum[Format] {

object S3Config {
  def apply(props: Map[String, String]): S3Config = S3Config(
    props.get(AWS_ACCESS_KEY),
    props.get(AWS_SECRET_KEY),
    AuthMode.withNameInsensitive(
      props.getOrElse(AUTH_MODE, AuthMode.Default.toString)
    ),
    props.get(CUSTOM_ENDPOINT),
    props.getOrElse(ENABLE_VIRTUAL_HOST_BUCKETS, "false").toBoolean,
  )
}

case class S3Config(
  accessKey: Option[String],
  secretKey: Option[String],
  authMode: AuthMode,
  customEndpoint: Option[String] = None,
  enableVirtualHostBuckets: Boolean = false,
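
A small sketch of the new parsing behaviour: with no auth properties supplied at all, S3Config now falls back to Default mode and leaves both keys unset, where the old code threw an IllegalArgumentException:

val config = S3Config(Map.empty[String, String])
assert(config.authMode == AuthMode.Default) // AUTH_MODE now defaults rather than throwing
assert(config.accessKey.isEmpty && config.secretKey.isEmpty)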
@@ -28,13 +28,6 @@ object S3ConfigDef {
  import S3ConfigSettings._

  val config: ConfigDef = new ConfigDef()
    .define(
      AWS_ACCESS_KEY,
      Type.STRING,
@@ -52,14 +45,14 @@
    .define(
      AUTH_MODE,
      Type.STRING,
      AuthMode.Default.toString,
      Importance.HIGH,
      "Authenticate mode, 'credentials' or 'default'"
    )
    .define(
      CUSTOM_ENDPOINT,
      Type.STRING,
      "",
      Importance.LOW,
      "Custom S3-compatible endpoint (usually for testing)"
    )
@@ -23,7 +23,6 @@ object S3ConfigSettings {

  val CONNECTOR_PREFIX = "connect.s3"

  val AWS_ACCESS_KEY: String = "aws.access.key"
  val AWS_SECRET_KEY: String = "aws.secret.key"
  val AUTH_MODE: String = "aws.auth.mode"
@@ -16,6 +16,8 @@

package io.lenses.streamreactor.connect.aws.s3.model

import com.amazonaws.services.s3.internal.BucketNameUtils


case object BucketAndPrefix {
  def apply(bucketAndPath: String): BucketAndPrefix = {
@@ -31,6 +33,9 @@ case class BucketAndPrefix(
  bucket: String,
  prefix: Option[String]
) {

  // Fail fast if the bucket name violates S3 naming rules
  BucketNameUtils.validateBucketName(bucket)

  prefix
    .filter(_.contains("/"))
    .foreach(_ => throw new IllegalArgumentException("Nested prefix not currently supported"))
@@ -39,4 +44,8 @@
case class BucketAndPath(
  bucket: String,
  path: String
) {

  BucketNameUtils.validateBucketName(bucket)

}
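
A sketch of the effect of the new validation (the bucket names are made up; BucketNameUtils enforces the standard S3 naming rules):

// A conforming name constructs as before
val ok = BucketAndPath("my-valid-bucket", "some/path")

// Uppercase letters and underscores violate S3 naming rules, so this now
// throws IllegalArgumentException at construction time rather than failing
// later with an S3 error
val bad = BucketAndPath("Invalid_Bucket_Name", "some/path")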
@@ -46,19 +46,30 @@ class S3SinkTask extends SinkTask {

  override def version(): String = manifest.version()

  // Lists each configured bucket at startup so that a missing bucket or bad
  // credentials fail fast rather than surfacing on the first write
  def validateBuckets(storageInterface: StorageInterface, config: S3SinkConfig): Unit = {
    config.bucketOptions.foreach(
      bucketOption => {
        val bucketAndPrefix = bucketOption.bucketAndPrefix
        storageInterface.list(bucketAndPrefix)
      }
    )
  }

  override def start(props: util.Map[String, String]): Unit = {

    logger.debug(s"Received call to S3SinkTask.start with ${props.size()} properties")

    val awsConfig = S3SinkConfig(props.asScala.toMap)

    val awsContextCreator = new AwsContextCreator(AwsContextCreator.DefaultCredentialsFn)
    storageInterface = new MultipartBlobStoreStorageInterface(awsContextCreator.fromConfig(awsConfig.s3Config))

    val configs = Option(context).flatMap(c => Option(c.configs())).filter(_.isEmpty == false).getOrElse(props)

    config = S3SinkConfig(configs.asScala.toMap)

    validateBuckets(storageInterface, config)

    writerManager = S3WriterManager.from(config)(storageInterface)

  }
@@ -53,9 +53,10 @@ class S3SourceTask extends SourceTask {

    logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")

    val awsConfig = S3SourceConfig(props.asScala.toMap)

    val awsContextCreator = new AwsContextCreator(AwsContextCreator.DefaultCredentialsFn)
    storageInterface = new MultipartBlobStoreStorageInterface(awsContextCreator.fromConfig(awsConfig.s3Config))
    sourceLister = new S3SourceLister()

    val configs = Option(context).flatMap(c => Option(c.configs())).filter(_.isEmpty == false).getOrElse(props)