From 5fd583f585bd12dc296b3a3263a829e5906f448d Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Tue, 20 Jun 2023 21:47:52 +0530 Subject: [PATCH] Incorporating review comments Signed-off-by: Sarthak Aggarwal --- ...xCodecIT.java => MultiCodecReindexIT.java} | 48 +++++++++++-------- .../{CodecIT.java => MultiCodecMergeIT.java} | 24 ++++++---- 2 files changed, 45 insertions(+), 27 deletions(-) rename modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/{ReindexCodecIT.java => MultiCodecReindexIT.java} (87%) rename server/src/internalClusterTest/java/org/opensearch/index/codec/{CodecIT.java => MultiCodecMergeIT.java} (89%) diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java similarity index 87% rename from modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java rename to modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java index 09fc5108133a3..87f3c68d8af76 100644 --- a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; @@ -37,10 +38,10 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; -public class ReindexCodecIT extends ReindexTestCase { +public class MultiCodecReindexIT extends ReindexTestCase { public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { - internalCluster().ensureAtLeastNumDataNodes(2); + internalCluster().ensureAtLeastNumDataNodes(1); Map codecMap = Map.of( "best_compression", "BEST_COMPRESSION", @@ -71,6 +72,7 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.codec", "default") + .put("index.merge.policy.max_merged_segment", "1b") .build() ); ensureGreen(index); @@ -79,9 +81,17 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod // indexing with all 4 codecs for (Map.Entry codec : codecMap.entrySet()) { - indexWithDifferentCodecs(index, codec.getKey(), codec.getValue(), nbDocs); + useCodec(index, codec.getKey()); + ingestDocs(index, nbDocs); } + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + // creating destination index with destination codec createIndex( destIndex, @@ -98,8 +108,8 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod .waitForActiveShards(ActiveShardCount.ONE) .get(); - assertEquals(4 * nbDocs, bulkResponse.getCreated()); - assertEquals(4 * nbDocs, bulkResponse.getTotal()); + assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated()); + assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal()); assertEquals(0, bulkResponse.getDeleted()); assertEquals(0, bulkResponse.getNoops()); assertEquals(0, bulkResponse.getVersionConflicts()); @@ -110,6 +120,19 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); } + private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { + assertAcked(client().admin().indices().prepareClose(index)); + + assertAcked( + client().admin() + .indices() + .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index)); + } + private void flushAndRefreshIndex(String index) { // Request is not blocked @@ -135,19 +158,7 @@ private void flushAndRefreshIndex(String index) { } } - private void indexWithDifferentCodecs(String index, String codec, String codecMode, int nbDocs) throws InterruptedException, - ExecutionException { - - assertAcked(client().admin().indices().prepareClose(index)); - - assertAcked( - client().admin() - .indices() - .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) - .get() - ); - - assertAcked(client().admin().indices().prepareOpen(index)); + private void ingestDocs(String index, int nbDocs) throws InterruptedException { indexRandom( randomBoolean(), @@ -158,7 +169,6 @@ private void indexWithDifferentCodecs(String index, String codec, String codecMo .collect(toList()) ); flushAndRefreshIndex(index); - assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode))); } private ArrayList getSegments(String index) { diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java b/server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java similarity index 89% rename from server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java rename to server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java index 7024846822512..9d88df039f90f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java @@ -38,7 +38,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) -public class CodecIT extends OpenSearchIntegTestCase { +public class MultiCodecMergeIT extends OpenSearchIntegTestCase { public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException { @@ -72,17 +72,26 @@ private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put("index.codec", "default") + .put("index.merge.policy.max_merged_segment", "1b") .build() ); ensureGreen(index); - // ingesting and asserting segment codec mode for all four codecs for (Map.Entry codec : codecMap.entrySet()) { - assertSegmentCodec(index, codec.getKey(), codec.getValue()); + useCodec(index, codec.getKey()); + ingestDocs(index); } + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + // force merge into final codec - assertSegmentCodec(index, finalCodec, finalCodecMode); + useCodec(index, finalCodec); + flushAndRefreshIndex(index); final ForceMergeResponse forceMergeResponse = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get(); assertThat(forceMergeResponse.getFailedShards(), is(0)); @@ -95,8 +104,7 @@ private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, assertTrue(segments.stream().findFirst().get().attributes.containsValue(finalCodecMode)); } - private void assertSegmentCodec(String index, String codec, String codecMode) throws InterruptedException, ExecutionException { - + private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { assertAcked(client().admin().indices().prepareClose(index)); assertAcked( @@ -107,11 +115,11 @@ private void assertSegmentCodec(String index, String codec, String codecMode) th ); assertAcked(client().admin().indices().prepareOpen(index)); + } + private void ingestDocs(String index) throws InterruptedException { ingest(index); flushAndRefreshIndex(index); - - assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode))); } private ArrayList getSegments(String index) {