Add support for dynamic bucket and default bucket in S3 sink #4402

Merged Apr 11, 2024 (3 commits)

@graytaylor0 (Member) commented Apr 9, 2024

Description

This change adds support for passing a dynamic bucket expression to route data to different buckets based on Events. It also supports a default_bucket, which is used as a fallback when the upload to the dynamic bucket fails.

Check List

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class BufferUtilities {
Member

Please make this package private.

@@ -143,11 +143,12 @@ public interface Event extends Serializable {
* of a Data Prepper expression
* @param format input format
* @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions
* @param replacementForFailures - The String to use as a replacement for when keys in Events can't be found
Member

Should we just call this defaultValue?

Member Author

Yeah, that does make more sense, I think.

* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
*/
- String formatString(String format, ExpressionEvaluator expressionEvaluator);
+ String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures);
Member

You can override this to avoid the changes that have the explicit null.

String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures);

default String formatString(final String format, final ExpressionEvaluator expressionEvaluator) {
  return formatString(format, expressionEvaluator, null);
}
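
The effect of this suggestion on call sites can be sketched in isolation. The example below is a minimal illustration, not the Data Prepper code: `Formattable` and `MapEvent` are hypothetical stand-ins for `Event` and its implementation, the `ExpressionEvaluator` parameter is left out, and the `${key}` placeholder syntax is assumed. Following the quoted Javadoc, a missing key with no default yields null.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for the Event interface (no ExpressionEvaluator).
interface Formattable {
    // Full form: defaultValue replaces keys that cannot be found.
    String formatString(String format, String defaultValue);

    // Convenience overload: existing callers keep compiling and never pass an explicit null.
    default String formatString(final String format) {
        return formatString(format, null);
    }
}

// Hypothetical implementation backed by a flat map of string values.
final class MapEvent implements Formattable {
    private static final Pattern KEY = Pattern.compile("\\$\\{([^}]+)\\}");
    private final Map<String, String> data;

    MapEvent(final Map<String, String> data) {
        this.data = data;
    }

    @Override
    public String formatString(final String format, final String defaultValue) {
        final Matcher matcher = KEY.matcher(format);
        final StringBuilder result = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            final String value = data.getOrDefault(matcher.group(1), defaultValue);
            if (value == null) {
                return null; // key missing and no default supplied
            }
            result.append(format, last, matcher.start()).append(value);
            last = matcher.end();
        }
        return result.append(format, last, format.length()).toString();
    }
}
```

With this shape, only the callers that actually want a replacement value need the longer signature; every other call site stays as it was before the change.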

@Test
public void testBuild_withFormatStringWithValueNotFound_and_replacement_failure() {

final String replacementForMissingKeys = "REPLACED";
Member

Use a random string here.

@@ -54,7 +54,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}

try {
- final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator);
+ final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator, null);
Member

This line, and many others, can be removed by making the override I suggested above.

@@ -42,6 +46,8 @@ public class S3OutputStream extends PositionOutputStream {
*/
private final String key;

private String targetBucket;
Member

I don't think we need this field. Just use the existing bucket field and make it non-final.

/**
* The default bucket to send to when upload fails with dynamic bucket
*/
private String defaultBucket;
Member

Make this value final.

createMultipartUpload();
} catch (final S3Exception e) {
if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) {
targetBucket = defaultBucket;
Member

The createMultipartUpload method can be updated to take bucket as parameter.

Member Author

This doesn't really help: we need the bucket in other places in the code, so the bucket name has to be a non-final field anyway.

@@ -46,6 +46,6 @@ public void releaseEventHandles(final boolean result) {

@Override
public int compareTo(final S3Group o) {
- return Long.compare(o.getBuffer().getSize(), buffer.getSize());
+ return Long.compare(buffer.getSize(), o.getBuffer().getSize());
Member

Was this a bug?

Member Author

Not a bug, but we were sorting in order; now we sort in reverse order.

Member

Thanks for remembering this!

@dinujoh, for reference: #4385 (comment)

@@ -56,7 +57,7 @@ public Collection<S3Group> getS3GroupEntries() {
}

public Collection<S3Group> getS3GroupsSortedBySize() {
- return allGroups.values().stream().sorted().collect(Collectors.toList());
+ return allGroups.values().stream().sorted(Collections.reverseOrder()).collect(Collectors.toList());
Member

What's the reason for the reverse order?

Member Author

Because we swapped the comparator
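
How the two changes fit together can be shown with a small self-contained sketch. `SizedGroup` is a hypothetical stand-in for `S3Group` that carries only a size. It assumes what the two quoted hunks show: `compareTo` now orders ascending by size, and the sorted getter applies `Collections.reverseOrder()`, so the largest group still comes first.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for S3Group: only the buffered size matters here.
final class SizedGroup implements Comparable<SizedGroup> {
    private final long size;

    SizedGroup(final long size) {
        this.size = size;
    }

    long getSize() {
        return size;
    }

    // Natural ordering is ascending by size, like the updated compareTo.
    @Override
    public int compareTo(final SizedGroup other) {
        return Long.compare(size, other.size);
    }
}

final class SortedBySizeExample {
    // Reversing the ascending natural ordering puts the largest group first.
    static List<Long> sizesLargestFirst(final List<SizedGroup> groups) {
        return groups.stream()
                .sorted(Collections.reverseOrder())
                .map(SizedGroup::getSize)
                .collect(Collectors.toList());
    }
}
```

Keeping `compareTo` ascending matches the usual expectation for a natural ordering, and the descending order becomes explicit at the one place that needs it.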

}

@Test
void putObjectOrSendToDefaultBucket_with_no_such_bucket_exception_and_null_default_bucket_throws_exception() {
Member

You can combine this test and the one below into a @ParameterizedTest.

}

@Test
void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_sends_to_default_bucket() {
Member

These two tests can be combined using @ParameterizedTest

}

@Test
void putObjectOrSendToDefaultBucket_with_S3Exception_that_is_not_access_denied_or_no_such_bucket_throws_exception() {
Member

I think you also want a test case that has a defaultBucket, but an exception that is not Access Denied or NoSuchBucket. That should throw and not call putObject.

Member Author (@graytaylor0, Apr 11, 2024)

That is what this test was supposed to be, but it looks like I didn't set the default bucket. I will turn it into a parameterized test.
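
The behavior these tests are meant to pin down can be sketched without the AWS SDK. Everything below is a hypothetical stand-in: `StorageException` and `MissingBucketException` take the place of `S3Exception` and `NoSuchBucketException`, `Uploader` replaces the `S3Client.putObject` call, and the value of `ACCESS_DENIED` is assumed. The null check on the message is an addition of this sketch, not something shown in the PR.

```java
// Hypothetical stand-ins for the SDK exception types; not the real AWS classes.
class StorageException extends RuntimeException {
    StorageException(final String message) {
        super(message);
    }
}

class MissingBucketException extends StorageException {
    MissingBucketException(final String message) {
        super(message);
    }
}

// Hypothetical upload abstraction used in place of S3Client.putObject.
interface Uploader {
    void put(String bucket, String key);
}

final class DefaultBucketFallback {
    // Assumed marker text; the real constant's value is not shown in the PR.
    static final String ACCESS_DENIED = "Access Denied";

    // Tries the target bucket first. Retries once on the default bucket only when
    // a default is configured and the failure is "no such bucket" or access denied.
    // Returns the bucket that received the object.
    static String putOrSendToDefault(final Uploader uploader, final String key,
                                     final String targetBucket, final String defaultBucket) {
        try {
            uploader.put(targetBucket, key);
            return targetBucket;
        } catch (final StorageException e) {
            final boolean recoverable = e instanceof MissingBucketException
                    || (e.getMessage() != null && e.getMessage().contains(ACCESS_DENIED));
            if (defaultBucket == null || !recoverable) {
                throw e; // no fallback: exactly one put attempt was made
            }
            uploader.put(defaultBucket, key);
            return defaultBucket;
        }
    }
}
```

The cases requested in the review map directly onto this sketch: a null default bucket rethrows after one call, a configured default with an unrelated error also rethrows after one call, and a configured default with a recoverable error produces a second call against the default bucket.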

void putObjectOrSendToDefaultBucket_with_no_such_bucket_exception_and_null_default_bucket_throws_exception() {
when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenThrow(NoSuchBucketException.class);

assertThrows(NoSuchBucketException.class, () -> BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, null));
Member

Please also put a verification on the number of calls to putObject. There should be only one.

Signed-off-by: Taylor Gray <[email protected]>
@graytaylor0 force-pushed the DynamicBucketSupport branch from 72254d5 to 5d63680 on April 11, 2024 19:39
@graytaylor0 requested a review from dlvenable on April 11, 2024 20:46
@dlvenable (Member) left a comment

Thank you @graytaylor0 !

@graytaylor0 merged commit 02aef54 into opensearch-project:main on Apr 11, 2024
46 of 47 checks passed
@graytaylor0 deleted the DynamicBucketSupport branch on April 11, 2024 21:51
@kkondaka added this to the v2.8 milestone on May 14, 2024