Fixing flaky test cases (opensearch-project#12320)
Signed-off-by: RS146BIJAY <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
RS146BIJAY authored and shiv0408 committed Apr 25, 2024
1 parent 3daf88c commit 07c065c
Showing 3 changed files with 47 additions and 13 deletions.
@@ -245,8 +245,10 @@ public void testIndexCreateBlockIsRemovedWhenAnyNodesNotExceedHighWatermarkWithA
             (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
         );
 
-        // Validate if index create block is removed on the cluster
+        // Validate that the index create block is removed from the cluster. The cluster info also needs to be
+        // refreshed periodically so that the node is removed from the high watermark breached list.
         assertBusy(() -> {
+            clusterInfoService.refresh();
             ClusterState state1 = client().admin().cluster().prepareState().setLocal(true).get().getState();
             assertFalse(state1.blocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()));
         }, 30L, TimeUnit.SECONDS);
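The fix hinges on assertBusy retrying the whole lambda: with clusterInfoService.refresh() moved inside it, every retry re-polls disk usage before re-checking that the block is gone, instead of asserting against stale cluster info. A minimal sketch of the retry pattern assertBusy implements (a simplified stand-in, not OpenSearch's actual helper):

    import java.util.concurrent.TimeUnit;

    final class RetryAssert {
        // Re-run the assertion until it passes or the timeout elapses, backing off between attempts.
        static void assertBusy(Runnable assertion, long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            long sleepMillis = 1;
            while (true) {
                try {
                    assertion.run();
                    return; // assertion passed
                } catch (AssertionError e) {
                    if (System.nanoTime() > deadline) {
                        throw e; // timed out: surface the last failure
                    }
                    Thread.sleep(sleepMillis);
                    sleepMillis = Math.min(sleepMillis * 2, 500); // capped exponential backoff
                }
            }
        }
    }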
@@ -245,23 +245,22 @@ public void testIndexCreationOverLimitForDotIndexesFail() {
         assertFalse(clusterState.getMetadata().hasIndex(".test-index"));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6287")
     public void testCreateIndexWithMaxClusterShardSetting() {
-        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
-        ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
-        setMaxShardLimit(dataNodes, shardsPerNodeKey);
+        int maxAllowedShardsPerNode = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+        setMaxShardLimit(maxAllowedShardsPerNode, shardsPerNodeKey);
 
-        int maxAllowedShards = dataNodes + 1;
-        int extraShardCount = maxAllowedShards + 1;
+        // Always keep
+        int maxAllowedShardsPerCluster = maxAllowedShardsPerNode * 1000;
+        int extraShardCount = 1;
         // Getting total active shards in the cluster.
         int currentActiveShards = client().admin().cluster().prepareHealth().get().getActiveShards();
         try {
-            setMaxShardLimit(maxAllowedShards, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
+            setMaxShardLimit(maxAllowedShardsPerCluster, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
             prepareCreate("test_index_with_cluster_shard_limit").setSettings(
                 Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, extraShardCount).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
             ).get();
         } catch (final IllegalArgumentException ex) {
-            verifyException(Math.min(maxAllowedShards, dataNodes * dataNodes), currentActiveShards, extraShardCount, ex);
+            verifyException(maxAllowedShardsPerCluster, currentActiveShards, extraShardCount, ex);
         } finally {
             setMaxShardLimit(-1, SETTING_MAX_SHARDS_PER_CLUSTER_KEY);
         }
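For reference, the cluster-wide limit that SETTING_MAX_SHARDS_PER_CLUSTER_KEY controls boils down to a capacity comparison at index-creation time; the real check lives in OpenSearch's ShardLimitValidator. A hypothetical, simplified sketch (names here are illustrative, not the validator's API):

    final class ShardLimitSketch {
        // An index creation is rejected when its new shard copies would push the cluster total over the cap.
        static boolean wouldExceedClusterLimit(int currentActiveShards, int newShardCopies, int maxShardsPerCluster) {
            return currentActiveShards + newShardCopies > maxShardsPerCluster;
        }

        public static void main(String[] args) {
            // With 3 data nodes the test's cap is 3 * 1000 = 3000 shards, so one extra shard fits comfortably.
            System.out.println(wouldExceedClusterLimit(10, 1, 3 * 1000)); // false
        }
    }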
@@ -78,6 +78,7 @@
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
+import org.apache.lucene.tests.index.ForceMergePolicy;
 import org.apache.lucene.tests.mockfile.ExtrasFS;
 import org.apache.lucene.tests.store.MockDirectoryWrapper;
 import org.apache.lucene.util.Bits;
@@ -152,6 +153,7 @@
 import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
 import org.opensearch.index.translog.TranslogException;
 import org.opensearch.index.translog.listener.TranslogEventListener;
+import org.opensearch.test.DummyShardLock;
 import org.opensearch.test.IndexSettingsModule;
 import org.opensearch.test.MockLogAppender;
 import org.opensearch.test.VersionUtils;
@@ -3278,12 +3280,15 @@ public void onFailedEngine(String reason, Exception e) {
         final AtomicReference<RetentionLeases> retentionLeasesHolder = new AtomicReference<>(
             new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList())
         );
+
+        // Allow only force merges so that a regular merge does not close the shard first, before any
+        // other operation.
         InternalEngine engine = createEngine(
             config(
                 defaultSettings,
                 store,
                 createTempDir(),
-                newMergePolicy(),
+                newForceMergePolicy(),
                 null,
                 null,
                 null,
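ForceMergePolicy here is Lucene's test-framework policy (org.apache.lucene.tests.index.ForceMergePolicy): it wraps another merge policy and hands out merges only for explicit forceMerge calls, so no background merge can close resources while the test is mid-operation. A minimal sketch of wiring it into an IndexWriterConfig (assuming the Lucene test framework is on the classpath):

    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.MergePolicy;
    import org.apache.lucene.index.TieredMergePolicy;
    import org.apache.lucene.tests.index.ForceMergePolicy;

    final class OnlyForcedMerges {
        // Natural merge selection returns no merges; only forceMerge delegates to the wrapped TieredMergePolicy.
        static IndexWriterConfig newConfig() {
            MergePolicy onlyForced = new ForceMergePolicy(new TieredMergePolicy());
            return new IndexWriterConfig().setMergePolicy(onlyForced);
        }
    }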
@@ -3377,7 +3382,7 @@ public void onFailedEngine(String reason, Exception e) {
                 defaultSettings,
                 store,
                 createTempDir(),
-                newMergePolicy(),
+                newForceMergePolicy(),
                 null,
                 null,
                 null,
@@ -3446,7 +3451,8 @@ public void eval(MockDirectoryWrapper dir) throws IOException {
             wrapper.failOn(fail);
             MockLogAppender mockAppender = MockLogAppender.createForLoggers(Loggers.getLogger(Engine.class, shardId));
             try {
-                Store store = createStore(wrapper);
+                // Create a store whose directory is reported as closed during unreferenced file cleanup.
+                Store store = createFailingDirectoryStore(wrapper);
                 final Engine.EventListener eventListener = new Engine.EventListener() {
                     @Override
                     public void onFailedEngine(String reason, Exception e) {
@@ -3473,7 +3479,7 @@ public void onFailedEngine(String reason, Exception e) {
                 defaultSettings,
                 store,
                 createTempDir(),
-                newMergePolicy(),
+                newForceMergePolicy(),
                 null,
                 null,
                 null,
@@ -3534,6 +3540,33 @@ public void testSettings() {
         assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
     }
 
+    /**
+     * Creates a merge policy which only supports force merges.
+     * @return a merge policy which only supports force merges.
+     */
+    private MergePolicy newForceMergePolicy() {
+        return new ForceMergePolicy(new TieredMergePolicy());
+    }
+
+    /**
+     * Creates a store whose directory throws an AlreadyClosedException when accessed during unreferenced file cleanup.
+     *
+     * @param directory directory used for creating the store.
+     * @return a store whose directory fails when accessed during unreferenced file cleanup.
+     */
+    private Store createFailingDirectoryStore(final Directory directory) {
+        return new Store(shardId, INDEX_SETTINGS, directory, new DummyShardLock(shardId)) {
+            @Override
+            public Directory directory() {
+                if (callStackContainsAnyOf("cleanUpUnreferencedFiles")) {
+                    throw new AlreadyClosedException("store is already closed");
+                }
+
+                return super.directory();
+            }
+        };
+    }
+
     public void testCurrentTranslogUUIIDIsCommitted() throws IOException {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
         try (Store store = createStore()) {
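createFailingDirectoryStore works by inspecting the current call stack: callStackContainsAnyOf is a helper from the OpenSearch test framework that reports whether any frame on the stack matches one of the given method names. A minimal sketch of such a check (a simplified stand-in, not the framework's exact implementation):

    final class CallStacks {
        // Walk the current thread's stack and match frames by method name.
        static boolean callStackContainsAnyOf(String... methodNames) {
            for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
                for (String name : methodNames) {
                    if (name.equals(frame.getMethodName())) {
                        return true;
                    }
                }
            }
            return false;
        }
    }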
