Skip to content

Commit

Permalink
Add BsonType of primary key to metadata (#4506)
Browse files: browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored May 6, 2024
1 parent 681c587 commit 18b5a9f
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BsonHelper {
private static final String PARTITION_SPLITTER = "-";
private static final String NUMBER_TYPE = "number";
public static final String MAX_KEY = "MaxKey";
public static final String UNKNOWN_TYPE = "UNKNOWN";

// https://www.mongodb.com/docs/manual/reference/bson-type-comparison-order/
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.plugins.mongo.converter;

import com.mongodb.client.model.changestream.OperationType;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;

Expand Down Expand Up @@ -27,8 +28,9 @@ public void initializePartitions(final List<String> partitionNames) {
public Event convert(final String record,
final long eventCreationTimeMillis,
final long eventVersionNumber,
final String eventName) {
final Event event = super.convert(record, eventCreationTimeMillis, eventVersionNumber, eventName);
final OperationType eventName,
final String primaryKeyBsonType) {
final Event event = super.convert(record, eventCreationTimeMillis, eventVersionNumber, eventName, primaryKeyBsonType);
final EventMetadata eventMetadata = event.getMetadata();
final String partitionKey = String.valueOf(eventMetadata.getAttribute(MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE));
eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3PathPrefix + S3_PATH_DELIMITER + hashKeyToPartition(partitionKey));
Expand All @@ -50,7 +52,6 @@ private int hashKeyToIndex(final String key) {
return Math.abs(hashValue) % partitionSize;
} catch (final NoSuchAlgorithmException
e) {
//LOG.error("Exception hashing key to index.", e);
return -1;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.client.model.changestream.OperationType;
import org.opensearch.dataprepper.model.document.JacksonDocument;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand Down Expand Up @@ -61,7 +62,8 @@ private String getAttributeValue(final Map<String, Object> data, final String at
public Event convert(final String record,
final long eventCreationTimeMillis,
final long eventVersionNumber,
final String eventName) {
final OperationType eventName,
final String primaryKeyBsonType) {
final Map<String, Object> data = convertToMap(record);
final Event event = JacksonDocument.builder()
.withData(data)
Expand All @@ -85,6 +87,7 @@ public Event convert(final String record,
final String partitionKey = getAttributeValue(data, MetadataKeyAttributes.DOCUMENTDB_PRIMARY_KEY_ATTRIBUTE_NAME);
eventMetadata.setAttribute(MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);
eventMetadata.setAttribute(MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
eventMetadata.setAttribute(MetadataKeyAttributes.DOCUMENTDB_ID_TYPE_METADATA_ATTRIBUTE, primaryKeyBsonType);

return event;
}
Expand All @@ -99,22 +102,23 @@ public Event convert(final String record,
*/
public Event convert(final String record,
final long eventCreationTimeMillis,
final long eventVersionNumber) {
return convert(record, eventCreationTimeMillis, eventVersionNumber, null);
final long eventVersionNumber,
final String primaryKeyBsonType) {
return convert(record, eventCreationTimeMillis, eventVersionNumber, null, primaryKeyBsonType);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
private String mapStreamEventNameToBulkAction(final OperationType streamEventName) {
if (streamEventName == null) {
return DEFAULT_ACTION;
}

// https://www.mongodb.com/docs/manual/reference/change-events/
switch (streamEventName.toLowerCase()) {
case "insert":
case "modify":
case "replace":
switch (streamEventName) {
case INSERT:
case UPDATE:
case REPLACE:
return OpenSearchBulkActions.INDEX.toString();
case "delete":
case DELETE:
return OpenSearchBulkActions.DELETE.toString();
default:
return DEFAULT_ACTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.mongodb.client.MongoDatabase;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -33,6 +34,7 @@

import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.JSON_WRITER_SETTINGS;
import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.DOCUMENTDB_ID_FIELD_NAME;
import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.UNKNOWN_TYPE;

public class ExportPartitionWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ExportPartitionWorker.class);
Expand Down Expand Up @@ -162,12 +164,16 @@ public void run() {
}

try {
final String record = cursor.next().toJson(JSON_WRITER_SETTINGS);
final Document document = cursor.next();
final String record = document.toJson(JSON_WRITER_SETTINGS);
final long bytes = record.getBytes().length;
bytesReceivedSummary.record(bytes);
final Optional<BsonDocument> primaryKeyDoc = Optional.ofNullable(document.toBsonDocument());
final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.getBsonType().name()).orElse(UNKNOWN_TYPE);

// The version number is the export time minus some overlap to ensure new stream events still get priority
final long eventVersionNumber = (exportStartTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis()) * 1_000;
final Event event = recordConverter.convert(record, exportStartTime, eventVersionNumber);
final Event event = recordConverter.convert(record, exportStartTime, eventVersionNumber, primaryKeyBsonType);
// event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class));
// delete _id
event.delete(DOCUMENTDB_ID_FIELD_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.JSON_WRITER_SETTINGS;
import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.DOCUMENTDB_ID_FIELD_NAME;
import static org.opensearch.dataprepper.plugins.mongo.client.BsonHelper.UNKNOWN_TYPE;

public class StreamWorker {
public static final String STREAM_PREFIX = "STREAM-";
Expand Down Expand Up @@ -194,8 +195,11 @@ record = document.getFullDocument().toJson(JSON_WRITER_SETTINGS);
bytesReceivedSummary.record(bytes);

checkPointToken = document.getResumeToken().toJson(JSON_WRITER_SETTINGS);
final Optional<BsonDocument> primaryKeyDoc = Optional.ofNullable(document.getDocumentKey());
final String primaryKeyBsonType = primaryKeyDoc.map(bsonDocument -> bsonDocument.getBsonType().name()).orElse(UNKNOWN_TYPE);
// TODO fix eventVersionNumber
final Event event = recordConverter.convert(record, eventCreationTimeMillis, eventCreationTimeMillis, document.getOperationTypeString());
final Event event = recordConverter.convert(record, eventCreationTimeMillis, eventCreationTimeMillis,
document.getOperationType(), primaryKeyBsonType);
// event.put(DEFAULT_ID_MAPPING_FIELD_NAME, event.get(DOCUMENTDB_ID_FIELD_NAME, Object.class));
// delete _id
event.delete(DOCUMENTDB_ID_FIELD_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.mongo.converter;

import com.mongodb.client.model.changestream.OperationType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -58,10 +59,11 @@ void convert() {
final long exportStartTime = Instant.now().toEpochMilli();
final long eventVersionNumber = random.nextLong();
final String collection = UUID.randomUUID().toString();
final String primaryKeyType = UUID.randomUUID().toString();

final RecordConverter recordConverter = new RecordConverter(collection, ExportPartition.PARTITION_TYPE);

final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber);
final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber, primaryKeyType);
assertThat(event.getMetadata(), notNullValue());

assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(id));
Expand All @@ -88,11 +90,12 @@ void convertWithEventName() {
"\"orderDate\":{\"date\":\"" + LocalDate.now() +"\"}}";
final long exportStartTime = Instant.now().toEpochMilli();
final long eventVersionNumber = random.nextLong();
final String eventName = "insert";
final OperationType eventName = OperationType.INSERT;
final String collection = UUID.randomUUID().toString();
final String primaryKeyType = UUID.randomUUID().toString();
final RecordConverter recordConverter = new RecordConverter(collection, StreamPartition.PARTITION_TYPE);

final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber, eventName);
final JacksonEvent event = (JacksonEvent) recordConverter.convert(record, exportStartTime, eventVersionNumber, eventName, primaryKeyType);
assertThat(event.getMetadata(), notNullValue());

assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.mongodb.client.MongoDatabase;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.bson.BsonDocument;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.json.JsonWriterSettings;
Expand Down Expand Up @@ -121,6 +123,12 @@ public void testProcessPartitionSuccess(final String partitionKey) {
lenient().when(cursor.hasNext()).thenReturn(true, true, false);
Document doc1 = mock(Document.class);
Document doc2 = mock(Document.class);
BsonDocument bsonDoc1 = mock(BsonDocument.class);
BsonDocument bsonDoc2 = mock(BsonDocument.class);
when(doc1.toBsonDocument()).thenReturn(bsonDoc1);
when(doc2.toBsonDocument()).thenReturn(bsonDoc2);
when(bsonDoc1.getBsonType()).thenReturn(BsonType.OBJECT_ID);
when(bsonDoc2.getBsonType()).thenReturn(BsonType.STRING);
final String docJson1 = UUID.randomUUID().toString();
final String docJson2 = UUID.randomUUID() + docJson1;
when(doc1.toJson(any(JsonWriterSettings.class))).thenReturn(docJson1);
Expand All @@ -131,8 +139,8 @@ public void testProcessPartitionSuccess(final String partitionKey) {
final long eventVersionNumber = (exportStartTime - VERSION_OVERLAP_TIME_FOR_EXPORT.toMillis()) * 1_000;
Event event1 = mock((Event.class));
Event event2 = mock((Event.class));
when(mockRecordConverter.convert(docJson1, exportStartTime, eventVersionNumber)).thenReturn(event1);
when(mockRecordConverter.convert(docJson2, exportStartTime, eventVersionNumber)).thenReturn(event2);
when(mockRecordConverter.convert(docJson1, exportStartTime, eventVersionNumber, BsonType.OBJECT_ID.name())).thenReturn(event1);
when(mockRecordConverter.convert(docJson2, exportStartTime, eventVersionNumber, BsonType.STRING.name())).thenReturn(event2);
lenient().when(dataQueryPartition.getPartitionKey()).thenReturn(partitionKey);
lenient().when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE))
.thenReturn(Optional.of(dataQueryPartition));
Expand Down Expand Up @@ -163,8 +171,8 @@ public void testProcessPartitionSuccess(final String partitionKey) {
future.cancel(true);

verify(mongoClient, times(1)).close();
verify(mockRecordConverter).convert(docJson1, exportStartTime, eventVersionNumber);
verify(mockRecordConverter).convert(docJson2, exportStartTime, eventVersionNumber);
verify(mockRecordConverter).convert(docJson1, exportStartTime, eventVersionNumber, BsonType.OBJECT_ID.name());
verify(mockRecordConverter).convert(docJson2, exportStartTime, eventVersionNumber, BsonType.STRING.name());
verify(mongoDatabase).getCollection(eq("collection"));
verify(mockRecordConverter).initializePartitions(partitions);
verify(mockRecordBufferWriter).writeToBuffer(eq(mockAcknowledgementSet), any());
Expand Down
Loading

0 comments on commit 18b5a9f

Please sign in to comment.