Skip to content

Commit

Permalink
Fix OpenSearchSink upsert operation (#4178)
Browse files Browse the repository at this point in the history
* Fix opensearch upsert operation in Opensearch Sink

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified to to Stringutils.equals

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Feb 23, 2024
1 parent ef5d5e4 commit 1aede50
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc

@Test
public void testBulkActionUpdateWithDocumentRootKey() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias-upd1";
final String testIndexAlias = "test-alias-update";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

Expand Down Expand Up @@ -960,9 +960,40 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr
sink.shutdown();
}

@Test
public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias-upsert-no-create2";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testId = "foo";
List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "key", "value")));

List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> actionMap = new HashMap<>();
actionMap.put("type", OpenSearchBulkActions.UPSERT.toString());
aList.add(actionMap);

final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, testTemplateFile);
pluginSetting.getSettings().put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
pluginSetting.getSettings().put(IndexConfiguration.ACTIONS, aList);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);

sink.output(testRecords);
List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);

assertThat(retSources.size(), equalTo(1));
Map<String, Object> source = retSources.get(0);
assertThat((String) source.get("key"), equalTo("value"));
assertThat((String) source.get(testIdField), equalTo(testId));
assertThat(getDocumentCount(testIndexAlias, "_id", testId), equalTo(Integer.valueOf(1)));
sink.shutdown();
}

@Test
public void testBulkActionUpsertWithActions() throws IOException, InterruptedException {
final String testIndexAlias = "test-alias-upd2";
final String testIndexAlias = "test-alias-upsert";
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

Expand Down Expand Up @@ -1725,6 +1756,7 @@ private void wipeAllOpenSearchIndices() throws IOException {
.filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_")))
.filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-")))
.filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_")))
.filter(Predicate.not(indexName -> indexName.startsWith(".ql")))
.filter(Predicate.not(indexName -> indexName.startsWith(".plugins-ml-config")))
.forEach(indexName -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private BulkOperation getBulkOperationForAction(final String action,
}


final UpdateOperation.Builder<Object> updateOperationBuilder = (action.toLowerCase() == OpenSearchBulkActions.UPSERT.toString()) ?
final UpdateOperation.Builder<Object> updateOperationBuilder = (StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString())) ?
new UpdateOperation.Builder<>()
.index(indexName)
.document(filteredJsonNode)
Expand Down

0 comments on commit 1aede50

Please sign in to comment.