diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java index 2b5dd6e060..1fb1ef36cf 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/DiskCircuitBreaker.java @@ -5,30 +5,31 @@ package org.opensearch.ml.breaker; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD; + import java.io.File; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.util.Optional; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.ml.common.exception.MLException; /** * A circuit breaker for disk usage. */ -public class DiskCircuitBreaker extends ThresholdCircuitBreaker { - // TODO: make this value configurable as cluster setting +public class DiskCircuitBreaker extends ThresholdCircuitBreaker { private static final String ML_DISK_CB = "Disk Circuit Breaker"; - public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L; - private static final long GB = 1024 * 1024 * 1024; - private String diskDir; - - public DiskCircuitBreaker(String diskDir) { - super(DEFAULT_DISK_SHORTAGE_THRESHOLD); - this.diskDir = diskDir; - } + public static final ByteSizeValue DEFAULT_DISK_SHORTAGE_THRESHOLD = new ByteSizeValue(5, ByteSizeUnit.GB); + private final File diskDir; - public DiskCircuitBreaker(long threshold, String diskDir) { - super(threshold); + public DiskCircuitBreaker(Settings settings, ClusterService clusterService, File diskDir) { + super(Optional.ofNullable(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.get(settings)).orElse(DEFAULT_DISK_SHORTAGE_THRESHOLD)); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD, super::setThreshold); this.diskDir = diskDir; } @@ -42,7 +43,7 @@ public String getName() { public boolean isOpen() { try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB + return new ByteSizeValue(diskDir.getFreeSpace(), ByteSizeUnit.BYTES).compareTo(getThreshold()) < 0; // in GB }); } catch (PrivilegedActionException e) { throw new MLException("Failed to run disk circuit breaker"); diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java b/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java index 156c71b69c..8db1b72ffc 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/MLCircuitBreakerService.java @@ -5,6 +5,7 @@ package org.opensearch.ml.breaker; +import java.io.File; import java.nio.file.Path; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -76,7 +77,7 @@ public MLCircuitBreakerService init(Path path) { // Register memory circuit breaker registerBreaker(BreakerName.MEMORY, new MemoryCircuitBreaker(this.settings, this.clusterService, this.jvmService)); log.info("Registered ML memory breaker."); - registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString())); + registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(this.settings, this.clusterService, new File(path.toString()))); log.info("Registered ML disk breaker."); // Register native memory circuit breaker, disabling due to unstability. // registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService)); diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java index c1287ef481..92b86e98ef 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/MemoryCircuitBreaker.java @@ -7,6 +7,8 @@ import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD; +import java.util.Optional; + import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.monitor.jvm.JvmService; @@ -15,11 +17,9 @@ * A circuit breaker for memory usage. */ public class MemoryCircuitBreaker extends ThresholdCircuitBreaker { - // TODO: make this value configurable as cluster setting private static final String ML_MEMORY_CB = "Memory Circuit Breaker"; public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85; private final JvmService jvmService; - private volatile Integer jvmHeapMemThreshold = 85; public MemoryCircuitBreaker(JvmService jvmService) { super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD); @@ -32,10 +32,16 @@ public MemoryCircuitBreaker(short threshold, JvmService jvmService) { } public MemoryCircuitBreaker(Settings settings, ClusterService clusterService, JvmService jvmService) { - super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD); + super( + Optional + .ofNullable(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings)) + .map(Integer::shortValue) + .orElse(DEFAULT_JVM_HEAP_USAGE_THRESHOLD) + ); this.jvmService = jvmService; - this.jvmHeapMemThreshold = ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> jvmHeapMemThreshold = it); + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue())); } @Override @@ -43,11 +49,6 @@ public String getName() { return ML_MEMORY_CB; } - @Override - public Short getThreshold() { - return this.jvmHeapMemThreshold.shortValue(); - } - @Override public boolean isOpen() { return getThreshold() < 100 && jvmService.stats().getMem().getHeapUsedPercent() > getThreshold(); diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java index 195a017648..7c0ea6794b 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/NativeMemoryCircuitBreaker.java @@ -7,6 +7,8 @@ import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD; +import java.util.Optional; + import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.monitor.os.OsService; @@ -18,18 +20,22 @@ public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker { private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker"; public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90; private final OsService osService; - private volatile Integer nativeMemThreshold = 90; public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) { - super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD); + super( + Optional + .ofNullable(ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings)) + .map(Integer::shortValue) + .orElse(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD) + ); this.osService = osService; - this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it); + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue())); } public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) { super(threshold.shortValue()); - this.nativeMemThreshold = threshold; this.osService = osService; } @@ -38,13 +44,8 @@ public String getName() { return ML_MEMORY_CB; } - @Override - public Short getThreshold() { - return this.nativeMemThreshold.shortValue(); - } - @Override public boolean isOpen() { - return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue(); + return osService.stats().getMem().getUsedPercent() > getThreshold(); } } diff --git a/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java b/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java index 1feec444ab..155a7ba637 100644 --- a/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java +++ b/plugin/src/main/java/org/opensearch/ml/breaker/ThresholdCircuitBreaker.java @@ -5,22 +5,21 @@ package org.opensearch.ml.breaker; +import lombok.Data; + /** * An abstract class for all breakers with threshold. * @param data type of threshold */ +@Data public abstract class ThresholdCircuitBreaker implements CircuitBreaker { - private T threshold; + private volatile T threshold; public ThresholdCircuitBreaker(T threshold) { this.threshold = threshold; } - public T getThreshold() { - return threshold; - } - @Override public abstract boolean isOpen(); } diff --git a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java index 73fd060a92..df9b01b9a9 100644 --- a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java +++ b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java @@ -911,6 +911,7 @@ public List> getSettings() { MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE, MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX, MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD, + MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD, MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, MLCommonsSettings.ML_COMMONS_EXCLUDE_NODE_NAMES, MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN, diff --git a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java index 84bdef95fe..1e7a569a09 100644 --- a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java +++ b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java @@ -9,6 +9,8 @@ import java.util.function.Function; import org.opensearch.common.settings.Setting; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.ml.common.conversation.ConversationalIndexConstants; import org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAProcessorConstants; @@ -77,6 +79,14 @@ private MLCommonsSettings() {} public static final Setting ML_COMMONS_JVM_HEAP_MEM_THRESHOLD = Setting .intSetting("plugins.ml_commons.jvm_heap_memory_threshold", 85, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting ML_COMMONS_DISK_FREE_SPACE_THRESHOLD = Setting + .byteSizeSetting( + "plugins.ml_commons.disk_free_space_threshold", + new ByteSizeValue(5L, ByteSizeUnit.GB), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public static final Setting ML_COMMONS_EXCLUDE_NODE_NAMES = Setting .simpleString("plugins.ml_commons.exclude_nodes._name", Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN = Setting diff --git a/plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java new file mode 100644 index 0000000000..7eab9d63e8 --- /dev/null +++ b/plugin/src/test/java/org/opensearch/ml/breaker/DiskCircuitBreakerTests.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.breaker; + +import static org.mockito.Mockito.when; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD; + +import java.io.File; +import java.util.HashSet; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; + +public class DiskCircuitBreakerTests { + @Mock + ClusterService clusterService; + + @Mock + File file; + + @Before + public void setup() { + MockitoAnnotations.openMocks(this); + when(clusterService.getClusterSettings()) + .thenReturn(new ClusterSettings(Settings.EMPTY, new HashSet<>(List.of(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD)))); + } + + @Test + public void test_isOpen_whenDiskFreeSpaceIsHigherThanMinValue_breakerIsNotOpen() { + CircuitBreaker breaker = new DiskCircuitBreaker( + Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(4L, ByteSizeUnit.GB)).build(), + clusterService, + file + ); + when(file.getFreeSpace()).thenReturn(5 * 1024 * 1024 * 1024L); + Assert.assertFalse(breaker.isOpen()); + } + + @Test + public void test_isOpen_whenDiskFreeSpaceIsLessThanMinValue_breakerIsOpen() { + CircuitBreaker breaker = new DiskCircuitBreaker( + Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(5L, ByteSizeUnit.GB)).build(), + clusterService, + file + ); + when(file.getFreeSpace()).thenReturn(4 * 1024 * 1024 * 1024L); + Assert.assertTrue(breaker.isOpen()); + } + + @Test + public void test_isOpen_whenDiskFreeSpaceConfiguredToZero_breakerIsNotOpen() { + CircuitBreaker breaker = new DiskCircuitBreaker( + Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(0L, ByteSizeUnit.KB)).build(), + clusterService, + file + ); + when(file.getFreeSpace()).thenReturn((long) (Math.random() * 1024 * 1024 * 1024 * 1024L)); + Assert.assertFalse(breaker.isOpen()); + } + + @Test + public void test_getName() { + CircuitBreaker breaker = new DiskCircuitBreaker(Settings.EMPTY, clusterService, file); + Assert.assertEquals("Disk Circuit Breaker", breaker.getName()); + } +} diff --git a/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java b/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java index 9ed06d0b3c..76f371004e 100644 --- a/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java +++ b/plugin/src/test/java/org/opensearch/ml/breaker/MLCircuitBreakerServiceTests.java @@ -6,6 +6,7 @@ package org.opensearch.ml.breaker; import static org.mockito.Mockito.when; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD; import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD; import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD; @@ -103,7 +104,9 @@ public void testInit() { .build(); ClusterSettings clusterSettings = new ClusterSettings( settings, - new HashSet<>(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD, ML_COMMONS_JVM_HEAP_MEM_THRESHOLD)) + new HashSet<>( + Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD, ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, ML_COMMONS_DISK_FREE_SPACE_THRESHOLD) + ) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService); diff --git a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java index 8e013d4305..abfaee0236 100644 --- a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java +++ b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java @@ -178,7 +178,8 @@ public void setupSettings() throws IOException { String jsonEntity = "{\n" + " \"persistent\" : {\n" + " \"plugins.ml_commons.jvm_heap_memory_threshold\" : 100, \n" - + " \"plugins.ml_commons.native_memory_threshold\" : 100 \n" + + " \"plugins.ml_commons.native_memory_threshold\" : 100, \n" + + " \"plugins.ml_commons.disk_free_space_threshold\" : 0 \n" + " }\n" + "}"; response = TestHelper