Merge remote-tracking branch 'upstream/main' into tiramisu-stats-tsc-final
Peter Alfonsi committed Apr 30, 2024
2 parents 2a5f5e2 + 5d61ac2 commit af99c33
Showing 51 changed files with 3,052 additions and 139 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Gate new stats logic behind FeatureFlags.PLUGGABLE_CACHE ([#13238](https://github.com/opensearch-project/OpenSearch/pull/13238))
- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- [Batch Ingestion] Add `batch_size` to `_bulk` API. ([#12457](https://github.com/opensearch-project/OpenSearch/issues/12457))
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
@@ -60,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.netflix.nebula.ospackage-base` from 11.8.1 to 11.9.0 ([#13440](https://github.com/opensearch-project/OpenSearch/pull/13440))
- Bump `org.bouncycastle:bc-fips` from 1.0.2.4 to 1.0.2.5 ([#13446](https://github.com/opensearch-project/OpenSearch/pull/13446))
- Bump `lycheeverse/lychee-action` from 1.9.3 to 1.10.0 ([#13447](https://github.com/opensearch-project/OpenSearch/pull/13447))
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
2 changes: 1 addition & 1 deletion build.gradle
@@ -55,7 +55,7 @@ plugins {
id 'opensearch.docker-support'
id 'opensearch.global-build-info'
id "com.diffplug.spotless" version "6.25.0" apply false
id "org.gradle.test-retry" version "1.5.8" apply false
id "org.gradle.test-retry" version "1.5.9" apply false
id "test-report-aggregation"
id 'jacoco-report-aggregation'
}
@@ -167,3 +167,90 @@ teardown:
        index: test_index
        id: test_id3
  - match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}}

---
"Test bulk API with batch enabled happy case":
  - skip:
      version: " - 2.13.99"
      reason: "Added in 2.14.0"

  - do:
      bulk:
        refresh: true
        batch_size: 2
        pipeline: "pipeline1"
        body:
          - '{"index": {"_index": "test_index", "_id": "test_id1"}}'
          - '{"text": "text1"}'
          - '{"index": {"_index": "test_index", "_id": "test_id2"}}'
          - '{"text": "text2"}'
          - '{"index": {"_index": "test_index", "_id": "test_id3"}}'
          - '{"text": "text3"}'
          - '{"index": {"_index": "test_index", "_id": "test_id4"}}'
          - '{"text": "text4"}'
          - '{"index": {"_index": "test_index", "_id": "test_id5", "pipeline": "pipeline2"}}'
          - '{"text": "text5"}'
          - '{"index": {"_index": "test_index", "_id": "test_id6", "pipeline": "pipeline2"}}'
          - '{"text": "text6"}'

  - match: { errors: false }

  - do:
      get:
        index: test_index
        id: test_id5
  - match: { _source: {"text": "text5", "field2": "value2"}}

  - do:
      get:
        index: test_index
        id: test_id3
  - match: { _source: { "text": "text3", "field1": "value1" } }

---
"Test bulk API with batch_size missing":
  - skip:
      version: " - 2.13.99"
      reason: "Added in 2.14.0"

  - do:
      bulk:
        refresh: true
        pipeline: "pipeline1"
        body:
          - '{"index": {"_index": "test_index", "_id": "test_id1"}}'
          - '{"text": "text1"}'
          - '{"index": {"_index": "test_index", "_id": "test_id2"}}'
          - '{"text": "text2"}'

  - match: { errors: false }

  - do:
      get:
        index: test_index
        id: test_id1
  - match: { _source: { "text": "text1", "field1": "value1" } }

  - do:
      get:
        index: test_index
        id: test_id2
  - match: { _source: { "text": "text2", "field1": "value1" } }

---
"Test bulk API with invalid batch_size":
  - skip:
      version: " - 2.13.99"
      reason: "Added in 2.14.0"

  - do:
      catch: bad_request
      bulk:
        refresh: true
        batch_size: -1
        pipeline: "pipeline1"
        body:
          - '{"index": {"_index": "test_index", "_id": "test_id1"}}'
          - '{"text": "text1"}'
          - '{"index": {"_index": "test_index", "_id": "test_id2"}}'
          - '{"text": "text2"}'
@@ -829,15 +829,16 @@ public void testInvalidateWithDropDimensions() throws Exception {

ICacheKey<String> keyToDrop = keysAdded.get(0);

ImmutableCacheStats snapshot = ehCacheDiskCachingTier.stats().getStatsForDimensionValues(keyToDrop.dimensions);
String[] levels = dimensionNames.toArray(new String[0]);
ImmutableCacheStats snapshot = ehCacheDiskCachingTier.stats(levels).getStatsForDimensionValues(keyToDrop.dimensions);
assertNotNull(snapshot);

keyToDrop.setDropStatsForDimensions(true);
ehCacheDiskCachingTier.invalidate(keyToDrop);

// Now assert the stats are gone for any key that has this combination of dimensions, but still there otherwise
for (ICacheKey<String> keyAdded : keysAdded) {
snapshot = ehCacheDiskCachingTier.stats().getStatsForDimensionValues(keyAdded.dimensions);
snapshot = ehCacheDiskCachingTier.stats(levels).getStatsForDimensionValues(keyAdded.dimensions);
if (keyAdded.dimensions.equals(keyToDrop.dimensions)) {
assertNull(snapshot);
} else {
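
For context on the hunk above: `stats()` no longer yields per-key granularity by default; the new `stats(levels)` overload is passed the dimension levels to aggregate by, and supplying every dimension name (as the test now does) reproduces the old per-dimension-value view. The excerpt below restates that flow with comments; the identifiers are taken from the hunk, and the null-after-drop behavior is what the surrounding assertions check.

```java
// Aggregate by all dimension levels, i.e. per-dimension-value granularity.
String[] levels = dimensionNames.toArray(new String[0]);
ImmutableCacheStats before = ehCacheDiskCachingTier.stats(levels)
    .getStatsForDimensionValues(keyToDrop.dimensions); // non-null while stats exist

keyToDrop.setDropStatsForDimensions(true); // mark the key so its stats are dropped
ehCacheDiskCachingTier.invalidate(keyToDrop); // invalidation triggers the drop

ImmutableCacheStats after = ehCacheDiskCachingTier.stats(levels)
    .getStatsForDimensionValues(keyToDrop.dimensions); // now null for this combination
```
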
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
@@ -74,6 +74,10 @@
"require_alias": {
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
"batch_size": {
"type": "int",
"description": "Sets the batch size"
}
},
"body":{
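
Per the spec change above, batch_size is an integer query parameter on `_bulk`. A minimal usage sketch with the Java low-level REST client, assuming a cluster on localhost:9200 and an existing "pipeline1" (the index name and documents are illustrative):

```java
import org.apache.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

public class BulkBatchSizeExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request bulk = new Request("POST", "/_bulk");
            bulk.addParameter("batch_size", "2");       // added in 2.14 (see the YAML tests above)
            bulk.addParameter("pipeline", "pipeline1"); // request-level default pipeline
            bulk.addParameter("refresh", "true");
            bulk.setJsonEntity(
                "{\"index\":{\"_index\":\"test_index\",\"_id\":\"1\"}}\n"
                    + "{\"text\":\"text1\"}\n"
                    + "{\"index\":{\"_index\":\"test_index\",\"_id\":\"2\"}}\n"
                    + "{\"text\":\"text2\"}\n"
            );
            Response response = client.performRequest(bulk);
            System.out.println(response.getStatusLine()); // expect HTTP/1.1 200 OK
        }
    }
}
```
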
@@ -1119,6 +1119,10 @@ public void testCacheClearanceAfterIndexClosure() throws Exception {
String index = "index";
setupIndex(client, index);

// assert there are no entries in the cache for index
assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes());
// assert there are no entries in the cache from other indices in the node
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
// create first cache entry in index
createCacheEntry(client, index, "hello");
assertCacheState(client, index, 0, 1);
@@ -1136,7 +1140,7 @@
// sleep until cache cleaner would have cleaned up the stale key from index
assertBusy(() -> {
// cache cleaner should have cleaned up the stale keys from index
assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0);
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

@@ -1155,6 +1159,10 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
String index = "index";
setupIndex(client, index);

// assert there are no entries in the cache for index
assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes());
// assert there are no entries in the cache from other indices in the node
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
// create first cache entry in index
createCacheEntry(client, index, "hello");
assertCacheState(client, index, 0, 1);
@@ -1173,13 +1181,13 @@
// sleep until cache cleaner would have cleaned up the stale key from index
assertBusy(() -> {
// cache cleaner should have cleaned up the stale keys from index
assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0);
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 300;
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10)
@@ -1194,37 +1202,41 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
setupIndex(client, index1);
setupIndex(client, index2);

// assert cache is empty for index1
assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes());
// create first cache entry in index1
createCacheEntry(client, index1, "hello");
assertCacheState(client, index1, 0, 1);
long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1 > 0);
long memorySizeForIndex1With1Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1With1Entries > 0);

// create second cache entry in index1
createCacheEntry(client, index1, "there");
assertCacheState(client, index1, 0, 2);
long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1);
long memorySizeForIndex1With2Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1With2Entries > memorySizeForIndex1With1Entries);

// assert cache is empty for index2
assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes());
// create first cache entry in index2
createCacheEntry(client, index2, "hello");
assertCacheState(client, index2, 0, 1);
assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0);

// force refresh index 1 so that it creates 2 stale keys
flushAndRefresh(index1);
// create another cache entry in index 1, this should not be cleaned up.
// force refresh both index1 and index2
flushAndRefresh(index1, index2);
// create another cache entry in index1, the same size as memorySizeForIndex1With1Entries; this should not be cleaned up.
createCacheEntry(client, index1, "hello");
// record the size of this entry
long memorySizeOfLatestEntryForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes() - finalMemorySizeForIndex1;
// force refresh index 2 so that it creates 1 stale key
flushAndRefresh(index2);
// sleep until cache cleaner would have cleaned up the stale key from index 2
// sleep until cache cleaner would have cleaned up the stale key from index2
assertBusy(() -> {
// cache cleaner should have cleaned up the stale key from index 2
// cache cleaner should have cleaned up the stale key from index2 and hence cache should be empty
assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes());
// cache cleaner should have only cleaned up the stale entities
assertEquals(memorySizeOfLatestEntryForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes());
// cache cleaner should have only cleaned up the stale entities for index1
long currentMemorySizeInBytesForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
// assert the memory size of index1 to only contain 1 entry added after flushAndRefresh
assertEquals(memorySizeForIndex1With1Entries, currentMemorySizeInBytesForIndex1);
// cache for index1 should not be empty since there was an item cached after flushAndRefresh
assertTrue(currentMemorySizeInBytesForIndex1 > 0);
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

@@ -170,8 +170,8 @@ public void testAdmissionControlResponseStatus() throws Exception {

@Override
public void sendResponse(RestResponse response) {
waitForResponse.countDown();
aliasResponse.set(response);
waitForResponse.countDown();
}
};

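
The reordering above is a publish-before-release fix: the test presumably awaits the latch and then reads the AtomicReference, so the response must be stored before countDown() wakes the waiter, or the reader can observe null. A self-contained sketch of the pattern (names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class LatchPublishOrder {
    public static void main(String[] args) throws Exception {
        CountDownLatch waitForResponse = new CountDownLatch(1);
        AtomicReference<String> aliasResponse = new AtomicReference<>();

        new Thread(() -> {
            // Store the value first, then release the waiter. With the order
            // reversed, the main thread could wake up and read null.
            aliasResponse.set("200 OK");
            waitForResponse.countDown();
        }).start();

        waitForResponse.await(); // countDown() happens-before await() returning
        System.out.println(aliasResponse.get()); // guaranteed non-null in this order
    }
}
```
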
@@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
@@ -199,4 +200,25 @@ public void setRefreshFrequency(int refreshFrequency) {
this.refreshFrequency = refreshFrequency;
}
}

public void excludeNodeSet(String attr, String value) {
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.exclude._" + attr, value))
.get()
);
}

public void stopShardRebalancing() {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build())
.get()
);
}
}
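
A possible teardown counterpart to the helper above, shown only as a sketch: re-enabling rebalancing by writing the same setting back. This method is hypothetical, mirrors stopShardRebalancing(), and is not part of this diff.

```java
// Hypothetical inverse of stopShardRebalancing(): restore the default policy.
public void resumeShardRebalancing() {
    assertAcked(
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "all").build())
            .get()
    );
}
```
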