Skip to content

Commit

Permalink
feat: add dynamic cache limit based on heap size and apply SoftReference for snapshot repository data
Browse files Browse the repository at this point in the history

- Allow cache size configuration in `opensearch.yml`, adjustable within the range of 0-5% of heap memory. The default is set to 1%.
- Applied `SoftReference` to cached repository data for efficient memory management under heap pressure.
- Retained the JVM flag `opensearch.snapshot.cache.size.percentage` for adjusting the cache size as a percentage of heap; it defaults to 500KB to minimize the impact on existing users.

Signed-off-by: inpink <[email protected]>
  • Loading branch information
inpink committed Oct 30, 2024
1 parent f57b889 commit 6e8e8d3
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic cache limit based on heap size and apply SoftReference for snapshot repository data ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Snapshot related Settings
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -142,6 +143,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.IndexMetaDataGenerations;
Expand All @@ -167,6 +169,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -196,6 +199,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
Expand Down Expand Up @@ -253,6 +257,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

/** Name of the node setting that controls the snapshot repository data cache size. */
public static final String SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME = "snapshot.repository_data.cache.size";

/** Default cache size limit, expressed as a percentage of the JVM max heap (1%). */
public static final Integer SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE = 1;

/** Upper bound for the cache size limit, expressed as a percentage of the JVM max heap (5%). */
public static final Integer SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE = 5;

/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
Expand All @@ -275,6 +285,32 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Deprecated
);

/**
 * Sets the cache size for snapshot repository data, defaulting to 1% of node heap memory. The maximum is 5%.
 */
public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE = new Setting<>(
    SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME,
    SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE + "%",
    (value) -> {
        // Accepts either an absolute byte size or a heap-relative percentage (e.g. "1%").
        final ByteSizeValue requestedSize = parseBytesSizeValueOrHeapRatio(value, SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME);
        final ByteSizeValue heapSize = JvmInfo.jvmInfo().getMem().getHeapMax();

        // Reject anything above the hard ceiling of 5% of the configured max heap.
        final double maxAllowedFraction = SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE / 100.0;
        if ((requestedSize.getKbFrac() / heapSize.getKbFrac()) > maxAllowedFraction) {
            throw new OpenSearchParseException(
                "{} ({} KB) cannot exceed {}% of the heap ({} KB).",
                SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME,
                requestedSize.getKb(),
                SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE,
                heapSize.getKb()
            );
        }
        return requestedSize;
    },
    Setting.Property.NodeScope,
    Setting.Property.Dynamic
);

/**
* Size hint for the IO buffer size to use when reading from and writing to the repository.
*/
Expand Down Expand Up @@ -374,6 +410,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

// Gates whether serialized repository data is cached at all (set from the CACHE_REPOSITORY_DATA repository setting).
private volatile boolean cacheRepositoryData;

// Maximum serialized size, in bytes, of repository data eligible for caching
// (set from the SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE setting).
private volatile long repositoryDataCacheSize;

private volatile RateLimiter snapshotRateLimiter;

private volatile RateLimiter restoreRateLimiter;
Expand Down Expand Up @@ -542,6 +580,7 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = READONLY_SETTING.get(metadata.settings());
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
repositoryDataCacheSize = SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE.get(metadata.settings()).getBytes();
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
compressor = COMPRESS_SETTING.get(metadata.settings())
Expand Down Expand Up @@ -1132,7 +1171,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
cached = latestKnownRepositoryData.get();
SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
cached = (softRef != null) ? softRef.get() : null;
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
Expand Down Expand Up @@ -2926,15 +2966,19 @@ public void endVerification(String seed) {
// Latest repository generation this node knows about; starts as UNKNOWN_REPO_GEN until first load.
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json.
// Wrapped in a SoftReference so the GC may reclaim the cached bytes under heap pressure; readers must
// null-check both the AtomicReference contents and the referent.
private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
    new SoftReference<>(null)
);

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;

// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
Expand Down Expand Up @@ -2983,7 +3027,8 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
genToLoad = latestKnownRepoGen.get();
}
try {
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
Expand Down Expand Up @@ -3050,12 +3095,14 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
try {
serialized = CompressorRegistry.defaultCompressor().compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
long dynamicCacheSizeLimit = determineCacheSizeLimit();
if (len > dynamicCacheSizeLimit) {
logger.debug(
"Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
"Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
+ " serialized size",
len,
metadata.name()
metadata.name(),
dynamicCacheSizeLimit
);
if (len > ByteSizeUnit.MB.toBytes(5)) {
logger.warn(
Expand All @@ -3074,15 +3121,20 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
logger.warn("Failed to serialize repository data", e);
return;
}
latestKnownRepositoryData.updateAndGet(known -> {
latestKnownRepositoryData.updateAndGet(knownRef -> {
Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
if (known != null && known.v1() > generation) {
return known;
return knownRef;
}
return new Tuple<>(generation, serialized);
return new SoftReference<>(new Tuple<>(generation, serialized));
});
}
}

/**
 * Returns the maximum serialized size, in bytes, that repository data may have in order to be cached.
 * Backed by the {@code snapshot.repository_data.cache.size} setting; protected so subclasses (e.g. tests)
 * can override the limit.
 */
protected long determineCacheSizeLimit() {
    return repositoryDataCacheSize;
}

private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
try (InputStream input = CompressorRegistry.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) {
return RepositoryData.snapshotsFromXContent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Settings.Builder;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.compress.Compressor;
Expand All @@ -58,6 +59,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.RepositoryPlugin;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -653,4 +655,71 @@ public void testGetRestrictedSystemRepositorySettings() {
assertTrue(settings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
repository.close();
}

public void testRepositoryDataCacheSizeProportionalToHeapSize() {
    // Each configured percentage must translate into the matching fraction of the JVM max heap.
    final double[] percentagesToCheck = new double[] { 0.0, 0.1, 1.0, 5.0 };
    final Client client = client();
    final Path repoLocation = OpenSearchIntegTestCase.randomRepoPath(node().settings());
    final String repoName = "test-repo";
    final long heapBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();

    for (final double percentage : percentagesToCheck) {
        final Builder repoSettings = buildSettingsWithDynamicCacheLimit(percentage, repoLocation);
        OpenSearchIntegTestCase.putRepository(client.admin().cluster(), repoName, REPO_TYPE, repoSettings);

        final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class);
        final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);

        final long expectedLimit = (long) (heapBytes * (percentage / 100.0));
        assertEquals(expectedLimit, repository.determineCacheSizeLimit());
    }
}

// NOTE(review): method name typo ("Unvalid" -> "Invalid") kept to avoid breaking suite references.
public void testRepositoryDataUnvalidCacheSize() {
    // Percentages outside the allowed 0-5% window must be rejected at repository creation time.
    final double[] invalidPercentages = new double[] { -1.0, 5.1, 101.0 };
    final Client client = client();
    final Path repoLocation = OpenSearchIntegTestCase.randomRepoPath(node().settings());
    final String repoName = "test-repo";

    for (final double percentage : invalidPercentages) {
        final Builder repoSettings = buildSettingsWithDynamicCacheLimit(percentage, repoLocation);

        assertThrows(
            RepositoryException.class,
            () -> OpenSearchIntegTestCase.putRepository(client.admin().cluster(), repoName, REPO_TYPE, repoSettings)
        );
    }
}

/**
 * Builds repository settings on top of the node settings, pointing the repository at {@code location}
 * and configuring the snapshot repository data cache limit as a heap percentage.
 */
private Builder buildSettingsWithDynamicCacheLimit(double cachePercentage, Path location) {
    return Settings.builder()
        .put(node().settings())
        .put("location", location)
        .put("snapshot.repository_data.cache.size", cachePercentage + "%");
}

public void testRepositoryDataDefaultCacheSize() {
    // With no explicit setting, the cache limit should default to 1% of the JVM max heap.
    final BlobStoreRepository repository = setupRepo();

    final long heapBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
    final long expectedLimit = (long) (heapBytes * (1 / 100.0));

    assertEquals(expectedLimit, repository.determineCacheSizeLimit());
}
}

0 comments on commit 6e8e8d3

Please sign in to comment.