Skip to content

Commit

Permalink
feat: add vertical scaling and SoftReference for snapshot repository …
Browse files Browse the repository at this point in the history
…data cache

- Allows cache size configuration in `opensearch.yml`, adjustable within a range of 0-5% of heap memory, with a default of 1%.
- Applies `SoftReference` to cached repository data for efficient memory management under heap pressure.

Signed-off-by: inpink <[email protected]>
  • Loading branch information
inpink committed Oct 31, 2024
1 parent 4b284c5 commit 0aedcb8
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Snapshot related Settings
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -142,6 +143,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.IndexMetaDataGenerations;
Expand All @@ -167,6 +169,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -196,6 +199,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
Expand Down Expand Up @@ -253,6 +257,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

/**
 * Key of the setting that bounds the size of the cached snapshot repository data.
 */
public static final String SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME = "snapshot.repository_data.cache.size";

/**
 * Default cache size as a percentage; {@code SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE} appends {@code "%"} to it to form its default value.
 */
public static final Integer SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE = 1;

/**
 * Largest share of the JVM max heap, in percent, that the cache size may take; the parser of
 * {@code SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE} rejects values above it.
 */
public static final Integer SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE = 5;

/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
Expand All @@ -275,6 +285,31 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Deprecated
);

/**
 * Node-scoped upper bound on the size of the snapshot repository data cache.
 * <p>
 * The raw value is parsed with {@code parseBytesSizeValueOrHeapRatio}; when the setting is absent the default is
 * {@link #SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE}% . A parsed value whose share of the JVM max heap is greater than
 * {@link #SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE}% is rejected with an {@link OpenSearchParseException}.
 */
public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE = new Setting<>(
    SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME,
    SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE + "%",
    (rawValue) -> {
        final ByteSizeValue requestedSize = parseBytesSizeValueOrHeapRatio(rawValue, SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME);
        final ByteSizeValue heapMax = JvmInfo.jvmInfo().getMem().getHeapMax();

        // Compare the requested share of the heap against the allowed share, both as fractions.
        final double requestedShareOfHeap = requestedSize.getKbFrac() / heapMax.getKbFrac();
        final double allowedShareOfHeap = SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE / 100.0;
        final boolean exceedsLimit = requestedShareOfHeap > allowedShareOfHeap;

        if (exceedsLimit == false) {
            return requestedSize;
        }

        throw new OpenSearchParseException(
            "{} ({} KB) cannot exceed {}% of the heap ({} KB).",
            SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME,
            requestedSize.getKb(),
            SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE,
            heapMax.getKb()
        );
    },
    Setting.Property.NodeScope
);

/**
* Size hint for the IO buffer size to use when reading from and writing to the repository.
*/
Expand Down Expand Up @@ -374,6 +409,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private volatile boolean cacheRepositoryData;

private volatile long repositoryDataCacheSize;

private volatile RateLimiter snapshotRateLimiter;

private volatile RateLimiter restoreRateLimiter;
Expand Down Expand Up @@ -515,6 +552,7 @@ protected BlobStoreRepository(
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
this.repositoryDataCacheSize = SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE.get(clusterService.getSettings()).getBytes();
}

@Override
Expand Down Expand Up @@ -1132,7 +1170,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
cached = latestKnownRepositoryData.get();
SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
cached = (softRef != null) ? softRef.get() : null;
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
Expand Down Expand Up @@ -2926,15 +2965,19 @@ public void endVerification(String seed) {
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
new SoftReference<>(null)
);

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;

// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
Expand Down Expand Up @@ -2983,7 +3026,8 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
genToLoad = latestKnownRepoGen.get();
}
try {
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
Expand Down Expand Up @@ -3050,12 +3094,14 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
try {
serialized = CompressorRegistry.defaultCompressor().compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {

if (len > repositoryDataCacheSize) {
logger.debug(
"Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
"Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
+ " serialized size",
len,
metadata.name()
metadata.name(),
repositoryDataCacheSize
);
if (len > ByteSizeUnit.MB.toBytes(5)) {
logger.warn(
Expand All @@ -3074,11 +3120,12 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
logger.warn("Failed to serialize repository data", e);
return;
}
latestKnownRepositoryData.updateAndGet(known -> {
latestKnownRepositoryData.updateAndGet(knownRef -> {
Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
if (known != null && known.v1() > generation) {
return known;
return knownRef;
}
return new Tuple<>(generation, serialized);
return new SoftReference<>(new Tuple<>(generation, serialized));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -127,22 +128,67 @@ public void testIndicesFieldDataCacheSetting() {
);
}

/**
 * Checks the snapshot repository data cache size setting: its key, a default of 1% of the JVM max heap, and a 5% upper limit.
 */
public void testSnapshotRepositoryDataCacheSizeSetting() {
    final long heapMaxBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
    final ByteSizeValue expectedDefault = new ByteSizeValue((long) (heapMaxBytes * 0.01));
    final double maxAllowedPercentage = 5.0;

    assertMemorySizeSettingWithinLimit(
        BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE,
        "snapshot.repository_data.cache.size",
        expectedDefault,
        maxAllowedPercentage
    );
}

/**
 * Convenience overload that runs the memory-size setting checks against empty settings.
 *
 * @param setting      the setting under test
 * @param settingKey   the key the setting is expected to be registered under
 * @param defaultValue the value the setting is expected to default to
 */
private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue) {
    final Settings noOverrides = Settings.EMPTY;
    assertMemorySizeSetting(setting, settingKey, defaultValue, noOverrides);
}

/**
 * Convenience overload that runs the memory-size setting checks with 25% as the percentage under test.
 *
 * @param setting      the setting under test
 * @param settingKey   the key the setting is expected to be registered under
 * @param defaultValue the value the setting is expected to default to
 * @param settings     the settings the default value is resolved against
 */
private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue, Settings settings) {
    final double percentageUnderTest = 25.0;
    assertMemorySizeSetting(setting, settingKey, defaultValue, percentageUnderTest, settings);
}

/**
 * Shared assertions for a memory-size setting: the setting is non-null, has the expected key, carries the
 * {@code NodeScope} property, resolves to {@code defaultValue} against {@code settings}, parses a percentage string into
 * the matching fraction of the JVM max heap, and parses {@code "1024b"} into 1024 bytes.
 *
 * NOTE(review): this listing appears to contain both the pre-change lines (hard-coded "25%" / 0.25) and the post-change
 * lines (driven by {@code availablePercentage}) of the diff -- confirm against the actual file which ones are live.
 *
 * @param setting             the setting under test
 * @param settingKey          the key the setting is expected to be registered under
 * @param defaultValue        the value the setting is expected to default to
 * @param availablePercentage the heap percentage used for the percentage-parsing check
 * @param settings            the settings the default value is resolved against
 */
private void assertMemorySizeSetting(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
double availablePercentage,
Settings settings
) {
assertThat(setting, notNullValue());
assertThat(setting.getKey(), equalTo(settingKey));
assertThat(setting.getProperties(), hasItem(Property.NodeScope));
assertThat(setting.getDefault(settings), equalTo(defaultValue));
Settings settingWithPercentage = Settings.builder().put(settingKey, "25%").build();
Settings settingWithPercentage = Settings.builder().put(settingKey, percentageAsString(availablePercentage)).build();
assertThat(
setting.get(settingWithPercentage),
equalTo(new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25)))
equalTo(
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * percentageAsFraction(availablePercentage)))
)
);
// An absolute byte value must be accepted as well as a percentage.
Settings settingWithBytesValue = Settings.builder().put(settingKey, "1024b").build();
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(1024)));
}

/**
 * Runs the shared memory-size setting checks with {@code maxPercentage} as the percentage under test, then verifies that
 * a value one percentage point above {@code maxPercentage} is rejected with an {@link IllegalArgumentException}.
 *
 * @param setting       the setting under test
 * @param settingKey    the key the setting is expected to be registered under
 * @param defaultValue  the value the setting is expected to default to
 * @param maxPercentage the largest heap percentage the setting is expected to accept
 */
private void assertMemorySizeSettingWithinLimit(
    Setting<ByteSizeValue> setting,
    String settingKey,
    ByteSizeValue defaultValue,
    double maxPercentage
) {
    assertMemorySizeSetting(setting, settingKey, defaultValue, maxPercentage, Settings.EMPTY);

    // Build the over-limit settings outside the lambda so that only the setting lookup itself can satisfy
    // assertThrows; an IllegalArgumentException raised while building the fixture must fail the test instead.
    final double unavailablePercentage = maxPercentage + 1.0;
    final Settings settingWithPercentageExceedingLimit = Settings.builder()
        .put(settingKey, percentageAsString(unavailablePercentage))
        .build();

    assertThrows(IllegalArgumentException.class, () -> { setting.get(settingWithPercentageExceedingLimit); });
}

/**
 * Converts a percentage (for example {@code 25.0}) into the corresponding fraction (for example {@code 0.25}).
 *
 * @param availablePercentage the percentage to convert
 * @return the percentage divided by 100
 */
private double percentageAsFraction(double availablePercentage) {
    final double percentPerWhole = 100.0;
    return availablePercentage / percentPerWhole;
}

/**
 * Renders a percentage as a setting value string by appending a percent sign (for example {@code 5.0} becomes {@code "5.0%"}).
 *
 * @param availablePercentage the percentage to render
 * @return the decimal representation of the percentage followed by {@code "%"}
 */
private String percentageAsString(double availablePercentage) {
    final String number = String.valueOf(availablePercentage);
    return number + "%";
}
}

0 comments on commit 0aedcb8

Please sign in to comment.