From 2848c15bdbf672656796fa89e4a9ccc2ffcf80d6 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 3 Apr 2024 13:51:08 -0500 Subject: [PATCH 1/3] Add support for dynamic bucket in S3 sink Signed-off-by: Taylor Gray --- .../dataprepper/model/event/Event.java | 3 +- .../dataprepper/model/event/JacksonEvent.java | 14 +- .../model/event/JacksonEventTest.java | 19 +- .../kafka/producer/KafkaCustomProducer.java | 2 +- .../mutateevent/AddEntryProcessor.java | 2 +- .../sink/opensearch/OpenSearchSink.java | 12 +- .../sink/opensearch/OpenSearchSinkTest.java | 8 +- .../dataprepper/plugins/sink/s3/S3SinkIT.java | 4 +- .../plugins/codec/parquet/S3OutputStream.java | 51 ++++-- .../plugins/sink/s3/KeyGenerator.java | 1 + .../dataprepper/plugins/sink/s3/S3Sink.java | 5 + .../plugins/sink/s3/S3SinkConfig.java | 11 ++ .../sink/s3/accumulator/BufferFactory.java | 2 +- .../sink/s3/accumulator/BufferUtilities.java | 42 +++++ .../s3/accumulator/CodecBufferFactory.java | 7 +- .../accumulator/CompressionBufferFactory.java | 7 +- .../sink/s3/accumulator/InMemoryBuffer.java | 14 +- .../s3/accumulator/InMemoryBufferFactory.java | 7 +- .../sink/s3/accumulator/LocalFileBuffer.java | 15 +- .../accumulator/LocalFileBufferFactory.java | 7 +- .../accumulator/MultipartBufferFactory.java | 7 +- .../sink/s3/accumulator/ObjectKey.java | 6 +- .../plugins/sink/s3/grouping/S3Group.java | 2 +- .../sink/s3/grouping/S3GroupIdentifier.java | 8 +- .../s3/grouping/S3GroupIdentifierFactory.java | 8 +- .../sink/s3/grouping/S3GroupManager.java | 5 +- .../codec/parquet/S3OutputStreamTest.java | 164 ++++++++++++++++++ .../plugins/sink/s3/S3SinkTest.java | 7 + .../s3/accumulator/BufferUtilitiesTest.java | 132 ++++++++++++++ .../CompressionBufferFactoryTest.java | 24 ++- .../InMemoryBufferFactoryTest.java | 2 +- .../s3/accumulator/InMemoryBufferTest.java | 18 +- .../LocalFileBufferFactoryTest.java | 2 +- .../s3/accumulator/LocalFileBufferTest.java | 5 +- .../sink/s3/accumulator/ObjectKeyTest.java | 9 +- 
.../s3/grouping/S3GroupIdentifierTest.java | 10 +- .../sink/s3/grouping/S3GroupManagerTest.java | 22 ++- .../plugins/sink/s3/grouping/S3GroupTest.java | 4 +- 38 files changed, 574 insertions(+), 94 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index 04139a1669..b5d0f9a23f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -143,11 +143,12 @@ public interface Event extends Serializable { * of a Data Prepper expression * @param format input format * @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions + * @param replacementForFailures - The String to use as a replacement for when keys in Events can't be found * @return returns a string with no formatted parts, returns null if no value is found * @throws RuntimeException if the input string is not properly formatted * @since 2.1 */ - String formatString(String format, ExpressionEvaluator expressionEvaluator); + String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures); /** * Returns event handle diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index ff60157b7e..8591334bcd 
100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -320,7 +320,7 @@ public String getAsJsonString(final String key) { */ @Override public String formatString(final String format) { - return formatStringInternal(format, null); + return formatStringInternal(format, null, null); } /** @@ -333,11 +333,11 @@ public String formatString(final String format) { * @throws RuntimeException if the format is incorrect or the value is not a string */ @Override - public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) { - return formatStringInternal(format, expressionEvaluator); + public String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures) { + return formatStringInternal(format, expressionEvaluator, replacementForFailures); } - private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) { + private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures) { int fromIndex = 0; String result = ""; int position = 0; @@ -361,7 +361,11 @@ private String formatStringInternal(final String format, final ExpressionEvaluat if (expressionEvaluator != null && expressionEvaluator.isValidExpressionStatement(name)) { val = expressionEvaluator.evaluate(name, this); } else { - throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); + if (replacementForFailures == null) { + throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); + } + + val = replacementForFailures; } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java 
b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index b51f947f0d..f3ac15b0b4 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -603,7 +603,7 @@ public void formatString_with_expression_evaluator_catches_exception_when_Event_ when(expressionEvaluator.evaluate(invalidKeyExpression, event)).thenReturn(invalidKeyExpressionResult); when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult); - assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString))); + assertThat(event.formatString(formatString, expressionEvaluator, null), is(equalTo(finalString))); } @Test @@ -630,7 +630,7 @@ public void testBuild_withFormatStringWithExpressionEvaluator() { verify(expressionEvaluator, never()).evaluate(eq("foo"), any(Event.class)); when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult); - assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString))); + assertThat(event.formatString(formatString, expressionEvaluator, null), is(equalTo(finalString))); } @ParameterizedTest @@ -662,6 +662,21 @@ public void testBuild_withFormatStringWithValueNotFound() { assertThrows(EventKeyNotFoundException.class, () -> event.formatString("test-${boo}-string")); } + @Test + public void testBuild_withFormatStringWithValueNotFound_and_replacement_failure() { + + final String replacementForMissingKeys = "REPLACED"; + final String jsonString = "{\"foo\": \"bar\", \"info\": {\"ids\": {\"id\":\"idx\"}}}"; + final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); + event = JacksonEvent.builder() + .withEventType(eventType) + .withData(jsonString) + .getThis() + .build(); + final String result = event.formatString("test-${boo}-string", 
expressionEvaluator, replacementForMissingKeys); + assertThat(result, equalTo("test-" + replacementForMissingKeys + "-string")); + } + @Test public void testBuild_withFormatStringWithInvalidFormat() { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index 56e9783d2b..baa15b68eb 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -112,7 +112,7 @@ public void produceRawData(final byte[] bytes, final String key) throws Exceptio public void produceRecords(final Record record) throws Exception { bufferedEventHandles.add(record.getData().getEventHandle()); Event event = getEvent(record); - final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator); + final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator, null); try { if (Objects.equals(serdeFormat, MessageFormat.JSON.toString())) { publishJsonMessage(record, key); diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index b501eea7e0..1d7f170199 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -54,7 +54,7 @@ public Collection> 
doExecute(final Collection> recor } try { - final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator); + final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator, null); final String metadataKey = entry.getMetadataKey(); Object value; if (!Objects.isNull(entry.getValueExpression())) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index e1547a925c..29726b13cf 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -391,7 +391,7 @@ public void doOutput(final Collection> records) { final SerializedJson document = getDocument(event); String indexName = configuredIndexAlias; try { - indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); + indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator, null)); } catch (final Exception e) { LOG.error("There was an exception when constructing the index name. 
Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); dynamicIndexDroppedEvents.increment(); @@ -403,8 +403,8 @@ public void doOutput(final Collection> records) { String versionExpressionEvaluationResult = null; if (versionExpression != null) { try { - versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); - version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); + versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator, null); + version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator, null)); } catch (final NumberFormatException e) { final String errorMessage = String.format( "Unable to convert the result of evaluating document_version '%s' to Long for an Event. The evaluation result '%s' must be a valid Long type", versionExpression, versionExpressionEvaluationResult @@ -433,7 +433,7 @@ public void doOutput(final Collection> records) { } } if (eventAction.contains("${")) { - eventAction = event.formatString(eventAction, expressionEvaluator); + eventAction = event.formatString(eventAction, expressionEvaluator, null); } if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) { LOG.error("Unknown action {}, skipping the event", eventAction); @@ -485,7 +485,7 @@ SerializedJson getDocument(final Event event) { docId = event.get(documentIdField, String.class); } else if (Objects.nonNull(documentId)) { try { - docId = event.formatString(documentId, expressionEvaluator); + docId = event.formatString(documentId, expressionEvaluator, null); } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { LOG.error("Unable to construct document_id with format {}, the document_id will be generated by OpenSearch", documentId, e); } @@ -496,7 +496,7 @@ SerializedJson getDocument(final Event event) { routingValue = event.get(routingField, String.class); } else if (routing != null) { try { - 
routingValue = event.formatString(routing, expressionEvaluator); + routingValue = event.formatString(routing, expressionEvaluator, null); } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { LOG.error("Unable to construct routing with format {}, the routing will be generated by OpenSearch", routing, e); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 9b51954b62..205dc41dc8 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -213,8 +213,8 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_ final EventHandle eventHandle = mock(EventHandle.class); when(event.getEventHandle()).thenReturn(eventHandle); final String index = UUID.randomUUID().toString(); - when(event.formatString(versionExpression, expressionEvaluator)).thenReturn("not_a_number"); - when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); + when(event.formatString(versionExpression, expressionEvaluator, null)).thenReturn("not_a_number"); + when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator, null)).thenReturn(index); final Record eventRecord = new Record<>(event); final OpenSearchSink objectUnderTest = createObjectUnderTest(); @@ -301,8 +301,8 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an final EventHandle eventHandle = mock(EventHandle.class); when(event.getEventHandle()).thenReturn(eventHandle); final String index = UUID.randomUUID().toString(); - when(event.formatString(versionExpression, 
expressionEvaluator)).thenThrow(RuntimeException.class); - when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); + when(event.formatString(versionExpression, expressionEvaluator, null)).thenThrow(RuntimeException.class); + when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator, null)).thenReturn(index); final Record eventRecord = new Record<>(event); final OpenSearchSink objectUnderTest = createObjectUnderTest(); diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java index bba91c55d4..e219ba2208 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkIT.java @@ -89,7 +89,7 @@ public class S3SinkIT { @Mock private PluginSetting pluginSetting; - @Mock + @Mock(stubOnly = true) private S3SinkConfig s3SinkConfig; @Mock private PluginFactory pluginFactory; @@ -166,6 +166,8 @@ void setUp() { .build(); when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true); + + when(s3SinkConfig.getDefaultBucket()).thenReturn(null); } private S3Sink createObjectUnderTest() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java index 0f95f7e10c..9f7fd79112 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java @@ -15,6 +15,8 @@ import 
software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; @@ -27,6 +29,8 @@ public class S3OutputStream extends PositionOutputStream { private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class); + static final String ACCESS_DENIED = "Access Denied"; + /** * Default chunk size is 10MB */ @@ -42,6 +46,8 @@ public class S3OutputStream extends PositionOutputStream { */ private final String key; + private String targetBucket; + /** * The temporary buffer used for storing the chunks */ @@ -65,6 +71,11 @@ public class S3OutputStream extends PositionOutputStream { */ private boolean open; + /** + * The default bucket to send to when upload fails with dynamic bucket + */ + private String defaultBucket; + /** * Creates a new S3 OutputStream * @@ -72,14 +83,19 @@ public class S3OutputStream extends PositionOutputStream { * @param bucketSupplier name of the bucket * @param keySupplier path within the bucket */ - public S3OutputStream(final S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { + public S3OutputStream(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { this.s3Client = s3Client; this.bucket = bucketSupplier.get(); + this.targetBucket = bucketSupplier.get(); this.key = keySupplier.get(); buf = new byte[BUFFER_SIZE]; position = 0; etags = new ArrayList<>(); open = true; + this.defaultBucket = defaultBucket; } @Override @@ -157,7 +173,7 @@ public void close() { .parts(completedParts) .build(); CompleteMultipartUploadRequest 
completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder() - .bucket(bucket) + .bucket(targetBucket) .key(key) .uploadId(uploadId) .multipartUpload(completedMultipartUpload) @@ -184,21 +200,27 @@ private void flushBufferAndRewind() { private void possiblyStartMultipartUpload() { if (uploadId == null) { - CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder() - .bucket(bucket) - .key(key) - .build(); - CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest); - uploadId = multipartUpload.uploadId(); - LOG.debug("Created multipart upload {} bucket='{}',key='{}'.", uploadId, bucket, key); + try { + createMultipartUpload(); + } catch (final S3Exception e) { + if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) { + targetBucket = defaultBucket; + LOG.warn("Bucket {} could not be accessed to create multi-part upload, attempting to create multi-part upload to default_bucket {}", bucket, defaultBucket); + createMultipartUpload(); + } else { + throw e; + } + } + + LOG.debug("Created multipart upload {} bucket='{}',key='{}'.", uploadId, targetBucket, key); } } private void uploadPart() { int partNumber = etags.size() + 1; UploadPartRequest uploadRequest = UploadPartRequest.builder() - .bucket(bucket) + .bucket(targetBucket) .key(key) .uploadId(uploadId) .partNumber(partNumber) @@ -217,5 +239,14 @@ private void uploadPart() { public long getPos() throws IOException { return position + (long) etags.size() * (long) BUFFER_SIZE; } + + private void createMultipartUpload() { + CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder() + .bucket(targetBucket) + .key(key) + .build(); + CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest); + uploadId = multipartUpload.uploadId(); + } } diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java index fe1f90793f..7a86687684 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; + public class KeyGenerator { private final S3SinkConfig s3SinkConfig; private final ExtensionProvider extensionProvider; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 38db7cba7e..18fee25c93 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -102,6 +102,11 @@ public S3Sink(final PluginSetting pluginSetting, throw new InvalidPluginConfigurationException("name_pattern is not a valid format expression"); } + if (s3SinkConfig.getBucketName() != null && + !expressionEvaluator.isValidFormatExpression(s3SinkConfig.getBucketName())) { + throw new InvalidPluginConfigurationException("bucket name is not a valid format expression"); + } + S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption); testCodec.validateAgainstCodecContext(s3OutputCodecContext); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java 
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index d20fa830bb..eb9372bcf5 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -37,6 +37,15 @@ public class S3SinkConfig { @Size(min = 3, max = 500, message = "bucket length should be at least 3 characters") private String bucketName; + /** + * The default bucket to send to if using a dynamic bucket name and sending to the + * dynamic bucket fails because it does not exist or access to it is denied + */ + @JsonProperty("default_bucket") + @Size(min = 3, max = 500, message = "default_bucket length should be at least 3 characters") + private String defaultBucket; + + @JsonProperty("object_key") @Valid private ObjectKeyOptions objectKeyOptions = new ObjectKeyOptions(); @@ -143,4 +152,6 @@ public int getMaxUploadRetries() { public CompressionOption getCompression() { return compression; } + + public String getDefaultBucket() { return defaultBucket; } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java index 17d8275f3d..ae8e503c08 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferFactory.java @@ -10,5 +10,5 @@ import java.util.function.Supplier; public interface BufferFactory { - Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier); + Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier, String defaultBucket); } diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java new file mode 100644 index 0000000000..b39642f16c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; + +public class BufferUtilities { + + private static final Logger LOG = LoggerFactory.getLogger(BufferUtilities.class); + + static final String ACCESS_DENIED = "Access Denied"; + + static void putObjectOrSendToDefaultBucket(final S3Client s3Client, + final RequestBody requestBody, + final String objectKey, + final String targetBucket, + final String defaultBucket) { + try { + s3Client.putObject( + PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(), + requestBody); + } catch (final S3Exception e) { + if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) { + LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket); + s3Client.putObject( + PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(), + requestBody); + } else { + throw e; + } + } + } +} diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java index ee19d927e3..fb8c51fa86 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CodecBufferFactory.java @@ -15,8 +15,11 @@ public CodecBufferFactory(BufferFactory innerBufferFactory, BufferedCodec codec) } @Override - public Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { - Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier); + public Buffer getBuffer(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { + Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); return new CodecBuffer(innerBuffer, bufferedCodec); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java index 81bef2c100..fe7eb55f3d 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java @@ -24,8 +24,11 @@ public CompressionBufferFactory(final BufferFactory innerBufferFactory, final Co } @Override - public Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { - final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, 
bucketSupplier, keySupplier); + public Buffer getBuffer(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { + final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); if(compressionInternal) return internalBuffer; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java index 1020dd8ac5..3adef41731 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java @@ -8,7 +8,7 @@ import org.apache.commons.lang3.time.StopWatch; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; + import java.io.ByteArrayOutputStream; import java.io.OutputStream; import java.time.Duration; @@ -31,7 +31,12 @@ public class InMemoryBuffer implements Buffer { private String bucket; private String key; - InMemoryBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { + private String defaultBucket; + + InMemoryBuffer(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { this.s3Client = s3Client; this.bucketSupplier = bucketSupplier; this.keySupplier = keySupplier; @@ -40,6 +45,7 @@ public class InMemoryBuffer implements Buffer { watch = new StopWatch(); watch.start(); isCodecStarted = false; + this.defaultBucket = defaultBucket; } @Override @@ -62,9 +68,7 @@ public Duration getDuration() { @Override public void flushToS3() { final byte[] byteArray = byteArrayOutputStream.toByteArray(); - 
s3Client.putObject( - PutObjectRequest.builder().bucket(getBucket()).key(getKey()).build(), - RequestBody.fromBytes(byteArray)); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, RequestBody.fromBytes(byteArray), getKey(), getBucket(), defaultBucket); } private String getBucket() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java index a631972b93..bc15664289 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactory.java @@ -11,7 +11,10 @@ public class InMemoryBufferFactory implements BufferFactory { @Override - public Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { - return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + public Buffer getBuffer(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { + return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index aaff4c36ca..570946b008 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.sync.RequestBody; import 
software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; + import java.io.BufferedOutputStream; import java.io.File; import java.io.FileNotFoundException; @@ -40,8 +40,14 @@ public class LocalFileBuffer implements Buffer { private String bucket; private String key; + private String defaultBucket; + - LocalFileBuffer(File tempFile, S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) throws FileNotFoundException { + LocalFileBuffer(final File tempFile, + final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) throws FileNotFoundException { localFile = tempFile; outputStream = new BufferedOutputStream(new FileOutputStream(tempFile), 32 * 1024); this.s3Client = s3Client; @@ -51,6 +57,7 @@ public class LocalFileBuffer implements Buffer { watch = new StopWatch(); watch.start(); isCodecStarted = false; + this.defaultBucket = defaultBucket; } @Override @@ -79,9 +86,7 @@ public Duration getDuration(){ @Override public void flushToS3() { flushAndCloseStream(); - s3Client.putObject( - PutObjectRequest.builder().bucket(getBucket()).key(getKey()).build(), - RequestBody.fromFile(localFile)); + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, RequestBody.fromFile(localFile), getKey(), getBucket(), defaultBucket); removeTemporaryFile(); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java index d3c068923f..68cc65b087 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactory.java @@ -20,12 +20,15 @@ public class 
LocalFileBufferFactory implements BufferFactory { public static final String SUFFIX = ".log"; @Override - public Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { + public Buffer getBuffer(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { File tempFile = null; Buffer localfileBuffer = null; try { tempFile = File.createTempFile(PREFIX, SUFFIX); - localfileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier); + localfileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier, defaultBucket); } catch (IOException e) { LOG.error("Unable to create temp file ", e); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java index 6bff4331bb..2f801060aa 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/MultipartBufferFactory.java @@ -12,7 +12,10 @@ public class MultipartBufferFactory implements BufferFactory { @Override - public Buffer getBuffer(S3Client s3Client, Supplier bucketSupplier, Supplier keySupplier) { - return new MultipartBuffer(new S3OutputStream(s3Client, bucketSupplier, keySupplier)); + public Buffer getBuffer(final S3Client s3Client, + final Supplier bucketSupplier, + final Supplier keySupplier, + final String defaultBucket) { + return new MultipartBuffer(new S3OutputStream(s3Client, bucketSupplier, keySupplier, defaultBucket)); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java 
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java index c83ee192ec..65ac8edcef 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKey.java @@ -25,6 +25,8 @@ public class ObjectKey { private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\%\\{.*?\\}"; private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(TIME_PATTERN_REGULAR_EXPRESSION); + static final String REPLACEMENT_FOR_NON_EXISTENT_KEYS = ""; + private ObjectKey(){} /** @@ -47,7 +49,7 @@ private static String buildingPathPrefixInternal(final S3SinkConfig s3SinkConfig final Event event, final ExpressionEvaluator expressionEvaluator) { String pathPrefix = s3SinkConfig.getObjectKeyOptions().getPathPrefix(); - String pathPrefixExpressionResult = expressionEvaluator != null ? event.formatString(pathPrefix, expressionEvaluator) : pathPrefix; + String pathPrefixExpressionResult = expressionEvaluator != null ? 
event.formatString(pathPrefix, expressionEvaluator, REPLACEMENT_FOR_NON_EXISTENT_KEYS) : pathPrefix; StringBuilder s3ObjectPath = new StringBuilder(); if (pathPrefixExpressionResult != null && !pathPrefixExpressionResult.isEmpty()) { String[] pathPrefixList = pathPrefixExpressionResult.split("\\/"); @@ -74,7 +76,7 @@ public static String objectFileName(final S3SinkConfig s3SinkConfig, final Event event, final ExpressionEvaluator expressionEvaluator) { String configNamePattern = s3SinkConfig.getObjectKeyOptions().getNamePattern(); - String configNamePatternExpressionResult = event.formatString(configNamePattern, expressionEvaluator); + String configNamePatternExpressionResult = event.formatString(configNamePattern, expressionEvaluator, REPLACEMENT_FOR_NON_EXISTENT_KEYS); int extensionIndex = configNamePatternExpressionResult.lastIndexOf('.'); if (extensionIndex > 0) { return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePatternExpressionResult.substring(0, extensionIndex)) + "." 
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java index b0656441a8..c248b6579c 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java @@ -55,6 +55,6 @@ public void releaseEventHandles(final boolean result) { @Override public int compareTo(final S3Group o) { - return Long.compare(o.getBuffer().getSize(), buffer.getSize()); + return Long.compare(buffer.getSize(), o.getBuffer().getSize()); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java index 9b65348bae..170a2426dd 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java @@ -12,10 +12,14 @@ class S3GroupIdentifier { private final Map groupIdentifierHash; private final String groupIdentifierFullObjectKey; + private final String fullBucketName; + public S3GroupIdentifier(final Map groupIdentifierHash, - final String groupIdentifierFullObjectKey) { + final String groupIdentifierFullObjectKey, + final String fullBucketName) { this.groupIdentifierHash = groupIdentifierHash; this.groupIdentifierFullObjectKey = groupIdentifierFullObjectKey; + this.fullBucketName = fullBucketName; } @Override @@ -34,4 +38,6 @@ public int hashCode() { public String getGroupIdentifierFullObjectKey() { return groupIdentifierFullObjectKey; } public Map getGroupIdentifierHash() { return 
groupIdentifierHash; } + + public String getFullBucketName() { return fullBucketName; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java index 0387f6f683..5ec264616a 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java @@ -26,6 +26,8 @@ public class S3GroupIdentifierFactory { private final S3SinkConfig s3SinkConfig; + private static final String BUCKET_NAME_REPLACEMENT_FOR_NON_EXISTING_KEYS = ""; + public S3GroupIdentifierFactory(final KeyGenerator keyGenerator, final ExpressionEvaluator expressionEvaluator, final S3SinkConfig s3SinkConfig) { @@ -35,15 +37,19 @@ public S3GroupIdentifierFactory(final KeyGenerator keyGenerator, dynamicExpressions = expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()); dynamicExpressions.addAll(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())); + dynamicExpressions.addAll(expressionEvaluator.extractDynamicExpressionsFromFormatExpression(s3SinkConfig.getBucketName())); dynamicEventsKeys = expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getPathPrefix()); dynamicEventsKeys.addAll(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getObjectKeyOptions().getNamePattern())); + dynamicEventsKeys.addAll(expressionEvaluator.extractDynamicKeysFromFormatExpression(s3SinkConfig.getBucketName())); } public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) { final String fullObjectKey = 
keyGenerator.generateKeyForEvent(event); + final String fullBucketName = event.formatString(s3SinkConfig.getBucketName(), expressionEvaluator, BUCKET_NAME_REPLACEMENT_FOR_NON_EXISTING_KEYS); + final Map groupIdentificationHash = new HashMap<>(); for (final String key : dynamicEventsKeys) { @@ -56,6 +62,6 @@ public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) { groupIdentificationHash.put(expression, value); } - return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey); + return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, fullBucketName); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java index 63d6dde8eb..a7af39ed9e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java @@ -17,6 +17,7 @@ import software.amazon.awssdk.services.s3.S3Client; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; @@ -63,7 +64,7 @@ public Collection getS3GroupEntries() { } public Collection getS3GroupsSortedBySize() { - return allGroups.values().stream().sorted().collect(Collectors.toList()); + return allGroups.values().stream().sorted(Collections.reverseOrder()).collect(Collectors.toList()); } public S3Group getOrCreateGroupForEvent(final Event event) { @@ -73,7 +74,7 @@ public S3Group getOrCreateGroupForEvent(final Event event) { if (allGroups.containsKey(s3GroupIdentifier)) { return allGroups.get(s3GroupIdentifier); } else { - final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey); + final Buffer 
bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket()); final OutputCodec outputCodec = codecFactory.provideCodec(); final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec); allGroups.put(s3GroupIdentifier, s3Group); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java new file mode 100644 index 0000000000..7050347761 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStreamTest.java @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.parquet; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +import java.util.UUID; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; 
+import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class S3OutputStreamTest { + + @Mock + private S3Client s3Client; + + private String bucket; + + private String defaultBucket; + + private String objectKey; + + @BeforeEach + void setup() { + bucket = UUID.randomUUID().toString(); + defaultBucket = UUID.randomUUID().toString(); + objectKey = UUID.randomUUID().toString(); + } + + private S3OutputStream createObjectUnderTest() { + return new S3OutputStream(s3Client, () -> bucket, () -> objectKey, defaultBucket); + } + + @Test + void close_creates_and_completes_multi_part_upload() { + + final byte[] bytes = new byte[25]; + final String uploadId = UUID.randomUUID().toString(); + final CreateMultipartUploadResponse createMultipartUploadResponse = mock(CreateMultipartUploadResponse.class); + when(createMultipartUploadResponse.uploadId()).thenReturn(uploadId); + when(s3Client.createMultipartUpload(any(CreateMultipartUploadRequest.class))).thenReturn(createMultipartUploadResponse); + + final UploadPartResponse uploadPartResponse = mock(UploadPartResponse.class); + when(uploadPartResponse.eTag()).thenReturn(UUID.randomUUID().toString()); + when(s3Client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenReturn(uploadPartResponse); + + when(s3Client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(mock(CompleteMultipartUploadResponse.class)); + + + final S3OutputStream s3OutputStream = createObjectUnderTest(); + + s3OutputStream.write(bytes); + + s3OutputStream.close(); + + final ArgumentCaptor createMultipartUploadRequestArgumentCaptor = ArgumentCaptor.forClass(CreateMultipartUploadRequest.class); + 
verify(s3Client).createMultipartUpload(createMultipartUploadRequestArgumentCaptor.capture()); + + final CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestArgumentCaptor.getValue(); + assertThat(createMultipartUploadRequest, notNullValue()); + assertThat(createMultipartUploadRequest.bucket(), equalTo(bucket)); + assertThat(createMultipartUploadRequest.key(), equalTo(objectKey)); + + final ArgumentCaptor uploadPartRequestArgumentCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); + verify(s3Client).uploadPart(uploadPartRequestArgumentCaptor.capture(), any(RequestBody.class)); + + final UploadPartRequest uploadPartRequest = uploadPartRequestArgumentCaptor.getValue(); + assertThat(uploadPartRequest, notNullValue()); + assertThat(uploadPartRequest.bucket(), equalTo(bucket)); + assertThat(uploadPartRequest.uploadId(), equalTo(uploadId)); + assertThat(uploadPartRequest.key(), equalTo(objectKey)); + + final ArgumentCaptor completeMultipartUploadRequestArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); + verify(s3Client).completeMultipartUpload(completeMultipartUploadRequestArgumentCaptor.capture()); + + final CompleteMultipartUploadRequest completeMultipartUploadRequest = completeMultipartUploadRequestArgumentCaptor.getValue(); + assertThat(completeMultipartUploadRequest, notNullValue()); + assertThat(completeMultipartUploadRequest.bucket(), equalTo(bucket)); + assertThat(completeMultipartUploadRequest.key(), equalTo(objectKey)); + } + + @Test + void close_with_no_such_bucket_exception_creates_and_completes_multi_part_upload_for_default_bucket() { + + final byte[] bytes = new byte[25]; + final String uploadId = UUID.randomUUID().toString(); + final CreateMultipartUploadResponse createMultipartUploadResponse = mock(CreateMultipartUploadResponse.class); + when(createMultipartUploadResponse.uploadId()).thenReturn(uploadId); + when(s3Client.createMultipartUpload(any(CreateMultipartUploadRequest.class))) 
+ .thenThrow(NoSuchBucketException.class) + .thenReturn(createMultipartUploadResponse); + + final UploadPartResponse uploadPartResponse = mock(UploadPartResponse.class); + when(uploadPartResponse.eTag()).thenReturn(UUID.randomUUID().toString()); + when(s3Client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenReturn(uploadPartResponse); + + when(s3Client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(mock(CompleteMultipartUploadResponse.class)); + + + final S3OutputStream s3OutputStream = createObjectUnderTest(); + + s3OutputStream.write(bytes); + + s3OutputStream.close(); + + final ArgumentCaptor createMultipartUploadRequestArgumentCaptor = ArgumentCaptor.forClass(CreateMultipartUploadRequest.class); + verify(s3Client, times(2)).createMultipartUpload(createMultipartUploadRequestArgumentCaptor.capture()); + + final List createMultipartUploadRequests = createMultipartUploadRequestArgumentCaptor.getAllValues(); + assertThat(createMultipartUploadRequests.size(), equalTo(2)); + + final CreateMultipartUploadRequest failedCreateMultiPartUploadRequest = createMultipartUploadRequests.get(0); + assertThat(failedCreateMultiPartUploadRequest, notNullValue()); + assertThat(failedCreateMultiPartUploadRequest.bucket(), equalTo(bucket)); + assertThat(failedCreateMultiPartUploadRequest.key(), equalTo(objectKey)); + + final CreateMultipartUploadRequest defaultBucketCreateMultiPartUploadRequest = createMultipartUploadRequests.get(1); + assertThat(defaultBucketCreateMultiPartUploadRequest, notNullValue()); + assertThat(defaultBucketCreateMultiPartUploadRequest.bucket(), equalTo(defaultBucket)); + assertThat(defaultBucketCreateMultiPartUploadRequest.key(), equalTo(objectKey)); + + final ArgumentCaptor uploadPartRequestArgumentCaptor = ArgumentCaptor.forClass(UploadPartRequest.class); + verify(s3Client).uploadPart(uploadPartRequestArgumentCaptor.capture(), any(RequestBody.class)); + + final UploadPartRequest uploadPartRequest = 
uploadPartRequestArgumentCaptor.getValue(); + assertThat(uploadPartRequest, notNullValue()); + assertThat(uploadPartRequest.bucket(), equalTo(defaultBucket)); + assertThat(uploadPartRequest.uploadId(), equalTo(uploadId)); + assertThat(uploadPartRequest.key(), equalTo(objectKey)); + + final ArgumentCaptor completeMultipartUploadRequestArgumentCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class); + verify(s3Client).completeMultipartUpload(completeMultipartUploadRequestArgumentCaptor.capture()); + + final CompleteMultipartUploadRequest completeMultipartUploadRequest = completeMultipartUploadRequestArgumentCaptor.getValue(); + assertThat(completeMultipartUploadRequest, notNullValue()); + assertThat(completeMultipartUploadRequest.bucket(), equalTo(defaultBucket)); + assertThat(completeMultipartUploadRequest.key(), equalTo(objectKey)); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 2b5fd0a8b9..9b1a99046d 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -165,4 +165,11 @@ void invalid_object_name_expression_format_throws_InvalidPluginConfigurationExce assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); } + + @Test + void invalid_bucket_name_expression_format_throws_InvalidPluginConfigurationException() { + when(expressionEvaluator.isValidFormatExpression(s3SinkConfig.getBucketName())).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java 
b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java new file mode 100644 index 0000000000..8265bb0ff4 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java @@ -0,0 +1,132 @@ +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.util.List; +import java.util.UUID; + + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.eq; +import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferUtilities.ACCESS_DENIED; + +@ExtendWith(MockitoExtension.class) +public class BufferUtilitiesTest { + + private String defaultBucket; + private String targetBucket; + + private String objectKey; + + @Mock + private RequestBody requestBody; + + @Mock + private S3Client s3Client; + + @BeforeEach + void setup() { + targetBucket = UUID.randomUUID().toString(); + defaultBucket = UUID.randomUUID().toString(); + objectKey = 
UUID.randomUUID().toString(); + } + + @Test + void putObjectOrSendToDefaultBucket_with_no_exception_sends_to_target_bucket() { + + when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenReturn(mock(PutObjectResponse.class)); + + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, defaultBucket); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(s3Client, times(1)).putObject(argumentCaptor.capture(), eq(requestBody)); + + assertThat(argumentCaptor.getAllValues().size(), equalTo(1)); + + final PutObjectRequest putObjectRequest = argumentCaptor.getValue(); + assertThat(putObjectRequest.bucket(), equalTo(targetBucket)); + assertThat(putObjectRequest.key(), equalTo(objectKey)); + + } + + @Test + void putObjectOrSendToDefaultBucket_with_no_such_bucket_exception_and_null_default_bucket_throws_exception() { + when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenThrow(NoSuchBucketException.class); + + assertThrows(NoSuchBucketException.class, () -> BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, null)); + } + + @Test + void putObjectOrSendToDefaultBucket_with_S3Exception_that_is_not_access_denied_or_no_such_bucket_throws_exception() { + when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenThrow(RuntimeException.class); + + assertThrows(RuntimeException.class, () -> BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, null)); + } + + @Test + void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_sends_to_default_bucket() { + when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))) + .thenThrow(NoSuchBucketException.class) + .thenReturn(mock(PutObjectResponse.class)); + + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, defaultBucket); + + final ArgumentCaptor 
argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); + + assertThat(argumentCaptor.getAllValues().size(), equalTo(2)); + + final List putObjectRequestList = argumentCaptor.getAllValues(); + final PutObjectRequest putObjectRequest = putObjectRequestList.get(0); + assertThat(putObjectRequest.bucket(), equalTo(targetBucket)); + assertThat(putObjectRequest.key(), equalTo(objectKey)); + + final PutObjectRequest defaultBucketPutObjectRequest = putObjectRequestList.get(1); + assertThat(defaultBucketPutObjectRequest.bucket(), equalTo(defaultBucket)); + assertThat(defaultBucketPutObjectRequest.key(), equalTo(objectKey)); + } + + @Test + void putObjectOrSendToDefaultBucket_with_S3Exception_with_access_denied_sends_to_default_bucket() { + final S3Exception s3Exception = mock(S3Exception.class); + when(s3Exception.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID()); + + when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))) + .thenThrow(s3Exception) + .thenReturn(mock(PutObjectResponse.class)); + + BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, defaultBucket); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); + + assertThat(argumentCaptor.getAllValues().size(), equalTo(2)); + + final List putObjectRequestList = argumentCaptor.getAllValues(); + final PutObjectRequest putObjectRequest = putObjectRequestList.get(0); + assertThat(putObjectRequest.bucket(), equalTo(targetBucket)); + assertThat(putObjectRequest.key(), equalTo(objectKey)); + + final PutObjectRequest defaultBucketPutObjectRequest = putObjectRequestList.get(1); + assertThat(defaultBucketPutObjectRequest.bucket(), equalTo(defaultBucket)); + assertThat(defaultBucketPutObjectRequest.key(), equalTo(objectKey)); + } 
+} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java index 00bd76e239..90a365c846 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; import software.amazon.awssdk.services.s3.S3Client; +import java.util.UUID; import java.util.function.Supplier; import static org.hamcrest.CoreMatchers.equalTo; @@ -47,6 +48,13 @@ class CompressionBufferFactoryTest { @Mock private OutputCodec codec; + private String defaultBucket; + + @BeforeEach + void setup() { + defaultBucket = UUID.randomUUID().toString(); + } + private CompressionBufferFactory createObjectUnderTest() { return new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec); } @@ -79,21 +87,21 @@ class WithBuffer { @BeforeEach void setUp() { - when(innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier)).thenReturn(innerBuffer); + when(innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket)).thenReturn(innerBuffer); } @Test void getBuffer_returns_CompressionBuffer() { - final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier); + final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); assertThat(buffer, instanceOf(CompressionBuffer.class)); } @Test void getBuffer_returns_new_on_each_call() { final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); - final Buffer firstBuffer = 
objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier); + final Buffer firstBuffer = objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); - assertThat(objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier), not(equalTo(firstBuffer))); + assertThat(objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket), not(equalTo(firstBuffer))); } @Nested @@ -105,17 +113,17 @@ void setUp() { @Test void getBuffer_returns_innerBuffer_directly() { - final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier); + final Buffer buffer = createObjectUnderTest().getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); assertThat(buffer, sameInstance(innerBuffer)); } @Test void getBuffer_calls_on_each_call() { final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); - objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier); - objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier); + objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); + objectUnderTest.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); - verify(innerBufferFactory, times(2)).getBuffer(s3Client, bucketSupplier, keySupplier); + verify(innerBufferFactory, times(2)).getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket); } } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java index 0196657f3b..a16ad57c60 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferFactoryTest.java @@ -22,7 +22,7 @@ void 
test_inMemoryBufferFactory_notNull(){ void test_buffer_notNull(){ InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); Assertions.assertNotNull(inMemoryBufferFactory); - Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null); + Buffer buffer = inMemoryBufferFactory.getBuffer(null, null, null, null); Assertions.assertNotNull(buffer); assertThat(buffer, instanceOf(Buffer.class)); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java index 168c71beff..27c470c5e3 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBufferTest.java @@ -51,7 +51,7 @@ class InMemoryBufferTest { @Test void test_with_write_event_into_buffer() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -79,7 +79,7 @@ void test_with_write_event_into_buffer() throws IOException { */ void getDuration_provides_duration_within_expected_range() throws IOException, InterruptedException { Instant startTime = Instant.now(); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); Instant endTime = Instant.now(); @@ -103,7 +103,7 @@ void getDuration_provides_duration_within_expected_range() throws IOException, I @Test void test_with_write_event_into_buffer_and_flush_toS3() throws IOException { - inMemoryBuffer = new 
InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -118,7 +118,7 @@ void test_with_write_event_into_buffer_and_flush_toS3() throws IOException { @Test void test_uploadedToS3_success() { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); Assertions.assertNotNull(inMemoryBuffer); assertDoesNotThrow(() -> { inMemoryBuffer.flushToS3(); @@ -127,7 +127,7 @@ void test_uploadedToS3_success() { @Test void test_uploadedToS3_fails() { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); Assertions.assertNotNull(inMemoryBuffer); SdkClientException sdkClientException = mock(SdkClientException.class); when(s3Client.putObject(any(PutObjectRequest.class), any(RequestBody.class))) @@ -139,14 +139,14 @@ void test_uploadedToS3_fails() { @Test void getOutputStream_is_PositionOutputStream() { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); assertThat(inMemoryBuffer.getOutputStream(), instanceOf(PositionOutputStream.class)); } @Test void getOutputStream_getPos_equals_written_size() throws IOException { - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -163,7 +163,7 @@ void getOutputStream_getPos_equals_written_size() throws IOException { @Test void getSize_across_multiple_in_sequence() throws IOException { - inMemoryBuffer = new 
InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { OutputStream outputStream = inMemoryBuffer.getOutputStream(); @@ -173,7 +173,7 @@ void getSize_across_multiple_in_sequence() throws IOException { } assertThat(inMemoryBuffer.getSize(), equalTo((long) MAX_EVENTS * 1000)); - inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier); + inMemoryBuffer = new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, null); assertThat(inMemoryBuffer.getSize(), equalTo(0L)); while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java index 834a1af710..93ec98a94b 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferFactoryTest.java @@ -21,7 +21,7 @@ void test_localFileBufferFactory_notNull() { void test_buffer_notNull() { LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); Assertions.assertNotNull(localFileBufferFactory); - Buffer buffer = localFileBufferFactory.getBuffer(null, null, null); + Buffer buffer = localFileBufferFactory.getBuffer(null, null, null, null); Assertions.assertNotNull(buffer); assertThat(buffer, instanceOf(LocalFileBuffer.class)); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java index 
b4f69d6196..bcc608e1ec 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBufferTest.java @@ -47,10 +47,13 @@ class LocalFileBufferTest { private LocalFileBuffer localFileBuffer; private File tempFile; + private String defaultBucket; + @BeforeEach void setUp() throws IOException { + defaultBucket = UUID.randomUUID().toString(); tempFile = File.createTempFile(PREFIX, SUFFIX); - localFileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier); + localFileBuffer = new LocalFileBuffer(tempFile, s3Client, bucketSupplier, keySupplier, defaultBucket); } @Test diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java index 3377b2b516..710372465d 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/ObjectKeyTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey.REPLACEMENT_FOR_NON_EXISTENT_KEYS; @ExtendWith(MockitoExtension.class) class ObjectKeyTest { @@ -44,7 +45,7 @@ void test_buildingPathPrefix() { final String pathPrefix = "events/%{yyyy}/%{MM}/%{dd}/"; when(objectKeyOptions.getPathPrefix()).thenReturn(pathPrefix); - when(event.formatString(pathPrefix, expressionEvaluator)).thenReturn(pathPrefix); + when(event.formatString(pathPrefix, expressionEvaluator, 
REPLACEMENT_FOR_NON_EXISTENT_KEYS)).thenReturn(pathPrefix); String pathPrefixResult = ObjectKey.buildingPathPrefix(s3SinkConfig, event, expressionEvaluator); Assertions.assertNotNull(pathPrefixResult); assertThat(pathPrefixResult, startsWith("events")); @@ -55,7 +56,7 @@ void test_objectFileName() { final String namePattern = "my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"; when(objectKeyOptions.getNamePattern()).thenReturn(namePattern); - when(event.formatString(namePattern, expressionEvaluator)).thenReturn(namePattern); + when(event.formatString(namePattern, expressionEvaluator, REPLACEMENT_FOR_NON_EXISTENT_KEYS)).thenReturn(namePattern); String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator); Assertions.assertNotNull(objectFileName); assertThat(objectFileName, startsWith("my-elb")); @@ -67,7 +68,7 @@ void test_objectFileName_with_fileExtension() { when(s3SinkConfig.getObjectKeyOptions().getNamePattern()) .thenReturn(namePattern); - when(event.formatString(namePattern, expressionEvaluator)).thenReturn(namePattern); + when(event.formatString(namePattern, expressionEvaluator, REPLACEMENT_FOR_NON_EXISTENT_KEYS)).thenReturn(namePattern); String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator); Assertions.assertNotNull(objectFileName); Assertions.assertTrue(objectFileName.contains(".pdf")); @@ -79,7 +80,7 @@ void test_objectFileName_default_fileExtension() { when(s3SinkConfig.getObjectKeyOptions().getNamePattern()) .thenReturn(namePattern); - when(event.formatString(namePattern, expressionEvaluator)).thenReturn(namePattern); + when(event.formatString(namePattern, expressionEvaluator, REPLACEMENT_FOR_NON_EXISTENT_KEYS)).thenReturn(namePattern); String objectFileName = ObjectKey.objectFileName(s3SinkConfig, null, event, expressionEvaluator); Assertions.assertNotNull(objectFileName); Assertions.assertTrue(objectFileName.contains(".json")); diff --git 
a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java index e8388a3b41..b9d22b8c12 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierTest.java @@ -22,9 +22,10 @@ void S3GroupIdentifier_with_the_same_identificationHash_and_different_fullObject final Map identificationHash = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); final String groupOneFullObjectKey = UUID.randomUUID().toString(); final String groupTwoFullObjectKey = UUID.randomUUID().toString(); + final String fullBucketName = UUID.randomUUID().toString(); - final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupOneFullObjectKey); - final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupTwoFullObjectKey); + final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupOneFullObjectKey, fullBucketName); + final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHash, groupTwoFullObjectKey, fullBucketName); assertThat(s3GroupIdentifier.equals(seconds3GroupIdentifier), equalTo(true)); assertThat(s3GroupIdentifier.hashCode(), equalTo(seconds3GroupIdentifier.hashCode())); @@ -36,9 +37,10 @@ void S3GroupIdentifier_with_different_identificationHash_is_not_considered_equal final Map identificationHashTwo = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); final String groupOneFullObjectKey = UUID.randomUUID().toString(); final String groupTwoFullObjectKey = UUID.randomUUID().toString(); + final String fullBucketName = UUID.randomUUID().toString(); - final 
S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHashOne, groupOneFullObjectKey); - final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHashTwo, groupTwoFullObjectKey); + final S3GroupIdentifier s3GroupIdentifier = new S3GroupIdentifier(identificationHashOne, groupOneFullObjectKey, fullBucketName); + final S3GroupIdentifier seconds3GroupIdentifier = new S3GroupIdentifier(identificationHashTwo, groupTwoFullObjectKey, fullBucketName); assertThat(s3GroupIdentifier.equals(seconds3GroupIdentifier), equalTo(false)); assertNotEquals(s3GroupIdentifier.hashCode(), seconds3GroupIdentifier.hashCode()); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java index ba9dd2af1d..1cfcb41e21 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java @@ -18,6 +18,7 @@ import software.amazon.awssdk.services.s3.S3Client; import java.util.Collection; +import java.util.UUID; import java.util.function.Supplier; import static org.hamcrest.MatcherAssert.assertThat; @@ -65,9 +66,11 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { final Event event = mock(Event.class); final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); + final String defaultBucket = UUID.randomUUID().toString(); + when(s3SinkConfig.getDefaultBucket()).thenReturn(defaultBucket); final Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) + 
when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) .thenReturn(buffer); final OutputCodec outputCodec = mock(OutputCodec.class); when(codecFactory.provideCodec()).thenReturn(outputCodec); @@ -95,9 +98,12 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); + final String defaultBucket = UUID.randomUUID().toString(); + when(s3SinkConfig.getDefaultBucket()).thenReturn(defaultBucket); + final Buffer buffer = mock(Buffer.class); final OutputCodec outputCodec = mock(OutputCodec.class); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) .thenReturn(buffer); when(codecFactory.provideCodec()).thenReturn(outputCodec); @@ -118,7 +124,7 @@ void getOrCreateGroupForEvent_returns_expected_group_when_it_exists() { assertThat(secondResult.getS3GroupIdentifier(), equalTo(s3GroupIdentifier)); assertThat(secondResult.getBuffer(), equalTo(buffer)); - verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class)); + verify(bufferFactory, times(1)).getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket)); final Collection groups = objectUnderTest.getS3GroupEntries(); assertThat(groups, notNullValue()); @@ -134,6 +140,9 @@ void recalculateAndGetGroupSize_returns_expected_size() { long bufferSizeBase = 100; long bufferSizeTotal = 100 + 200 + 300; + final String defaultBucket = UUID.randomUUID().toString(); + when(s3SinkConfig.getDefaultBucket()).thenReturn(defaultBucket); + final Event event = mock(Event.class); final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); 
when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); @@ -155,7 +164,7 @@ void recalculateAndGetGroupSize_returns_expected_size() { final Buffer thirdBuffer = mock(Buffer.class); when(thirdBuffer.getSize()).thenReturn(bufferSizeBase * 3); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); final OutputCodec outputCodec = mock(OutputCodec.class); @@ -177,6 +186,9 @@ void getGroupsOrderedBySize_returns_groups_in_expected_order() { long bufferSizeBase = 100; + final String defaultBucket = UUID.randomUUID().toString(); + when(s3SinkConfig.getDefaultBucket()).thenReturn(defaultBucket); + final Event event = mock(Event.class); final S3GroupIdentifier s3GroupIdentifier = mock(S3GroupIdentifier.class); when(s3GroupIdentifierFactory.getS3GroupIdentifierForEvent(event)).thenReturn(s3GroupIdentifier); @@ -198,7 +210,7 @@ void getGroupsOrderedBySize_returns_groups_in_expected_order() { final Buffer thirdBuffer = mock(Buffer.class); when(thirdBuffer.getSize()).thenReturn(bufferSizeBase * 3); - when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class))) + when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) .thenReturn(buffer).thenReturn(secondBuffer).thenReturn(thirdBuffer); final OutputCodec firstOutputCodec = mock(OutputCodec.class); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java index da0cf899b0..96535888cb 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java +++ 
b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupTest.java @@ -53,8 +53,8 @@ void comparingS3GroupsReturns_expected_result_based_on_buffer_size() { final S3Group largeGroup = new S3Group(mock(S3GroupIdentifier.class), largeBuffer, mock(OutputCodec.class)); final S3Group anotherLargeGroup = new S3Group(mock(S3GroupIdentifier.class), equalBuffer, mock(OutputCodec.class)); - assertThat(smallGroup.compareTo(largeGroup), equalTo(1)); - assertThat(largeGroup.compareTo(smallGroup), equalTo(-1)); + assertThat(smallGroup.compareTo(largeGroup), equalTo(-1)); + assertThat(largeGroup.compareTo(smallGroup), equalTo(1)); assertThat(largeGroup.compareTo(anotherLargeGroup), equalTo(0)); } } From 57c4e14e8c75683969cac02b216f715f2ef6fb39 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 11 Apr 2024 13:20:25 -0500 Subject: [PATCH 2/3] Address PR comments Signed-off-by: Taylor Gray --- .../dataprepper/model/event/Event.java | 15 +++++++++++++-- .../dataprepper/model/event/JacksonEvent.java | 16 +++++++++++----- .../model/event/JacksonEventTest.java | 12 ++++++------ .../kafka/producer/KafkaCustomProducer.java | 2 +- .../mutateevent/AddEntryProcessor.java | 2 +- .../plugins/sink/opensearch/OpenSearchSink.java | 12 ++++++------ .../sink/opensearch/OpenSearchSinkTest.java | 8 ++++---- .../plugins/codec/parquet/S3OutputStream.java | 17 +++++++---------- .../sink/s3/accumulator/BufferUtilities.java | 2 +- 9 files changed, 50 insertions(+), 36 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index b5d0f9a23f..740447ecc0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -143,12 +143,23 @@ public interface Event extends Serializable { * of a Data Prepper expression 
* @param format input format * @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions - * @param replacementForFailures - The String to use as a replacement for when keys in Events can't be found * @return returns a string with no formatted parts, returns null if no value is found * @throws RuntimeException if the input string is not properly formatted * @since 2.1 */ - String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures); + String formatString(final String format, final ExpressionEvaluator expressionEvaluator); + + /** + * Returns formatted parts of the input string replaced by their values in the event or the values from the result + * of a Data Prepper expression + * @param format input format + * @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions + * @param defaultValue - The String to use as a replacement for when keys in Events can't be found + * @return returns a string with no formatted parts, returns null if no value is found + * @throws RuntimeException if the input string is not properly formatted + * @since 2.1 + */ + String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String defaultValue); /** * Returns event handle diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 8591334bcd..44df50d39c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -333,11 +333,17 @@ public String formatString(final String format) { * @throws RuntimeException if the format is incorrect or the value is not a string */ @Override - public String formatString(final 
String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures) { - return formatStringInternal(format, expressionEvaluator, replacementForFailures); + public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) { + return formatStringInternal(format, expressionEvaluator, null); } - private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator, final String replacementForFailures) { + @Override + public String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String defaultValue) { + return formatStringInternal(format, expressionEvaluator, defaultValue); + } + + + private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator, final String defaultValue) { int fromIndex = 0; String result = ""; int position = 0; @@ -361,11 +367,11 @@ private String formatStringInternal(final String format, final ExpressionEvaluat if (expressionEvaluator != null && expressionEvaluator.isValidExpressionStatement(name)) { val = expressionEvaluator.evaluate(name, this); } else { - if (replacementForFailures == null) { + if (defaultValue == null) { throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name)); } - val = replacementForFailures; + val = defaultValue; } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index f3ac15b0b4..407b877de0 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -603,7 +603,7 @@ public void formatString_with_expression_evaluator_catches_exception_when_Event_ when(expressionEvaluator.evaluate(invalidKeyExpression, 
event)).thenReturn(invalidKeyExpressionResult); when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult); - assertThat(event.formatString(formatString, expressionEvaluator, null), is(equalTo(finalString))); + assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString))); } @Test @@ -630,7 +630,7 @@ public void testBuild_withFormatStringWithExpressionEvaluator() { verify(expressionEvaluator, never()).evaluate(eq("foo"), any(Event.class)); when(expressionEvaluator.evaluate(expressionStatement, event)).thenReturn(expressionEvaluationResult); - assertThat(event.formatString(formatString, expressionEvaluator, null), is(equalTo(finalString))); + assertThat(event.formatString(formatString, expressionEvaluator), is(equalTo(finalString))); } @ParameterizedTest @@ -663,9 +663,9 @@ public void testBuild_withFormatStringWithValueNotFound() { } @Test - public void testBuild_withFormatStringWithValueNotFound_and_replacement_failure() { + public void testBuild_withFormatStringWithValueNotFound_and_defaultValue_for_missing_keys() { - final String replacementForMissingKeys = "REPLACED"; + final String defaultValueForMissingKey = UUID.randomUUID().toString(); final String jsonString = "{\"foo\": \"bar\", \"info\": {\"ids\": {\"id\":\"idx\"}}}"; final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class); event = JacksonEvent.builder() @@ -673,8 +673,8 @@ public void testBuild_withFormatStringWithValueNotFound_and_replacement_failure( .withData(jsonString) .getThis() .build(); - final String result = event.formatString("test-${boo}-string", expressionEvaluator, replacementForMissingKeys); - assertThat(result, equalTo("test-" + replacementForMissingKeys + "-string")); + final String result = event.formatString("test-${boo}-string", expressionEvaluator, defaultValueForMissingKey); + assertThat(result, equalTo("test-" + defaultValueForMissingKey + "-string")); } @Test diff --git 
a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index baa15b68eb..56e9783d2b 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -112,7 +112,7 @@ public void produceRawData(final byte[] bytes, final String key) throws Exceptio public void produceRecords(final Record record) throws Exception { bufferedEventHandles.add(record.getData().getEventHandle()); Event event = getEvent(record); - final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator, null); + final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator); try { if (Objects.equals(serdeFormat, MessageFormat.JSON.toString())) { publishJsonMessage(record, key); diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java index 1d7f170199..b501eea7e0 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java @@ -54,7 +54,7 @@ public Collection> doExecute(final Collection> recor } try { - final String key = (entry.getKey() == null) ? null : recordEvent.formatString(entry.getKey(), expressionEvaluator, null); + final String key = (entry.getKey() == null) ? 
null : recordEvent.formatString(entry.getKey(), expressionEvaluator); final String metadataKey = entry.getMetadataKey(); Object value; if (!Objects.isNull(entry.getValueExpression())) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 29726b13cf..e1547a925c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -391,7 +391,7 @@ public void doOutput(final Collection> records) { final SerializedJson document = getDocument(event); String indexName = configuredIndexAlias; try { - indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator, null)); + indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); } catch (final Exception e) { LOG.error("There was an exception when constructing the index name. 
Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); dynamicIndexDroppedEvents.increment(); @@ -403,8 +403,8 @@ public void doOutput(final Collection> records) { String versionExpressionEvaluationResult = null; if (versionExpression != null) { try { - versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator, null); - version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator, null)); + versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); + version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); } catch (final NumberFormatException e) { final String errorMessage = String.format( "Unable to convert the result of evaluating document_version '%s' to Long for an Event. The evaluation result '%s' must be a valid Long type", versionExpression, versionExpressionEvaluationResult @@ -433,7 +433,7 @@ public void doOutput(final Collection> records) { } } if (eventAction.contains("${")) { - eventAction = event.formatString(eventAction, expressionEvaluator, null); + eventAction = event.formatString(eventAction, expressionEvaluator); } if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) { LOG.error("Unknown action {}, skipping the event", eventAction); @@ -485,7 +485,7 @@ SerializedJson getDocument(final Event event) { docId = event.get(documentIdField, String.class); } else if (Objects.nonNull(documentId)) { try { - docId = event.formatString(documentId, expressionEvaluator, null); + docId = event.formatString(documentId, expressionEvaluator); } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { LOG.error("Unable to construct document_id with format {}, the document_id will be generated by OpenSearch", documentId, e); } @@ -496,7 +496,7 @@ SerializedJson getDocument(final Event event) { routingValue = event.get(routingField, String.class); } else if (routing != null) { try { - 
routingValue = event.formatString(routing, expressionEvaluator, null); + routingValue = event.formatString(routing, expressionEvaluator); } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { LOG.error("Unable to construct routing with format {}, the routing will be generated by OpenSearch", routing, e); } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 205dc41dc8..9b51954b62 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -213,8 +213,8 @@ void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_ final EventHandle eventHandle = mock(EventHandle.class); when(event.getEventHandle()).thenReturn(eventHandle); final String index = UUID.randomUUID().toString(); - when(event.formatString(versionExpression, expressionEvaluator, null)).thenReturn("not_a_number"); - when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator, null)).thenReturn(index); + when(event.formatString(versionExpression, expressionEvaluator)).thenReturn("not_a_number"); + when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); final Record eventRecord = new Record<>(event); final OpenSearchSink objectUnderTest = createObjectUnderTest(); @@ -301,8 +301,8 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an final EventHandle eventHandle = mock(EventHandle.class); when(event.getEventHandle()).thenReturn(eventHandle); final String index = UUID.randomUUID().toString(); - when(event.formatString(versionExpression, expressionEvaluator, 
null)).thenThrow(RuntimeException.class); - when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator, null)).thenReturn(index); + when(event.formatString(versionExpression, expressionEvaluator)).thenThrow(RuntimeException.class); + when(event.formatString(indexConfiguration.getIndexAlias(), expressionEvaluator)).thenReturn(index); final Record eventRecord = new Record<>(event); final OpenSearchSink objectUnderTest = createObjectUnderTest(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java index 9f7fd79112..8e2d2aa68f 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/S3OutputStream.java @@ -39,15 +39,13 @@ public class S3OutputStream extends PositionOutputStream { /** * The bucket-name on Amazon S3 */ - private final String bucket; + private String bucket; /** * The key (path) name within the bucket */ private final String key; - private String targetBucket; - /** * The temporary buffer used for storing the chunks */ @@ -74,7 +72,7 @@ public class S3OutputStream extends PositionOutputStream { /** * The default bucket to send to when upload fails with dynamic bucket */ - private String defaultBucket; + private final String defaultBucket; /** * Creates a new S3 OutputStream @@ -89,7 +87,6 @@ public S3OutputStream(final S3Client s3Client, final String defaultBucket) { this.s3Client = s3Client; this.bucket = bucketSupplier.get(); - this.targetBucket = bucketSupplier.get(); this.key = keySupplier.get(); buf = new byte[BUFFER_SIZE]; position = 0; @@ -173,7 +170,7 @@ public void close() { .parts(completedParts) .build(); CompleteMultipartUploadRequest completeMultipartUploadRequest = 
CompleteMultipartUploadRequest.builder() - .bucket(targetBucket) + .bucket(bucket) .key(key) .uploadId(uploadId) .multipartUpload(completedMultipartUpload) @@ -205,7 +202,7 @@ private void possiblyStartMultipartUpload() { createMultipartUpload(); } catch (final S3Exception e) { if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) { - targetBucket = defaultBucket; LOG.warn("Bucket {} could not be accessed to create multi-part upload, attempting to create multi-part upload to default_bucket {}", bucket, defaultBucket); + bucket = defaultBucket; createMultipartUpload(); } else { @@ -213,14 +210,14 @@ private void possiblyStartMultipartUpload() { } } - LOG.debug("Created multipart upload {} bucket='{}',key='{}'.", uploadId, targetBucket, key); + LOG.debug("Created multipart upload {} bucket='{}',key='{}'.", uploadId, bucket, key); } } private void uploadPart() { int partNumber = etags.size() + 1; UploadPartRequest uploadRequest = UploadPartRequest.builder() - .bucket(targetBucket) + .bucket(bucket) .key(key) .uploadId(uploadId) .partNumber(partNumber) @@ -242,7 +239,7 @@ public long getPos() throws IOException { private void createMultipartUpload() { CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder() - .bucket(targetBucket) + .bucket(bucket) .key(key) .build(); CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java index b39642f16c..228801f065 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java +++
b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java @@ -13,7 +13,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; -public class BufferUtilities { +class BufferUtilities { private static final Logger LOG = LoggerFactory.getLogger(BufferUtilities.class); From 5d636800ea66974d35f002dfc786e9a41421e442 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 11 Apr 2024 14:37:10 -0500 Subject: [PATCH 3/3] Add / modify unit tests Signed-off-by: Taylor Gray --- .../s3/accumulator/BufferUtilitiesTest.java | 63 ++++++++++--------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java index 8265bb0ff4..fd2341636d 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java @@ -3,6 +3,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -15,17 +21,17 @@ import java.util.List; import java.util.UUID; +import java.util.stream.Stream; - -import static 
org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.eq; import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferUtilities.ACCESS_DENIED; @ExtendWith(MockitoExtension.class) @@ -72,19 +78,26 @@ void putObjectOrSendToDefaultBucket_with_no_such_bucket_exception_and_null_defau when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenThrow(NoSuchBucketException.class); assertThrows(NoSuchBucketException.class, () -> BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, null)); + + verify(s3Client, times(1)).putObject(any(PutObjectRequest.class), eq(requestBody)); } - @Test - void putObjectOrSendToDefaultBucket_with_S3Exception_that_is_not_access_denied_or_no_such_bucket_throws_exception() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void putObjectOrSendToDefaultBucket_with_S3Exception_that_is_not_access_denied_or_no_such_bucket_throws_exception(final boolean defaultBucketEnabled) { when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))).thenThrow(RuntimeException.class); - assertThrows(RuntimeException.class, () -> BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, null)); + assertThrows(RuntimeException.class, () -> BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, + defaultBucketEnabled ? 
defaultBucket : null)); + + verify(s3Client, times(1)).putObject(any(PutObjectRequest.class), eq(requestBody)); } - @Test - void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_sends_to_default_bucket() { + @ParameterizedTest + @ArgumentsSource(ExceptionsProvider.class) + void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_or_access_denied_sends_to_default_bucket(final Exception exception) { when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))) - .thenThrow(NoSuchBucketException.class) + .thenThrow(exception) .thenReturn(mock(PutObjectResponse.class)); BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, defaultBucket); @@ -104,29 +117,19 @@ void putObjectOrSendToDefaultBucket_with_NoSuchBucketException_sends_to_default_ assertThat(defaultBucketPutObjectRequest.key(), equalTo(objectKey)); } - @Test - void putObjectOrSendToDefaultBucket_with_S3Exception_with_access_denied_sends_to_default_bucket() { - final S3Exception s3Exception = mock(S3Exception.class); - when(s3Exception.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID()); - - when(s3Client.putObject(any(PutObjectRequest.class), eq(requestBody))) - .thenThrow(s3Exception) - .thenReturn(mock(PutObjectResponse.class)); - - BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, requestBody, objectKey, targetBucket, defaultBucket); + private static class ExceptionsProvider implements ArgumentsProvider { - final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - verify(s3Client, times(2)).putObject(argumentCaptor.capture(), eq(requestBody)); + @Override + public Stream provideArguments(ExtensionContext extensionContext) throws Exception { + final S3Exception s3Exception = mock(S3Exception.class); + when(s3Exception.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID()); - assertThat(argumentCaptor.getAllValues().size(), equalTo(2)); + final 
NoSuchBucketException noSuchBucketException = mock(NoSuchBucketException.class); - final List putObjectRequestList = argumentCaptor.getAllValues(); - final PutObjectRequest putObjectRequest = putObjectRequestList.get(0); - assertThat(putObjectRequest.bucket(), equalTo(targetBucket)); - assertThat(putObjectRequest.key(), equalTo(objectKey)); - - final PutObjectRequest defaultBucketPutObjectRequest = putObjectRequestList.get(1); - assertThat(defaultBucketPutObjectRequest.bucket(), equalTo(defaultBucket)); - assertThat(defaultBucketPutObjectRequest.key(), equalTo(objectKey)); + return Stream.of( + Arguments.arguments(noSuchBucketException), + Arguments.arguments(s3Exception) + ); + } } }