From 0d0a30f10a5701188c19b019f6ed455ed58b0cc2 Mon Sep 17 00:00:00 2001
From: Ben Fradet
Date: Tue, 27 Jun 2017 15:14:03 +0100
Subject: [PATCH] Bump Kinesis Client Library to 1.7.5 (closes #55)

---
 project/Dependencies.scala                         |  2 +-
 .../s3/CredentialsLookup.scala                     |  2 +-
 .../s3/S3Emitter.scala                             | 18 +++++++------
 .../s3/SinkApp.scala                               |  2 +-
 .../s3/sinks/KinesisSink.scala                     | 27 ++++++++++++-------
 5 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index eb674a3..fcd8ce8 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -21,7 +21,7 @@ object Dependencies {
   object V {
     // Java
     val slf4j = "1.7.6"
-    val kinesisClient = "1.4.0"
+    val kinesisClient = "1.7.5"
     val kinesisConnector = "1.1.2"
     val hadoop = "2.7.3"
     val elephantbird = "4.15"
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala
index bdc93a5..d0f34ca 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/CredentialsLookup.scala
@@ -42,7 +42,7 @@ object CredentialsLookup {
         "access-key and secret-key must both be set to 'default', or neither"
       )
     } else if (isIam(a) && isIam(s)) {
-      new InstanceProfileCredentialsProvider()
+      InstanceProfileCredentialsProvider.getInstance()
     } else if (isIam(a) || isIam(s)) {
       throw new RuntimeException("access-key and secret-key must both be set to 'iam', or neither")
     } else if (isEnv(a) && isEnv(s)) {
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala
index 083e771..466179a 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/S3Emitter.scala
@@ -30,16 +30,15 @@ import org.apache.hadoop.conf.Configuration
 import com.hadoop.compression.lzo.LzopCodec
 
 // Elephant bird
-import com.twitter.elephantbird.mapreduce.io.{
-  ThriftBlockWriter
-}
+import com.twitter.elephantbird.mapreduce.io.ThriftBlockWriter
 
 // Logging
-import org.apache.commons.logging.{Log,LogFactory}
+import org.apache.commons.logging.{Log, LogFactory}
 
 // AWS libs
 import com.amazonaws.AmazonServiceException
-import com.amazonaws.services.s3.AmazonS3Client
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
 import com.amazonaws.services.s3.model.ObjectMetadata
 
 // AWS Kinesis connector libs
@@ -91,9 +90,12 @@ class S3Emitter(config: KinesisConnectorConfiguration, badSink: ISink, serialize
   private val TstampFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(DateTimeZone.UTC)
 
   val bucket = config.S3_BUCKET
-  val log = LogFactory.getLog(classOf[S3Emitter])
-  val client = new AmazonS3Client(config.AWS_CREDENTIALS_PROVIDER)
-  client.setEndpoint(config.S3_ENDPOINT)
+  val log = LogFactory.getLog(getClass)
+  val client = AmazonS3ClientBuilder
+    .standard()
+    .withCredentials(config.AWS_CREDENTIALS_PROVIDER)
+    .withEndpointConfiguration(new EndpointConfiguration(config.S3_ENDPOINT, config.REGION_NAME))
+    .build()
 
   val dateFormat = new SimpleDateFormat("yyyy-MM-dd");
 
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala
index 6fca69f..a2c7c0f 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/SinkApp.scala
@@ -88,7 +88,7 @@ object SinkApp extends App {
   val credentials = CredentialsLookup.getCredentialsProvider(credentialConfig.getString("access-key"),
     credentialConfig.getString("secret-key"))
 
-  val badSink = new KinesisSink(credentials, kinesisSinkEndpoint, kinesisSinkName, tracker)
+  val badSink = new KinesisSink(credentials, kinesisSinkEndpoint, kinesisSinkRegion, kinesisSinkName, tracker)
 
   val serializer = conf.getConfig("sink").getConfig("s3").getString("format") match {
     case "lzo" => LzoSerializer
diff --git a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala
index 603877e..5842746 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/sinks/KinesisSink.scala
@@ -26,16 +26,14 @@ import java.nio.ByteBuffer
 import scala.util.Random
 
 // Amazon
-import com.amazonaws.services.kinesis.model._
 import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.AmazonKinesis
-import com.amazonaws.regions._
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
+import com.amazonaws.services.kinesis.model._
 
 // Concurrent libraries
-import scala.concurrent.{Future,Await,TimeoutException}
+import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
 import scala.util.{Success, Failure}
 
 // Logging
@@ -49,18 +47,27 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
  *
  * @param provider AWSCredentialsProvider
  * @param endpoint Kinesis stream endpoint
+ * @param region Kinesis stream region
 * @param name Kinesis stream name
 * @param config Configuration for the Kinesis stream
 */
-class KinesisSink(provider: AWSCredentialsProvider, endpoint: String, name: String, tracker: Option[Tracker])
-  extends ISink {
+class KinesisSink(
+  provider: AWSCredentialsProvider,
+  endpoint: String,
+  region: String,
+  name: String,
+  tracker: Option[Tracker]
+) extends ISink {
   private lazy val log = LoggerFactory.getLogger(getClass())
   import log.{error, debug, info, trace}
 
   // Explicitly create a client so we can configure the end point
-  val client = new AmazonKinesisClient(provider)
-  client.setEndpoint(endpoint)
+  val client = AmazonKinesisClientBuilder
+    .standard()
+    .withCredentials(provider)
+    .withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
+    .build()
 
   if (!streamExists(name)) {
     throw new RuntimeException(s"Cannot write because stream $name doesn't exist or is not active")
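---

Note (illustrative only, not part of the diff to apply): the patch replaces the
deprecated direct client constructors plus client.setEndpoint(...) with the
builder pattern of the newer AWS SDK, where EndpointConfiguration pairs an
endpoint with its signing region. A minimal standalone sketch of that pattern
follows; the endpoint and region values are assumed for illustration:

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
    import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder

    object ClientSketch {
      def main(args: Array[String]): Unit = {
        // EndpointConfiguration(serviceEndpoint, signingRegion) replaces the
        // old client.setEndpoint(endpoint) call; values here are assumptions.
        val client = AmazonKinesisClientBuilder
          .standard()
          .withCredentials(new DefaultAWSCredentialsProviderChain())
          .withEndpointConfiguration(
            new EndpointConfiguration("https://kinesis.eu-west-1.amazonaws.com", "eu-west-1"))
          .build()
        println(client.listStreams().getStreamNames)
      }
    }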