Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
Replace Scalazon with Kinesis library calls to attempt to isolate rec…
Browse files Browse the repository at this point in the history
…ords issue. (#1)
  • Loading branch information
Brandon Amos committed Jan 2, 2014
1 parent fd59316 commit c940592
Showing 1 changed file with 54 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.snowplowanalytics.kinesis.consumer

// Java
import scala.collection.JavaConversions._
import java.nio.ByteBuffer

// Amazon
Expand All @@ -21,12 +22,13 @@ import com.amazonaws.auth.{
BasicAWSCredentials,
ClasspathPropertiesFileCredentialsProvider
}

// Scalazon (for Kinesis interaction)
import io.github.cloudify.scala.aws.kinesis.Client
import io.github.cloudify.scala.aws.kinesis.Client.ImplicitExecution._
import io.github.cloudify.scala.aws.kinesis.Definitions.{Stream,PutResult}
import io.github.cloudify.scala.aws.kinesis.KinesisDsl._
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{
DescribeStreamRequest,
GetShardIteratorRequest,
GetShardIteratorResult,
GetRecordsRequest
}

// Config
import com.typesafe.config.Config
Expand All @@ -42,6 +44,8 @@ import scala.concurrent.duration._
// Thrift.
import org.apache.thrift.TDeserializer

import scala.collection.mutable.MutableList

/**
* The core logic for the Kinesis event consumer.
*/
Expand All @@ -62,15 +66,53 @@ case class StreamConsumer(config: Config) {
}

// Initialize
private implicit val kinesis = createKinesisClient(ConsumerConfig.awsAccessKey, ConsumerConfig.awsSecretKey)
private var stream: Option[Stream] = None
private val kinesis = new AmazonKinesisClient(new BasicAWSCredentials(ConsumerConfig.awsAccessKey, ConsumerConfig.awsSecretKey))
private var stream: Option[String] = None
private val thriftDeserializer = new TDeserializer()

val printData: (Array[Byte] => Unit) =
if (ConsumerConfig.streamDataType == "string") printDataString
else if (ConsumerConfig.streamDataType == "thrift") printDataThrift
else throw new RuntimeException(
"data-type configuration must be 'string' or 'thrift'.")

// Print all records in the current stream.
def printRecords() {
if (stream.isEmpty) {
stream = Some(Kinesis.stream(ConsumerConfig.streamName))
val describeStreamRequest = new DescribeStreamRequest()
describeStreamRequest.setStreamName( ConsumerConfig.streamName )
val describeStreamResult = kinesis.describeStream(
describeStreamRequest)

val shard = describeStreamResult.getStreamDescription().getShards().get(0)

val getShardIteratorRequest = new GetShardIteratorRequest()
getShardIteratorRequest.setStreamName(ConsumerConfig.streamName)
getShardIteratorRequest.setShardId(shard.getShardId())

getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON")
//getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER")
//getShardIteratorRequest.setStartingSequenceNumber("49535335286294925926483070100675855584759759966276943873")

val getShardIteratorResult = kinesis.getShardIterator(
getShardIteratorRequest);
val shardIterator = getShardIteratorResult.getShardIterator();

val getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(1000);

val getRecordsResult = kinesis.getRecords(getRecordsRequest);
val records = getRecordsResult.getRecords();
var count = 0
for (record <- records) {
println("sequenceNumber: " + record.getSequenceNumber)
printData(record.getData().array())
println("partitionKey: " + record.getPartitionKey)
count += 1
}
println(s"Read $count records.")

/*
val getRecords = for {
shards <- stream.get.shards.list
iterators <- Future.sequence(shards.map {
Expand All @@ -82,11 +124,6 @@ case class StreamConsumer(config: Config) {
} yield records
val recordChunks = Await.result(getRecords, 30.seconds)
val printData: (Array[Byte] => Unit) =
if (ConsumerConfig.streamDataType == "string") printDataString
else if (ConsumerConfig.streamDataType == "thrift") printDataThrift
else throw new RuntimeException(
"data-type configuration must be 'string' or 'thrift'.")
for (recordChunk <- recordChunks) {
println("==Record chunk.")
for (record <- recordChunk.records) {
Expand All @@ -95,39 +132,14 @@ case class StreamConsumer(config: Config) {
println("partitionKey: " + record.partitionKey)
}
}
*/
}

def printDataString(data: Array[Byte]) = println("data: " + new String(data))

def printDataThrift(data: Array[Byte]) = {
var deserializedData: generated.StreamData = new generated.StreamData()
thriftDeserializer.deserialize(deserializedData, data)
println("data: " + data)
println("data: " + deserializedData)
}

/**
* Creates a new Kinesis client from provided AWS access key and secret
* key. If both are set to "cpf", then authenticate using the classpath
* properties file.
*
* @return the initialized AmazonKinesisClient
*/
private[consumer] def createKinesisClient(
accessKey: String, secretKey: String): Client =
if (isCpf(accessKey) && isCpf(secretKey)) {
Client.fromCredentials(new ClasspathPropertiesFileCredentialsProvider())
} else if (isCpf(accessKey) || isCpf(secretKey)) {
throw new RuntimeException("access-key and secret-key must both be set to 'cpf', or neither of them")
} else {
Client.fromCredentials(accessKey, secretKey)
}

/**
* Is the access/secret key set to the special value "cpf" i.e. use
* the classpath properties file for credentials.
*
* @param key The key to check
* @return true if key is cpf, false otherwise
*/
private[consumer] def isCpf(key: String): Boolean = (key == "cpf")
}

0 comments on commit c940592

Please sign in to comment.