Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS Java SDK v2 and dependency upgrade #614

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws/kinesis/core/kinesis_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct EndpointConfiguration {
sts_endpoint_(sts_endpoint) {}
};

const constexpr char* kVersion = "0.15.13N";
const constexpr char* kVersion = "1.0.0N";
const std::unordered_map< std::string, EndpointConfiguration > kRegionEndpointOverride = {
{ "cn-north-1", { "kinesis.cn-north-1.amazonaws.com.cn", "monitoring.cn-north-1.amazonaws.com.cn" } },
{ "cn-northwest-1", { "kinesis.cn-northwest-1.amazonaws.com.cn", "monitoring.cn-northwest-1.amazonaws.com.cn" } }
Expand Down
38 changes: 8 additions & 30 deletions java/amazon-kinesis-producer-sample/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<version>3.11.0</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
Expand All @@ -38,14 +38,12 @@
</plugins>
</build>
<name>KinesisProducerLibrary Sample Application</name>
<properties>
<aws-java-sdk.version>1.12.772</aws-java-sdk.version>
</properties>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.14.8</version>
<version>3.0.1</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
Expand All @@ -56,33 +54,13 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.15.13</version>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatch</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,111 +18,127 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.SingleStreamTracker;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
* If you haven't looked at {@link SampleProducer}, do so first.
*
*
* <p>
* As mentioned in SampleProducer, we will check that all records are received
* correctly by the KCL by verifying that there are no gaps in the sequence
* numbers.
*
*
* <p>
* As the consumer runs, it will periodically log a message indicating the
* number of gaps it found in the sequence numbers. A gap is when the difference
* between two consecutive elements in the sorted list of seen sequence numbers
* is greater than 1.
*
*
* <p>
* Over time the number of gaps should converge to 0. You should also observe
* that the range of sequence numbers seen is equal to the number of records put
* by the SampleProducer.
*
*
* <p>
* If the stream contains data from multiple runs of SampleProducer, you should
* observe the SampleConsumer detecting this and resetting state to only count
* the latest run.
*
*
* <p>
* Note if you kill the SampleConsumer halfway and run it again, the number of
* gaps may never converge to 0. This is because checkpoints may have been made
* such that some records from the producer's latest run are not processed
* again. If you observe this, simply run the producer to completion again
* without terminating the consumer.
*
*
* <p>
* The consumer continues running until manually terminated, even if there are
* no more records to consume.
*
* @see SampleProducer
* @author chaodeng
*
* @author chaodeng
* @see SampleProducer
*/
public class SampleConsumer implements IRecordProcessorFactory {
public class SampleConsumer implements ShardRecordProcessorFactory {
private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class);

// All records from a run of the producer have the same timestamp in their
// partition keys. Since this value increases for each run, we can use it
// determine which run is the latest and disregard data from earlier runs.
private final AtomicLong largestTimestamp = new AtomicLong(0);

// List of record sequence numbers we have seen so far.
private final List<Long> sequenceNumbers = new ArrayList<>();

// A mutex for largestTimestamp and sequenceNumbers. largestTimestamp is
// nevertheless an AtomicLong because we cannot capture non-final variables
// in the child class.
private final Object lock = new Object();

@Override
public ShardRecordProcessor shardRecordProcessor() {
return new RecordProcessor();
}

/**
* One instance of RecordProcessor is created for every shard in the stream.
* All instances of RecordProcessor share state by capturing variables from
* the enclosing SampleConsumer instance. This is a simple way to combine
* the data from multiple shards.
*/
private class RecordProcessor implements IRecordProcessor {
private class RecordProcessor implements ShardRecordProcessor {
@Override
public void initialize(String shardId) {}
public void initialize(InitializationInput initializationInput) {
}

@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
public void processRecords(ProcessRecordsInput processRecordsInput) {
long timestamp = 0;
List<Long> seqNos = new ArrayList<>();
for (Record r : records) {

for (KinesisClientRecord r : processRecordsInput.records()) {
// Get the timestamp of this run from the partition key.
timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));
timestamp = Math.max(timestamp, Long.parseLong(r.partitionKey()));

// Extract the sequence number. It's encoded as a decimal
// string and placed at the beginning of the record data,
// followed by a space. The rest of the record data is padding
// that we will simply discard.
try {
byte[] b = new byte[r.getData().remaining()];
r.getData().get(b);
byte[] b = new byte[r.data().remaining()];
r.data().get(b);
seqNos.add(Long.parseLong(new String(b, "UTF-8").split(" ")[0]));
} catch (Exception e) {
log.error("Error parsing record", e);
System.exit(1);
}
}

synchronized (lock) {
if (largestTimestamp.get() < timestamp) {
log.info(String.format(
Expand All @@ -131,32 +147,45 @@ public void processRecords(List<Record> records, IRecordProcessorCheckpointer ch
largestTimestamp.set(timestamp);
sequenceNumbers.clear();
}

// Only add to the shared list if our data is from the latest run.
if (largestTimestamp.get() == timestamp) {
sequenceNumbers.addAll(seqNos);
Collections.sort(sequenceNumbers);
}
}

try {
checkpointer.checkpoint();
processRecordsInput.checkpointer().checkpoint();
} catch (Exception e) {
log.error("Error while trying to checkpoint during ProcessRecords", e);
}
}

@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
log.info("Shutting down, reason: " + reason);
private void checkpoint(RecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
log.info("Checkpointing, reason: {}", reason);
try {
checkpointer.checkpoint();
} catch (Exception e) {
log.error("Error while trying to checkpoint during Shutdown", e);
}
}

@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}

@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
checkpoint(shardEndedInput.checkpointer(), ShutdownReason.SHARD_END);
}

@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
checkpoint(shutdownRequestedInput.checkpointer(), ShutdownReason.REQUESTED);
}
}

/**
* Log a message indicating the current state.
*/
Expand All @@ -165,12 +194,12 @@ public void logResults() {
if (largestTimestamp.get() == 0) {
return;
}

if (sequenceNumbers.size() == 0) {
log.info("No sequence numbers found for current run.");
return;
}

// The producer assigns sequence numbers starting from 1, so we
// start counting from one before that, i.e. 0.
long last = 0;
Expand All @@ -181,49 +210,47 @@ public void logResults() {
}
last = sn;
}

log.info(String.format(
"Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
}
}

@Override
public IRecordProcessor createProcessor() {
return this.new RecordProcessor();
}


/** The main method.
* @param args The command line args for the Sample Producer.
* @param args The command line args for the Sample Producer.
* The main method takes 2 optional position parameters:
* 1. The stream name to use (test is default)
* 2. The region name to use (us-west-1 in default)
*/
public static void main(String[] args) {
int argIndex=0;
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(
"KinesisProducerLibSampleConsumer",
SampleProducerConfig.getArgIfPresent(args, argIndex++, SampleProducerConfig.STREAM_NAME_DEFAULT),
new DefaultAWSCredentialsProviderChain(),
"KinesisProducerLibSampleConsumer")
.withRegionName(SampleProducerConfig.getArgIfPresent(args, argIndex++, SampleProducerConfig.REGION_DEFAULT))
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

final String streamName = SampleProducerConfig.getArgIfPresent(args, argIndex++, SampleProducerConfig.STREAM_NAME_DEFAULT);
final Region region = Region.of(SampleProducerConfig.getArgIfPresent(args, argIndex++, SampleProducerConfig.REGION_DEFAULT));

final SampleConsumer consumer = new SampleConsumer();

final KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
final DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
final CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();

final ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, "KinesisProducerLibSampleConsumer", kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), consumer);

Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
consumer.logResults();
}
}, 10, 1, TimeUnit.SECONDS);

new Worker.Builder()
.recordProcessorFactory(consumer)
.config(config)
.build()
.run();

new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig().streamTracker(new SingleStreamTracker(streamName,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON))))
.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
Expand Down
Loading