Skip to content

Commit

Permalink
Merge remote-tracking branch 'opensearch-project/main' into async-fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Mar 20, 2024
2 parents 0d23917 + c369ec4 commit ca53a44
Show file tree
Hide file tree
Showing 23 changed files with 391 additions and 51 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:

- name: Create PR for BASE
id: base_pr
uses: peter-evans/create-pull-request@v5
uses: peter-evans/create-pull-request@v6
with:
base: ${{ env.BASE }}
branch: 'create-pull-request/patch-${{ env.BASE }}'
Expand All @@ -88,7 +88,7 @@ jobs:

- name: Create PR for BASE_X
id: base_x_pr
uses: peter-evans/create-pull-request@v5
uses: peter-evans/create-pull-request@v6
with:
base: ${{ env.BASE_X }}
branch: 'create-pull-request/patch-${{ env.BASE_X }}'
Expand All @@ -114,7 +114,7 @@ jobs:

- name: Create PR for main
id: main_pr
uses: peter-evans/create-pull-request@v5
uses: peter-evans/create-pull-request@v6
with:
base: main
branch: 'create-pull-request/patch-main'
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
- Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand All @@ -141,11 +142,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `aws-sdk-java` from 2.20.55 to 2.20.86 ([#12251](https://github.com/opensearch-project/OpenSearch/pull/12251))
- Bump `reactor-netty` from 1.1.15 to 1.1.17 ([#12633](https://github.com/opensearch-project/OpenSearch/pull/12633))
- Bump `reactor` from 3.5.14 to 3.5.15 ([#12633](https://github.com/opensearch-project/OpenSearch/pull/12633))
- Bump `peter-evans/create-pull-request` from 5 to 6 ([#12724](https://github.com/opensearch-project/OpenSearch/pull/12724))

### Changed
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))
- Mark fuzzy filter GA and remove experimental setting ([12631](https://github.com/opensearch-project/OpenSearch/pull/12631))
- Keep the election scheduler open until cluster state has been applied ([#11699](https://github.com/opensearch-project/OpenSearch/pull/11699))

### Deprecated

Expand Down
4 changes: 2 additions & 2 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ bouncycastle=1.77
randomizedrunner = 2.7.1
junit = 4.13.2
hamcrest = 2.1
mockito = 5.10.0
mockito = 5.11.0
objenesis = 3.2
bytebuddy = 1.14.7
bytebuddy = 1.14.9

# benchmark dependencies
jmh = 1.35
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cache.store.disk;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
Expand Down Expand Up @@ -47,6 +49,7 @@
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;

@ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class })
public class EhCacheDiskCacheTests extends OpenSearchSingleNodeTestCase {

private static final int CACHE_SIZE_IN_BYTES = 1024 * 101;
Expand Down Expand Up @@ -633,6 +636,47 @@ public void testBasicGetAndPutBytesReference() throws Exception {
}
}

public void testInvalidate() throws Exception {
Settings settings = Settings.builder().build();
MockRemovalListener<String, String> removalListener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, String> ehcacheTest = new EhcacheDiskCache.Builder<String, String>().setThreadPoolAlias("ehcacheTest")
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setValueType(String.class)
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES)
.setRemovalListener(removalListener)
.build();
int randomKeys = randomIntBetween(10, 100);
Map<String, String> keyValueMap = new HashMap<>();
for (int i = 0; i < randomKeys; i++) {
keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
ehcacheTest.put(entry.getKey(), entry.getValue());
}
assertEquals(keyValueMap.size(), ehcacheTest.count());
List<String> removedKeyList = new ArrayList<>();
for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
if (randomBoolean()) {
removedKeyList.add(entry.getKey());
ehcacheTest.invalidate(entry.getKey());
}
}
for (String removedKey : removedKeyList) {
assertNull(ehcacheTest.get(removedKey));
}
assertEquals(keyValueMap.size() - removedKeyList.size(), ehcacheTest.count());
ehcacheTest.close();
}
}

private static String generateRandomString(int length) {
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder randomString = new StringBuilder(length);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.store.disk;

import com.carrotsearch.randomizedtesting.ThreadFilter;

/**
* In Ehcache(as of 3.10.8), while calling remove/invalidate() on entries causes to start a daemon thread in the
* background to clean up the stale offheap memory associated with the disk cache. And this thread is not closed even
* after we try to close the cache or cache manager. Considering that it requires a node restart to switch between
* different cache plugins, this shouldn't be a problem for now.
*
* See: https://github.com/ehcache/ehcache3/issues/3204
*/
public class EhcacheThreadLeakFilter implements ThreadFilter {

private static final String OFFENDING_THREAD_NAME = "MappedByteBufferSource";

@Override
public boolean reject(Thread t) {
return t.getName().startsWith(OFFENDING_THREAD_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
# will return a task immediately and the merge process will run in background.

- skip:
version: " - 2.99.99"
reason: "only available in 3.0+"
features: allowed_warnings
version: " - 2.6.99, 2.13.0 - "
reason: "wait_for_completion was introduced in 2.7.0 and task description was changed in 2.13.0"
features: allowed_warnings, node_selector

- do:
indices.create:
index: test_index

- do:
node_selector:
version: " 2.7.0 - 2.12.99"
indices.forcemerge:
index: test_index
wait_for_completion: false
Expand All @@ -25,8 +27,31 @@
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }

---
"Force merge index with wait_for_completion after task description changed":
- skip:
version: " - 2.12.99 "
reason: "task description was changed in 2.13.0"
features: allowed_warnings, node_selector

- do:
node_selector:
version: " 2.13.0 - "
indices.forcemerge:
index: test_index
wait_for_completion: false
max_num_segments: 1
- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }
# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
primaryOnly = in.readBoolean();
}
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
Expand Down Expand Up @@ -219,7 +219,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
out.writeBoolean(primaryOnly);
}
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ public void onFailure(String source, Exception e) {

@Override
public void onSuccess(String source) {
closePrevotingAndElectionScheduler();
applyListener.onResponse(null);
}
});
Expand Down Expand Up @@ -472,17 +473,29 @@ private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, Disco
}

private void closePrevotingAndElectionScheduler() {
closePrevoting();
closeElectionScheduler();
}

private void closePrevoting() {
if (prevotingRound != null) {
prevotingRound.close();
prevotingRound = null;
}
}

private void closeElectionScheduler() {
if (electionScheduler != null) {
electionScheduler.close();
electionScheduler = null;
}
}

// package-visible for testing
boolean isElectionSchedulerRunning() {
return electionScheduler != null;
}

private void updateMaxTermSeen(final long term) {
synchronized (mutex) {
maxTermSeen = Math.max(maxTermSeen, term);
Expand Down Expand Up @@ -724,7 +737,7 @@ void becomeLeader(String method) {
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
closePrevoting();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

assert leaderChecker.leader() == null : leaderChecker.leader();
Expand Down Expand Up @@ -761,7 +774,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
closePrevoting();
cancelActivePublication("become follower: " + method);
preVoteCollector.update(getPreVoteResponse(), leaderNode);

Expand Down Expand Up @@ -927,7 +940,6 @@ public void invariant() {
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert becomingClusterManager || getStateForClusterManagerService().nodes().getClusterManagerNodeId() != null
: getStateForClusterManagerService();
Expand Down Expand Up @@ -972,7 +984,6 @@ assert getLocalNode().equals(applierState.nodes().getClusterManagerNode())
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader;
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForClusterManagerService().nodes().getClusterManagerNodeId() == null : getStateForClusterManagerService();
assert leaderChecker.currentNodeIsClusterManager() == false;
Expand Down Expand Up @@ -1693,6 +1704,7 @@ public void onSuccess(String source) {
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
closePrevotingAndElectionScheduler();
// if necessary, abdicate to another node or improve the voting configuration
boolean attemptReconfiguration = true;
final ClusterState state = getLastAcceptedState(); // committed state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
static final String KEY_ROLLOVER_INFOS = "rollover_info";
static final String KEY_SYSTEM = "system";
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store";

public static final String INDEX_STATE_FILE_PREFIX = "state-";

Expand Down
Loading

0 comments on commit ca53a44

Please sign in to comment.