Skip to content

Commit

Permalink
[Remote Store] Add remote segment upload backpressure integ tests (#8197
Browse files Browse the repository at this point in the history
)

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Jun 27, 2023
1 parent 0c7ba94 commit 9d9a143
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected void deleteRepo() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
Expand All @@ -88,13 +88,14 @@ protected void setup(Path repoLocation, double ioFailureRate, String skipExcepti
.put("max_failure_number", maxFailure)
);

internalCluster().startDataOnlyNodes(1);
String dataNodeName = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
return dataNodeName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,149 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
// Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions
// fail leading to consecutive failure limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count");
}

public void testWritesRejectedDueToBytesLagBreach() throws Exception {
// Initially indexing happens with doc size of 2 bytes, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1KB leading to bytes lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag");
}

public void testWritesRejected() {
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
}

private void validateBackpressure(
int initialDocSize,
int initialDocsToIndex,
int onFailureDocSize,
int onFailureDocsToIndex,
String breachMode
) throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 1d, "metadata", Long.MAX_VALUE);
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
Settings request = Settings.builder()
.put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true)
.put(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 10)
.build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(request)
.get();
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");
assertEquals(clusterUpdateResponse.getPersistentSettings().get(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey()), "10");

logger.info("--> Indexing data");

String jsonString = generateString(initialDocSize);
BytesReference initialSource = new BytesArray(jsonString);
indexDocAndRefresh(initialSource, initialDocsToIndex);

((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME))
.setRandomControlIOExceptionRate(1d);

jsonString = generateString(onFailureDocSize);
BytesReference onFailureSource = new BytesArray(jsonString);
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexData(randomIntBetween(10, 20), randomBoolean())
() -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex)
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

RemoteRefreshSegmentTracker.Stats stats = stats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);

((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME))
.setRandomControlIOExceptionRate(0d);

assertBusy(() -> {
RemoteRefreshSegmentTracker.Stats finalStats = stats();
assertEquals(0, finalStats.bytesLag);
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
}, 30, TimeUnit.SECONDS);

long rejectionCount = stats.rejectionCount;
stats = stats();
indexDocAndRefresh(initialSource, initialDocsToIndex);
assertEquals(rejectionCount, stats.rejectionCount);
deleteRepo();
}

private RemoteRefreshSegmentTracker.Stats stats() {
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);
deleteRepo();
return matches.get(0).getStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, XContentType.JSON).get();
refresh(INDEX_NAME);
}
}

/**
* Generates string of given sizeInBytes
*
* @param sizeInBytes size of the string
* @return the generated string
*/
private String generateString(int sizeInBytes) {
StringBuilder sb = new StringBuilder();
sb.append("{");
int i = 0;
// Based on local tests, 1 char is occupying 1 byte
while (sb.length() < sizeInBytes) {
String key = "field" + i;
String value = "value" + i;
sb.append("\"").append(key).append("\":\"").append(value).append("\",");
i++;
}
if (sb.length() > 1 && sb.charAt(sb.length() - 1) == ',') {
sb.setLength(sb.length() - 1);
}
sb.append("}");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public long getFailureCount() {
return failureCounter.get();
}

private final double randomControlIOExceptionRate;
private volatile double randomControlIOExceptionRate;

private final double randomDataFileIOExceptionRate;

Expand Down Expand Up @@ -246,6 +246,10 @@ public synchronized void unblock() {
this.notifyAll();
}

public void setRandomControlIOExceptionRate(double randomControlIOExceptionRate) {
this.randomControlIOExceptionRate = randomControlIOExceptionRate;
}

public void blockOnDataFiles(boolean blocked) {
blockOnDataFiles = blocked;
}
Expand Down

0 comments on commit 9d9a143

Please sign in to comment.