Change s3 sink client to async client #4425
Signed-off-by: Taylor Gray <[email protected]>
c10e3d7 to 22f42dd
@@ -77,7 +77,7 @@ public S3Sink(final PluginSetting pluginSetting,
 final OutputCodec testCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
 sinkInitialized = Boolean.FALSE;

-final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
+final S3AsyncClient s3Client = ClientFactory.createS3AsyncClient(s3SinkConfig, awsCredentialsSupplier);
Should there be an option to choose the type of client (i.e., support both the sync and async clients)?
We could make this configurable, or choose it dynamically based on the buffer_type, but I don't think it makes sense to expose this as a setting for users.
@@ -152,6 +155,7 @@ public void setUp() {
s3region = System.getProperty("tests.s3sink.region");

s3Client = S3Client.builder().region(Region.of(s3region)).build();
Can we remove s3Client?
S3Client is still used in the test to validate the objects.
return new S3SinkService(s3SinkConfig, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
Is keyGenerator used?
Yes, it's used by S3GroupIdentifierFactory in this test.
try {
    flushBufferAndRewind();
} catch (ExecutionException | InterruptedException e) {
    throw new RuntimeException(e);
How is the client handling the exception?
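The catch block in this hunk wraps both checked exceptions in a RuntimeException. As a rough illustration of what a caller of `CompletableFuture.get()` generally has to deal with, here is a JDK-only sketch. The class `FutureErrors` and the method `awaitUpload` are hypothetical names, not code from this PR, and the sketch does not claim to show how the S3 client itself reacts to failures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper (not part of the PR): one way to surface failures from a future.
final class FutureErrors {
    private FutureErrors() { }

    // Waits for the future and rethrows with the original failure as the cause.
    static <T> T awaitUpload(final CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (final ExecutionException e) {
            // get() wraps the real failure; keep the underlying cause visible to callers.
            throw new RuntimeException(e.getCause());
        } catch (final InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

Unwrapping the cause keeps the original failure type visible to whoever catches the RuntimeException, and restoring the interrupt flag lets code further up the stack notice the interruption.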
multipartUploadResponseCompletableFuture.join();

runOnCompletion.accept(true);
return multipartUploadResponseCompletableFuture;
The execution is already complete at line 200. Why do we want to return the future here?
We can return null here.
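The point raised in this thread is that `join()` blocks until the future completes, so a future returned afterwards is already done. The JDK-only sketch below contrasts that shape with one that attaches the callback and returns immediately. `JoinVersusReturn`, `blockingUpload`, and `nonBlockingUpload` are hypothetical names, and the `CompletableFuture<String>` stands in for the multipart upload response future in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration (not part of the PR) of two ways to finish an async upload.
final class JoinVersusReturn {
    private JoinVersusReturn() { }

    // Shape discussed in the review: block with join(), run the callback, then return.
    static CompletableFuture<String> blockingUpload(final CompletableFuture<String> upload,
                                                    final Consumer<Boolean> runOnCompletion) {
        upload.join();                 // blocks the calling thread until the upload finishes
        runOnCompletion.accept(true);
        return upload;                 // already complete, so the caller has nothing left to wait on
    }

    // Alternative shape: attach the callback and return without blocking.
    static CompletableFuture<String> nonBlockingUpload(final CompletableFuture<String> upload,
                                                       final Consumer<Boolean> runOnCompletion) {
        return upload.whenComplete((result, error) -> runOnCompletion.accept(error == null));
    }
}
```

In the second shape the returned future is meaningful to the caller, since it completes only after the callback has run.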
private static ClientOverrideConfiguration createOverrideConfiguration(final S3SinkConfig s3SinkConfig) {
-    final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(s3SinkConfig.getMaxConnectionRetries()).build();
+    final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(s3SinkConfig.getMaxConnectionRetries() * s3SinkConfig.getMaxUploadRetries()).build();
Is this too many retries? What is the default value?
Previously we were already doing this number of client retries multiplied by the max upload retries, but the outer retry loop was done manually. To get the same number of retries using only the client, we multiply the two values like this.
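The arithmetic in this reply can be sketched with plain Java. The example assumes that a retry count means retries after the first attempt, so an operation that always fails is tried `numRetries + 1` times. `RetryBudget` and its methods are hypothetical, and the values 5 and 3 used in the usage note are illustrative only, not the plugin's defaults.

```java
// Hypothetical illustration (not part of the PR) of the combined retry budget.
final class RetryBudget {
    private RetryBudget() { }

    // Combined budget as in the diff: connection retries multiplied by upload retries.
    static int combinedRetries(final int maxConnectionRetries, final int maxUploadRetries) {
        return Math.multiplyExact(maxConnectionRetries, maxUploadRetries);
    }

    // Simulates an operation that always fails and counts how often it is tried:
    // one initial attempt plus numRetries retries.
    static int attemptsUntilGivingUp(final int numRetries) {
        int attempts = 0;
        for (int retry = 0; retry <= numRetries; retry++) {
            attempts++; // every attempt fails in this simulation
        }
        return attempts;
    }
}
```

With the illustrative values, `combinedRetries(5, 3)` gives 15 retries, and `attemptsUntilGivingUp(15)` gives 16 attempts before the failure is reported.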
        .thenRun(() -> LOG.debug("All {} requests to S3 have completed", completableFutures.size()))
        .join();
} catch (final Exception e) {
    LOG.warn("There was an exception while waiting for all requests to complete", e);
Is there data loss if acknowledgments are not enabled?
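The hunk above waits for every outstanding request and logs a warning if the wait fails. A JDK-only sketch of that pattern follows, assuming the futures are combined with `CompletableFuture.allOf` (the start of the chain is not visible in the hunk). `AwaitAll` and `awaitAll` are hypothetical names, and `System.err` stands in for the logger. The sketch does not answer the data-loss question; it only shows that a failed request surfaces as an exception from `join()`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration (not part of the PR) of waiting for a batch of async requests.
final class AwaitAll {
    private AwaitAll() { }

    // Returns true if every future completed normally, false if any of them failed.
    static boolean awaitAll(final List<CompletableFuture<?>> futures) {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return true;
        } catch (final Exception e) {
            // join() throws CompletionException when any of the combined futures failed.
            System.err.println("There was an exception while waiting for all requests to complete: " + e);
            return false;
        }
    }
}
```

Because the exception is caught and only logged here, any per-request failure handling has to happen in callbacks attached to the individual futures.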
LOG.trace("Flush to S3 check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}",
        s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration());
if (forceFlush || ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) {

    s3GroupManager.removeGroup(s3Group);
If a negative ack is received, will this group be created again on retry?
    }
};

final Optional<CompletableFuture<?>> completableFuture = s3Group.getBuffer().flushToS3(consumeOnGroupCompletion, this::handleFailures);
Why is this an Optional?
    } while (!isUploadedToS3);
    return isUploadedToS3;
private void handleFailures(final Throwable e) {
    LOG.error("Exception occurred while uploading records to s3 bucket: {}", e.getMessage());
Should s3Group.releaseEventHandles(false) be called here?
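If the suggestion in this comment were adopted, the failure path would both log the error and release the group's event handles with a negative result. The JDK-only sketch below shows one possible wiring; it is not the PR's implementation. `FlushCallbacks`, `onUploadFinished`, and the `released` list are hypothetical, and the local `releaseEventHandles` method merely stands in for `s3Group.releaseEventHandles(boolean)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration (not part of the PR) of releasing handles on both outcomes.
final class FlushCallbacks {
    // Records the results that would be reported back for the group's events.
    final List<Boolean> released = new ArrayList<>();

    // Stand-in for s3Group.releaseEventHandles(boolean).
    void releaseEventHandles(final boolean success) {
        released.add(success);
    }

    // Attaches success and failure handling to an in-flight upload.
    <T> CompletableFuture<Void> onUploadFinished(final CompletableFuture<T> upload) {
        return upload.handle((result, error) -> {
            if (error != null) {
                System.err.println("Exception occurred while uploading records to s3 bucket: " + error.getMessage());
            }
            // Release with true on success and false on failure.
            releaseEventHandles(error == null);
            return null;
        });
    }
}
```

Using `handle` means the returned future completes normally in both cases, so a later wait on it does not rethrow the upload failure.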
    }
}

final boolean[] defaultBucketAttempted = new boolean[1];
Why is this an array?
It has to be final since it's used in the callback.
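The reply refers to a Java rule: a lambda can only capture local variables that are final or effectively final, so a flag that the callback needs to change must live inside some mutable holder. The sketch below shows the one-element array from the diff next to an `AtomicBoolean` alternative. `LambdaCapture`, `viaArray`, and `viaAtomic` are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration (not part of the PR) of mutating state from a lambda.
final class LambdaCapture {
    private LambdaCapture() { }

    // One-element array: the reference is final, but the slot inside can be changed by the lambda.
    static boolean viaArray() {
        final boolean[] attempted = new boolean[1];
        final Runnable callback = () -> attempted[0] = true;
        callback.run();
        return attempted[0];
    }

    // Alternative holder: AtomicBoolean, which also gives visibility guarantees across threads.
    static boolean viaAtomic() {
        final AtomicBoolean attempted = new AtomicBoolean(false);
        final Runnable callback = () -> attempted.set(true);
        callback.run();
        return attempted.get();
    }
}
```

Declaring the flag as a plain `boolean` and assigning to it inside the lambda would not compile, which is why some holder is needed. With an async client the callback may run on another thread, in which case `AtomicBoolean` is the safer choice of the two.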
Description
This change switches the s3 sink from the synchronous S3 client to the async client. The goal is to improve performance when writing many objects from different groups with a dynamic path_prefix.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.