From 18b5a9f084391b8ac3af740fe192262b14635bda Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Mon, 6 May 2024 18:19:11 -0500 Subject: [PATCH] Add BsonType of primary key to metadata (#4506) Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../plugins/mongo/client/BsonHelper.java | 1 + .../PartitionKeyRecordConverter.java | 7 +- .../mongo/converter/RecordConverter.java | 22 ++++--- .../mongo/export/ExportPartitionWorker.java | 10 ++- .../plugins/mongo/stream/StreamWorker.java | 6 +- .../mongo/converter/RecordConverterTest.java | 9 ++- .../export/ExportPartitionWorkerTest.java | 16 +++-- .../mongo/stream/StreamWorkerTest.java | 64 +++++++++++++------ 8 files changed, 93 insertions(+), 42 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java index d02fc0f0e9..cda5523c7e 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/client/BsonHelper.java @@ -39,6 +39,7 @@ public class BsonHelper { private static final String PARTITION_SPLITTER = "-"; private static final String NUMBER_TYPE = "number"; public static final String MAX_KEY = "MaxKey"; + public static final String UNKNOWN_TYPE = "UNKNOWN"; // https://www.mongodb.com/docs/manual/reference/bson-type-comparison-order/ /** diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/PartitionKeyRecordConverter.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/PartitionKeyRecordConverter.java index d845ea0094..f74a67a6c8 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/PartitionKeyRecordConverter.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/PartitionKeyRecordConverter.java @@ -1,5 +1,6 @@ package org.opensearch.dataprepper.plugins.mongo.converter; +import com.mongodb.client.model.changestream.OperationType; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -27,8 +28,9 @@ public void initializePartitions(final List partitionNames) { public Event convert(final String record, final long eventCreationTimeMillis, final long eventVersionNumber, - final String eventName) { - final Event event = super.convert(record, eventCreationTimeMillis, eventVersionNumber, eventName); + final OperationType eventName, + final String primaryKeyBsonType) { + final Event event = super.convert(record, eventCreationTimeMillis, eventVersionNumber, eventName, primaryKeyBsonType); final EventMetadata eventMetadata = event.getMetadata(); final String partitionKey = String.valueOf(eventMetadata.getAttribute(MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE)); eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3PathPrefix + S3_PATH_DELIMITER + hashKeyToPartition(partitionKey)); @@ -50,7 +52,6 @@ private int hashKeyToIndex(final String key) { return Math.abs(hashValue) % partitionSize; } catch (final NoSuchAlgorithmException e) { - //LOG.error("Exception hashing key to index.", e); return -1; } } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java index defc000ddc..fd8c711e59 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverter.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.mongodb.client.model.changestream.OperationType; import org.opensearch.dataprepper.model.document.JacksonDocument; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -61,7 +62,8 @@ private String getAttributeValue(final Map data, final String at public Event convert(final String record, final long eventCreationTimeMillis, final long eventVersionNumber, - final String eventName) { + final OperationType eventName, + final String primaryKeyBsonType) { final Map data = convertToMap(record); final Event event = JacksonDocument.builder() .withData(data) @@ -85,6 +87,7 @@ public Event convert(final String record, final String partitionKey = getAttributeValue(data, MetadataKeyAttributes.DOCUMENTDB_PRIMARY_KEY_ATTRIBUTE_NAME); eventMetadata.setAttribute(MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey); eventMetadata.setAttribute(MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey); + eventMetadata.setAttribute(MetadataKeyAttributes.DOCUMENTDB_ID_TYPE_METADATA_ATTRIBUTE, primaryKeyBsonType); return event; } @@ -99,22 +102,23 @@ public Event convert(final String record, */ public Event convert(final String record, final long eventCreationTimeMillis, - final long eventVersionNumber) { - return convert(record, eventCreationTimeMillis, eventVersionNumber, null); + final long eventVersionNumber, + final String primaryKeyBsonType) { + return convert(record, eventCreationTimeMillis, eventVersionNumber, null, primaryKeyBsonType); } - private String mapStreamEventNameToBulkAction(final String streamEventName) { + private String mapStreamEventNameToBulkAction(final OperationType streamEventName) { if (streamEventName == null) { return DEFAULT_ACTION; } // https://www.mongodb.com/docs/manual/reference/change-events/ - switch (streamEventName.toLowerCase()) { - case "insert": - case "modify": - case "replace": + switch (streamEventName) { + case INSERT: + case UPDATE: + case REPLACE: return OpenSearchBulkActions.INDEX.toString(); - case "delete": + case DELETE: return OpenSearchBulkActions.DELETE.toString(); default: return DEFAULT_ACTION; diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java index 89b4b4cf29..ae7804b958 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorker.java @@ -11,6 +11,7 @@ import com.mongodb.client.MongoDatabase; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -33,6 +34,7 @@ import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.JSON_WRITER_SETTINGS; import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.DOCUMENTDB_ID_FIELD_NAME; +import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.UNKNOWN_TYPE; public class ExportPartitionWorker implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ExportPartitionWorker.class); @@ -162,12 +164,16 @@ public void run() { } try { - final String record = cursor.next().toJson(JSON_WRITER_SETTINGS); + final Document document = cursor.next(); + final String record = document.toJson(JSON_WRITER_SETTINGS); final long bytes = record.getBytes().length; bytesReceivedSummary.record(bytes); + final Optional primaryKeyDoc = Optional.ofNullable(document.toBsonDocument()); + final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.getBsonType().name()).orElse(UNKNOWN_TYPE); + // The version number is the export time minus some overlap to ensure new stream events still get priority final long eventVersionNumber = (exportStartTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis()) * 1_000; - final Event event = recordConverter.convert(record, exportStartTime, eventVersionNumber); + final Event event = recordConverter.convert(record, exportStartTime, eventVersionNumber, primaryKeyBsonType); // event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); // delete _id event.delete(DOCUMENTDB_ID_FIELD_NAME); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index 8e0f7a9331..05c4dcfc8a 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -37,6 +37,7 @@ import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.JSON_WRITER_SETTINGS; import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.DOCUMENTDB_ID_FIELD_NAME; +import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.UNKNOWN_TYPE; public class StreamWorker { public static final String STREAM_PREFIX = "STREAM-"; @@ -194,8 +195,11 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS); bytesReceivedSummary.record(bytes); checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS); + final Optional primaryKeyDoc = Optional.ofNullable(document.getDocumentKey()); + final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.getBsonType().name()).orElse(UNKNOWN_TYPE); // TODO fix eventVersionNumber - final Event event = recordConverter.convert(record, eventCreationTimeMillis, eventCreationTimeMillis, document.getOperationTypeString()); + final Event event = recordConverter.convert(record, eventCreationTimeMillis, eventCreationTimeMillis, + document.getOperationType(), primaryKeyBsonType); // event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class)); // delete _id event.delete(DOCUMENTDB_ID_FIELD_NAME); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverterTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverterTest.java index 8d368f4c76..b3b5ce3ee1 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverterTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/converter/RecordConverterTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.mongo.converter; +import com.mongodb.client.model.changestream.OperationType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -58,10 +59,11 @@ void convert() { final long exportStartTime = Instant.now().toEpochMilli(); final long eventVersionNumber = random.nextLong(); final String collection = UUID.randomUUID().toString(); + final String primaryKeyType = UUID.randomUUID().toString(); final RecordConverter recordConverter = new RecordConverter(collection, ExportPartition.PARTITION_TYPE); - final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber); + final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber, primaryKeyType); assertThat(event.getMetadata(), notNullValue()); assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(id)); @@ -88,11 +90,12 @@ void convertWithEventName() { "\"orderDate\":{\"date\":\"" + LocalDate.now() +"\"}}"; final long exportStartTime = Instant.now().toEpochMilli(); final long eventVersionNumber = random.nextLong(); - final String eventName = "insert"; + final OperationType eventName = OperationType.INSERT; final String collection = UUID.randomUUID().toString(); + final String primaryKeyType = UUID.randomUUID().toString(); final RecordConverter recordConverter = new RecordConverter(collection, StreamPartition.PARTITION_TYPE); - final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber, eventName); + final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber, eventName, primaryKeyType); assertThat(event.getMetadata(), notNullValue()); assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(id)); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java index 43c1bf6803..df5daa065e 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportPartitionWorkerTest.java @@ -7,6 +7,8 @@ import com.mongodb.client.MongoDatabase; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import org.bson.BsonDocument; +import org.bson.BsonType; import org.bson.Document; import org.bson.conversions.Bson; import org.bson.json.JsonWriterSettings; @@ -121,6 +123,12 @@ public void testProcessPartitionSuccess(final String partitionKey) { lenient().when(cursor.hasNext()).thenReturn(true, true, false); Document doc1 = mock(Document.class); Document doc2 = mock(Document.class); + BsonDocument bsonDoc1 = mock(BsonDocument.class); + BsonDocument bsonDoc2 = mock(BsonDocument.class); + when(doc1.toBsonDocument()).thenReturn(bsonDoc1); + when(doc2.toBsonDocument()).thenReturn(bsonDoc2); + when(bsonDoc1.getBsonType()).thenReturn(BsonType.OBJECT_ID); + when(bsonDoc2.getBsonType()).thenReturn(BsonType.STRING); final String docJson1 = UUID.randomUUID().toString(); final String docJson2 = UUID.randomUUID() + docJson1; when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(docJson1); @@ -131,8 +139,8 @@ public void testProcessPartitionSuccess(final String partitionKey) { final long eventVersionNumber = (exportStartTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis()) * 1_000; Event event1 = mock((Event.class)); Event event2 = mock((Event.class)); - when(mockRecordConverter.convert(docJson1, exportStartTime, eventVersionNumber)).thenReturn(event1); - when(mockRecordConverter.convert(docJson2, exportStartTime, eventVersionNumber)).thenReturn(event2); + when(mockRecordConverter.convert(docJson1, exportStartTime, eventVersionNumber, BsonType.OBJECT_ID.name())).thenReturn(event1); + when(mockRecordConverter.convert(docJson2, exportStartTime, eventVersionNumber, BsonType.STRING.name())).thenReturn(event2); lenient().when(dataQueryPartition.getPartitionKey()).thenReturn(partitionKey); lenient().when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE)) .thenReturn(Optional.of(dataQueryPartition)); @@ -163,8 +171,8 @@ public void testProcessPartitionSuccess(final String partitionKey) { future.cancel(true); verify(mongoClient, times(1)).close(); - verify(mockRecordConverter).convert(docJson1, exportStartTime, eventVersionNumber); - verify(mockRecordConverter).convert(docJson2, exportStartTime, eventVersionNumber); + verify(mockRecordConverter).convert(docJson1, exportStartTime, eventVersionNumber, BsonType.OBJECT_ID.name()); + verify(mockRecordConverter).convert(docJson2, exportStartTime, eventVersionNumber, BsonType.STRING.name()); verify(mongoDatabase).getCollection(eq("collection")); verify(mockRecordConverter).initializePartitions(partitions); verify(mockRecordBufferWriter).writeToBuffer(eq(mockAcknowledgementSet), any()); diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java index 5e116755c8..835c49415c 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorkerTest.java @@ -13,6 +13,7 @@ import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonTimestamp; +import org.bson.BsonType; import org.bson.Document; import org.bson.json.JsonWriterSettings; import org.junit.jupiter.api.BeforeEach; @@ -129,6 +130,7 @@ void test_processStream_success() { ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); Document doc1 = mock(Document.class); + BsonDocument doc1Key = mock(BsonDocument.class); BsonDocument doc2Key = mock(BsonDocument.class); BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); BsonDocument bsonDoc2 = new BsonDocument("resumeToken2", new BsonInt32(234)); @@ -141,14 +143,13 @@ void test_processStream_success() { .thenReturn(streamDoc2); final String doc1Json1 = UUID.randomUUID().toString(); final String doc1Json2 = UUID.randomUUID().toString(); + when(doc1Key.getBsonType()).thenReturn(BsonType.INT64); when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(doc1Json1); + when(doc2Key.getBsonType()).thenReturn(BsonType.INT32); when(doc2Key.toJson(any(JsonWriterSettings.class))).thenReturn(doc1Json2); when(streamDoc1.getFullDocument()).thenReturn(doc1); + when(streamDoc1.getDocumentKey()).thenReturn(doc1Key); when(streamDoc2.getDocumentKey()).thenReturn(doc2Key); - final String operationType1 = UUID.randomUUID().toString(); - final String operationType2 = UUID.randomUUID().toString(); - when(streamDoc1.getOperationTypeString()).thenReturn(operationType1); - when(streamDoc2.getOperationTypeString()).thenReturn(operationType2); final BsonTimestamp bsonTimestamp1 = mock(BsonTimestamp.class); final BsonTimestamp bsonTimestamp2 = mock(BsonTimestamp.class); final int timeSecond1 = random.nextInt(); @@ -163,7 +164,7 @@ void test_processStream_success() { when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); - when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), anyString())).thenReturn(event); + when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); final ExecutorService executorService = Executors.newSingleThreadExecutor(); final Future future = executorService.submit(() -> { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { @@ -178,8 +179,8 @@ void test_processStream_success() { verify(mongoDatabase).getCollection(eq("collection")); verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); verify(mockRecordConverter).initializePartitions(partitions); - verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1)); - verify(mockRecordConverter).convert(eq(doc1Json2), eq(timeSecond2 * 1000L), eq(timeSecond2 * 1000L), eq(operationType2)); + verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(OperationType.INSERT), eq(BsonType.INT64.name())); + verify(mockRecordConverter).convert(eq(doc1Json2), eq(timeSecond2 * 1000L), eq(timeSecond2 * 1000L), eq(OperationType.DELETE), eq(BsonType.INT32.name())); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); verify(successItemsCounter).increment(2); verify(failureItemsCounter, never()).increment(); @@ -259,9 +260,9 @@ void test_processStream_checkPointIntervalSuccess() { when(streamDoc1.getClusterTime()).thenReturn(bsonTimestamp1); when(streamDoc2.getClusterTime()).thenReturn(bsonTimestamp2); when(streamDoc3.getClusterTime()).thenReturn(bsonTimestamp3); - when(streamDoc1.getOperationTypeString()).thenReturn(UUID.randomUUID().toString()); - when(streamDoc2.getOperationTypeString()).thenReturn(UUID.randomUUID().toString()); - when(streamDoc3.getOperationTypeString()).thenReturn(UUID.randomUUID().toString()); + when(streamDoc1.getOperationType()).thenReturn(OperationType.INSERT); + when(streamDoc2.getOperationType()).thenReturn(OperationType.INSERT); + when(streamDoc3.getOperationType()).thenReturn(OperationType.INSERT); final String resumeToken1 = UUID.randomUUID().toString(); final String resumeToken2 = UUID.randomUUID().toString(); final String resumeToken3 = UUID.randomUUID().toString(); @@ -274,7 +275,7 @@ void test_processStream_checkPointIntervalSuccess() { when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); - when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), anyString())).thenReturn(event); + when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) @@ -354,6 +355,7 @@ void test_processStream_terminateChangeStreamSuccess() { ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); ChangeStreamDocument streamDoc2 = mock(ChangeStreamDocument.class); ChangeStreamDocument streamDoc3 = mock(ChangeStreamDocument.class); + BsonDocument keyDoc1 = mock(BsonDocument.class); Document doc1 = mock(Document.class); BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); @@ -365,8 +367,10 @@ void test_processStream_terminateChangeStreamSuccess() { final String doc1Json1 = UUID.randomUUID().toString(); when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(doc1Json1); when(streamDoc1.getFullDocument()).thenReturn(doc1); - final String operationType1 = UUID.randomUUID().toString(); - when(streamDoc1.getOperationTypeString()).thenReturn(operationType1); + when(streamDoc1.getDocumentKey()).thenReturn(keyDoc1); + when(keyDoc1.getBsonType()).thenReturn(BsonType.BOOLEAN); + final OperationType operationType1 = OperationType.INSERT; + when(streamDoc1.getOperationType()).thenReturn(operationType1); final BsonTimestamp bsonTimestamp1 = mock(BsonTimestamp.class); final int timeSecond1 = random.nextInt(); when(bsonTimestamp1.getTime()).thenReturn(timeSecond1); @@ -377,7 +381,7 @@ void test_processStream_terminateChangeStreamSuccess() { when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); - when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), anyString())).thenReturn(event); + when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { @@ -392,7 +396,7 @@ void test_processStream_terminateChangeStreamSuccess() { verify(mongoDatabase).getCollection(eq("collection")); verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); verify(mockRecordConverter).initializePartitions(partitions); - verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1)); + verify(mockRecordConverter).convert(eq(doc1Json1), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1), eq(BsonType.BOOLEAN.name())); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); verify(successItemsCounter).increment(1); verify(failureItemsCounter, never()).increment(); @@ -401,7 +405,7 @@ void test_processStream_terminateChangeStreamSuccess() { @ParameterizedTest @MethodSource("mongoDataTypeProvider") - void test_processStream_dataTypeConversionSuccess(final String actualDocument, final String expectedDocument) { + void test_processStream_dataTypeConversionSuccess(final String actualDocument, final String keyType, final String expectedDocument) { final String collection = "database.collection"; when(streamProgressState.shouldWaitForExport()).thenReturn(false); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); @@ -419,15 +423,18 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f when(changeStreamIterable.iterator()).thenReturn(cursor); when(cursor.hasNext()).thenReturn(true, false); ChangeStreamDocument streamDoc1 = mock(ChangeStreamDocument.class); + BsonDocument key1 = mock(BsonDocument.class); Document doc1 = Document.parse(actualDocument); BsonDocument bsonDoc1 = new BsonDocument("resumeToken1", new BsonInt32(123)); when(streamDoc1.getResumeToken()).thenReturn(bsonDoc1); when(streamDoc1.getOperationType()).thenReturn(OperationType.INSERT); when(cursor.next()) .thenReturn(streamDoc1); + when(key1.getBsonType()).thenReturn(BsonType.valueOf(keyType)); + when(streamDoc1.getDocumentKey()).thenReturn(key1); when(streamDoc1.getFullDocument()).thenReturn(doc1); - final String operationType1 = UUID.randomUUID().toString(); - when(streamDoc1.getOperationTypeString()).thenReturn(operationType1); + final OperationType operationType1 = OperationType.INSERT; + when(streamDoc1.getOperationType()).thenReturn(operationType1); final BsonTimestamp bsonTimestamp1 = mock(BsonTimestamp.class); final int timeSecond1 = random.nextInt(); when(bsonTimestamp1.getTime()).thenReturn(timeSecond1); @@ -438,7 +445,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f when(mockPartitionCheckpoint.getGlobalS3FolderCreationStatus(collection)).thenReturn(Optional.of(s3PartitionStatus)); Event event = mock(Event.class); //when(event.get("_id", Object.class)).thenReturn(UUID.randomUUID().toString()); - when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), anyString())).thenReturn(event); + when(mockRecordConverter.convert(anyString(), anyLong(), anyLong(), any(OperationType.class), anyString())).thenReturn(event); final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { @@ -453,7 +460,7 @@ void test_processStream_dataTypeConversionSuccess(final String actualDocument, f verify(mongoDatabase).getCollection(eq("collection")); verify(mockPartitionCheckpoint).getGlobalS3FolderCreationStatus(collection); verify(mockRecordConverter).initializePartitions(partitions); - verify(mockRecordConverter).convert(eq(expectedDocument), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1)); + verify(mockRecordConverter).convert(eq(expectedDocument), eq(timeSecond1 * 1000L), eq(timeSecond1 * 1000L), eq(operationType1), eq(keyType)); verify(mockRecordBufferWriter).writeToBuffer(eq(null), any()); verify(successItemsCounter).increment(1); verify(failureItemsCounter, never()).increment(); @@ -464,54 +471,71 @@ private static Stream mongoDataTypeProvider() { return Stream.of( Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"name\": \"Hello User\"}", + BsonType.BOOLEAN.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"name\": \"Hello User\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"nullField\": null}", + BsonType.ARRAY.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"nullField\": null}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"numberField\": 123}", + BsonType.DATE_TIME.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"numberField\": 123}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"doubleValue\": 3.14159}", + BsonType.TIMESTAMP.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"doubleValue\": 3.14159}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"longValue\": { \"$numberLong\": \"1234567890123456768\"}}", + BsonType.OBJECT_ID.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"longValue\": 1234567890123456768}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"stringField\": \"Hello, Mongo!\"}", + BsonType.DOCUMENT.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"stringField\": \"Hello, Mongo!\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"booleanField\": true}", + BsonType.DOUBLE.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"booleanField\": true}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"dateField\": { \"$date\": \"2024-05-03T13:57:51.155Z\"}}", + BsonType.DECIMAL128.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"dateField\": 1714744671155}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"arrayField\": [\"a\",\"b\",\"c\"]}", + BsonType.UNDEFINED.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"arrayField\": [\"a\", \"b\", \"c\"]}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"objectField\": { \"nestedKey\": \"nestedValue\"}}", + BsonType.BINARY.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"objectField\": {\"nestedKey\": \"nestedValue\"}}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"binaryField\": { \"$binary\": {\"base64\": \"AQIDBA==\", \"subType\": \"00\"}}}", + BsonType.STRING.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"binaryField\": \"AQIDBA==\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"objectIdField\": { \"$oid\": \"6634ed693ac62386d57b12d0\" }}", + BsonType.ARRAY.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"objectIdField\": \"6634ed693ac62386d57b12d0\"}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"timestampField\": { \"$timestamp\": {\"t\": 1714744681, \"i\": 29}}}", + BsonType.OBJECT_ID.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"timestampField\": 7364772325884952605}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"regexField\": { \"$regularExpression\": {\"pattern\": \"^ABC\", \"options\": \"i\"}}}", + BsonType.OBJECT_ID.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"regexField\": {\"pattern\": \"^ABC\", \"options\": \"i\"}}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"minKeyField\": { \"$minKey\": 1}}", + BsonType.OBJECT_ID.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"minKeyField\": null}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"maxKeyField\": { \"$maxKey\": 1}}", + BsonType.OBJECT_ID.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"maxKeyField\": null}"), Arguments.of( "{\"_id\": { \"$oid\": \"6634ed693ac62386d57bcaf0\" }, \"bigDecimalField\": { \"$numberDecimal\": \"123456789.0123456789\"}}", + BsonType.OBJECT_ID.name(), "{\"_id\": \"6634ed693ac62386d57bcaf0\", \"bigDecimalField\": \"123456789.0123456789\"}") ); }