Skip to content

Commit

Permalink
[BUG FIX] Fix updating plugins.ml_commons.jvm_heap_memory_threshold t…
Browse files Browse the repository at this point in the history
…akes no effect (#1943)

* fix plugins.ml_commons.jvm_heap_memory_threshold settings ineffective

Signed-off-by: zhichao-aws <[email protected]>

* add integ test

Signed-off-by: zhichao-aws <[email protected]>

* add license header

Signed-off-by: zhichao-aws <[email protected]>

* fix the bug by override getThreshold

Signed-off-by: zhichao-aws <[email protected]>

---------

Signed-off-by: zhichao-aws <[email protected]>
  • Loading branch information
zhichao-aws authored Jan 29, 2024
1 parent 1a436fe commit 126ed3a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.jvmHeapMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return jvmService.stats().getMem().getHeapUsedPercent() > this.getThreshold();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
package org.opensearch.ml.breaker;

import static org.mockito.Mockito.when;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmService;
import org.opensearch.monitor.jvm.JvmStats;

Expand All @@ -26,6 +30,9 @@ public class MemoryCircuitBreakerTests {
@Mock
JvmStats.Mem mem;

@Mock
ClusterService clusterService;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
Expand Down Expand Up @@ -60,4 +67,21 @@ public void testIsOpen_CustomThreshold_ExceedMemoryThreshold() {
when(mem.getHeapUsedPercent()).thenReturn((short) 95);
Assert.assertTrue(breaker.isOpen());
}

@Test
public void testIsOpen_UpdatedByClusterSettings_ExceedMemoryThreshold() {
ClusterSettings settingsService = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
settingsService.registerSetting(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD);
when(clusterService.getClusterSettings()).thenReturn(settingsService);

CircuitBreaker breaker = new MemoryCircuitBreaker(Settings.builder().build(), clusterService, jvmService);

when(mem.getHeapUsedPercent()).thenReturn((short) 90);
Assert.assertTrue(breaker.isOpen());

Settings.Builder newSettingsBuilder = Settings.builder();
newSettingsBuilder.put("plugins.ml_commons.jvm_heap_memory_threshold", 95);
settingsService.applySettings(newSettingsBuilder.build());
Assert.assertFalse(breaker.isOpen());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.ml.rest;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;

import java.io.IOException;

import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.After;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.ml.breaker.MemoryCircuitBreaker;
import org.opensearch.ml.utils.TestHelper;

import com.google.common.collect.ImmutableList;

public class RestMLMemoryCircuitBreakerIT extends MLCommonsRestTestCase {
@After
public void tearDown() throws Exception {
super.tearDown();
// restore the threshold to default value
Response response1 = TestHelper
.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":"
+ MemoryCircuitBreaker.DEFAULT_JVM_HEAP_USAGE_THRESHOLD
+ "}}",
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response1.getStatusLine().getStatusCode());
}

public void testRunWithMemoryCircuitBreaker() throws IOException {
// set a low threshold
Response response1 = TestHelper
.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":1}}",
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response1.getStatusLine().getStatusCode());

// expect task fail due to memory limit
Exception exception = assertThrows(ResponseException.class, () -> ingestModelData());
org.hamcrest.MatcherAssert
.assertThat(
exception.getMessage(),
allOf(
containsString("Memory Circuit Breaker is open, please check your resources!"),
containsString("m_l_limit_exceeded_exception")
)
);

// set a higher threshold
Response response2 = TestHelper
.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response2.getStatusLine().getStatusCode());

// expect task success
ingestModelData();
}
}

0 comments on commit 126ed3a

Please sign in to comment.