Skip to content

Commit

Permalink
Bump Kinesis Client Library to 1.7.5 (closes #55)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Jun 29, 2017
1 parent 70b9176 commit 0d0a30f
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object Dependencies {
object V {
// Java
val slf4j = "1.7.6"
val kinesisClient = "1.4.0"
val kinesisClient = "1.7.5"
val kinesisConnector = "1.1.2"
val hadoop = "2.7.3"
val elephantbird = "4.15"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object CredentialsLookup {
"access-key and secret-key must both be set to 'default', or neither"
)
} else if (isIam(a) && isIam(s)) {
new InstanceProfileCredentialsProvider()
InstanceProfileCredentialsProvider.getInstance()
} else if (isIam(a) || isIam(s)) {
throw new RuntimeException("access-key and secret-key must both be set to 'iam', or neither")
} else if (isEnv(a) && isEnv(s)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ import org.apache.hadoop.conf.Configuration
import com.hadoop.compression.lzo.LzopCodec

// Elephant bird
import com.twitter.elephantbird.mapreduce.io.{
ThriftBlockWriter
}
import com.twitter.elephantbird.mapreduce.io.ThriftBlockWriter

// Logging
import org.apache.commons.logging.{Log,LogFactory}
import org.apache.commons.logging.{Log, LogFactory}

// AWS libs
import com.amazonaws.AmazonServiceException
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ObjectMetadata

// AWS Kinesis connector libs
Expand Down Expand Up @@ -91,9 +90,12 @@ class S3Emitter(config: KinesisConnectorConfiguration, badSink: ISink, serialize
private val TstampFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(DateTimeZone.UTC)

val bucket = config.S3_BUCKET
val log = LogFactory.getLog(classOf[S3Emitter])
val client = new AmazonS3Client(config.AWS_CREDENTIALS_PROVIDER)
client.setEndpoint(config.S3_ENDPOINT)
val log = LogFactory.getLog(getClass)
val client = AmazonS3ClientBuilder
.standard()
.withCredentials(config.AWS_CREDENTIALS_PROVIDER)
.withEndpointConfiguration(new EndpointConfiguration(config.S3_ENDPOINT, config.REGION_NAME))
.build()

val dateFormat = new SimpleDateFormat("yyyy-MM-dd");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object SinkApp extends App {

val credentials = CredentialsLookup.getCredentialsProvider(credentialConfig.getString("access-key"), credentialConfig.getString("secret-key"))

val badSink = new KinesisSink(credentials, kinesisSinkEndpoint, kinesisSinkName, tracker)
val badSink = new KinesisSink(credentials, kinesisSinkEndpoint, kinesisSinkRegion, kinesisSinkName, tracker)

val serializer = conf.getConfig("sink").getConfig("s3").getString("format") match {
case "lzo" => LzoSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@ import java.nio.ByteBuffer
import scala.util.Random

// Amazon
import com.amazonaws.services.kinesis.model._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.AmazonKinesis
import com.amazonaws.regions._
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model._

// Concurrent libraries
import scala.concurrent.{Future,Await,TimeoutException}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Failure}

// Logging
Expand All @@ -49,18 +47,27 @@ import com.snowplowanalytics.snowplow.scalatracker.Tracker
*
* @param provider AWSCredentialsProvider
* @param endpoint Kinesis stream endpoint
* @param region Kinesis stream region
* @param name Kinesis stream name
* @param config Configuration for the Kinesis stream
*/
class KinesisSink(provider: AWSCredentialsProvider, endpoint: String, name: String, tracker: Option[Tracker])
extends ISink {
class KinesisSink(
provider: AWSCredentialsProvider,
endpoint: String,
region: String,
name: String,
tracker: Option[Tracker]
) extends ISink {

private lazy val log = LoggerFactory.getLogger(getClass())
import log.{error, debug, info, trace}

// Explicitly create a client so we can configure the end point
val client = new AmazonKinesisClient(provider)
client.setEndpoint(endpoint)
val client = AmazonKinesisClientBuilder
.standard()
.withCredentials(provider)
.withEndpointConfiguration(new EndpointConfiguration(endpoint, region))
.build()

if (!streamExists(name)) {
throw new RuntimeException(s"Cannot write because stream $name doesn't exist or is not active")
Expand Down

0 comments on commit 0d0a30f

Please sign in to comment.