Add vertical scaling and SoftReference for snapshot repository data cache #16489
base: main
server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

@@ -46,6 +46,7 @@
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.ExceptionsHelper;
+import org.opensearch.OpenSearchParseException;
 import org.opensearch.Version;
 import org.opensearch.action.ActionRunnable;
 import org.opensearch.action.StepListener;

@@ -142,6 +143,7 @@
 import org.opensearch.indices.RemoteStoreSettings;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
+import org.opensearch.monitor.jvm.JvmInfo;
 import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
 import org.opensearch.repositories.IndexId;
 import org.opensearch.repositories.IndexMetaDataGenerations;

@@ -167,6 +169,7 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.ref.SoftReference;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Arrays;

@@ -196,6 +199,7 @@
 import java.util.stream.LongStream;
 import java.util.stream.Stream;

+import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
 import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
 import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
 import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
@@ -253,6 +257,12 @@
      */
     public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

+    public static final String SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME = "snapshot.repository_data.cache.size";
+
+    public static final Integer SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE = 1;
+
+    public static final Integer SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE = 5;
+
     /**
      * When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
      * contents will not result in the repository being marked as corrupted
@@ -275,6 +285,31 @@
         Setting.Property.Deprecated
     );

+    /**
+     * Sets the cache size for snapshot repository data, defaulting to 1% of node heap memory. The maximum is 5%.
+     */
+    public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE = new Setting<>(
+        SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME,
+        SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_DEFAULT_PERCENTAGE + "%",
+        (s) -> {
+            ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME);
+            ByteSizeValue jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax();
+
+            if ((userDefinedLimit.getKbFrac() / jvmHeapSize.getKbFrac()) > (SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE / 100.0)) {
+                throw new OpenSearchParseException(
+                    "{} ({} KB) cannot exceed {}% of the heap ({} KB).",
+                    SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_SETTING_NAME,
+                    userDefinedLimit.getKb(),
+                    SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE_MAX_PERCENTAGE,
+                    jvmHeapSize.getKb()
+                );
+            }
+
+            return userDefinedLimit;
+        },
+        Setting.Property.NodeScope
+    );
+
     /**
     * Size hint for the IO buffer size to use when reading from and writing to the repository.
     */
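As a worked illustration of the cap this validator enforces (not part of the PR; the numbers and names below are assumptions for the example): with a 32 GB heap the 5% ceiling is roughly 1.6 GB, so a configured value of "2gb" would be rejected, while the 1% default (~328 MB) passes.

public class CacheSizeCapExample {
    public static void main(String[] args) {
        // Hypothetical stand-alone sketch of the same ratio check, using only JDK APIs.
        long heapBytes = Runtime.getRuntime().maxMemory();      // max heap, e.g. ~34_359_738_368 bytes for 32 GB
        long requestedBytes = 2L * 1024 * 1024 * 1024;          // a hypothetical "2gb" setting value
        boolean exceedsCap = (double) requestedBytes / heapBytes > 5 / 100.0; // mirrors the 5% ceiling
        System.out.println("rejected: " + exceedsCap);
    }
}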
@@ -461,6 +496,8 @@

     private volatile boolean enableAsyncDeletion;

+    private long repositoryDataCacheSize;
+
     /**
      * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
      * {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -515,6 +552,7 @@
         this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
         this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
         clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
+        this.repositoryDataCacheSize = SNAPSHOT_REPOSITORY_DATA_CACHE_SIZE.get(clusterService.getSettings()).getBytes();
     }

     @Override
@@ -1132,7 +1170,8 @@
                 cached = null;
             } else {
                 genToLoad = latestKnownRepoGen.get();
-                cached = latestKnownRepositoryData.get();
+                SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
+                cached = (softRef != null) ? softRef.get() : null;
             }
             if (genToLoad > generation) {
                 // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just

Review discussion on this change:

@inpink could you please educate me why the

My understanding is a WeakReference will be collected even if the JVM isn't under any significant memory pressure if the object isn't otherwise being referenced. The JVM will hang on to a SoftReference using a more complicated formula, but is guaranteed to reclaim the space before OOMing. I admit there's some risk here with a SoftReference: specifically, if OpenSearch's circuit breakers prevent the JVM from ever hitting a memory pressure scenario where it will collect soft references, then these references might never be reclaimed.

Correct, this is my understanding. I think if we go with weak, the risks could be largely eliminated, right?

I think we'd have the opposite risk: a performance degradation, because the weak reference would almost always be collected before it could be re-used.

Fair point, so what if we go with

I believe @andrross has provided an excellent explanation for why SoftReference was chosen. Thank you very much! We previously discussed SoftReference in this issue, so I'm sharing it here in case it might be helpful: link to issue discussion. Any additional thoughts you might have would be greatly appreciated. @reta, if I understood correctly, your suggestion of a TTL was to help reduce the unpredictability associated with SoftReference, would that be accurate? This cache is accessed each time a snapshot is added, deleted, or restored. If snapshots are taken every hour over 14 days, this would mean metadata for 336 snapshots would be cached (following AWS OpenSearch's snapshot storage approach as described here). In this case, may I ask what you would consider an appropriate TTL duration?
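For readers following this thread, here is a small self-contained sketch (not from the PR) of the behavioral difference being discussed; the exact timing is JVM-dependent, so treat the printed results as typical rather than guaranteed.

import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

public class ReferenceDemo {
    public static void main(String[] args) {
        byte[] payload = new byte[8 * 1024 * 1024]; // stand-in for the serialized repository data

        WeakReference<byte[]> weak = new WeakReference<>(payload);
        SoftReference<byte[]> soft = new SoftReference<>(payload);

        payload = null; // drop the only strong reference
        System.gc();    // request a collection (best effort)

        // A weak referent is usually cleared by the next GC cycle even without memory pressure;
        // a soft referent typically survives until the heap is close to exhaustion.
        System.out.println("weak cleared: " + (weak.get() == null));
        System.out.println("soft cleared: " + (soft.get() == null));
    }
}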
@@ -2926,15 +2965,19 @@
     private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

     // Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
-    private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
+    private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
+        new SoftReference<>(null)
+    );

     @Override
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
             listener.onFailure(corruptedStateException(null));
             return;
         }
-        final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+        final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
+        final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;

         // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
         // the latest known repository generation
         if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
@@ -2983,7 +3026,8 @@
                 genToLoad = latestKnownRepoGen.get();
             }
             try {
-                final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+                final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
+                final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
                 final RepositoryData loaded;
                 // Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
                 if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
@@ -3050,12 +3094,14 @@
             try {
                 serialized = CompressorRegistry.defaultCompressor().compress(updated);
                 final int len = serialized.length();
-                if (len > ByteSizeUnit.KB.toBytes(500)) {
+
+                if (len > repositoryDataCacheSize) {
                     logger.debug(
-                        "Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
+                        "Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
                             + " serialized size",
                         len,
-                        metadata.name()
+                        metadata.name(),
+                        repositoryDataCacheSize
                     );
                     if (len > ByteSizeUnit.MB.toBytes(5)) {
                         logger.warn(
@@ -3074,11 +3120,12 @@
                 logger.warn("Failed to serialize repository data", e);
                 return;
             }
-            latestKnownRepositoryData.updateAndGet(known -> {
+            latestKnownRepositoryData.updateAndGet(knownRef -> {
+                Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
                 if (known != null && known.v1() > generation) {
-                    return known;
+                    return knownRef;
                 }
-                return new Tuple<>(generation, serialized);
+                return new SoftReference<>(new Tuple<>(generation, serialized));
             });
         }
     }
Review comments:

I think there was quite a bit of work done to centralize cache management:

- CacheService to create caches (and basically provide an opportunity to make sure the cache configurations are sane)
- NodeCacheStats to provide the cache utilization statistics

It looks to me this is the direction we should be taking, no? @peteralfonsi @sgup432 what do you think folks?
Result caching is substantially more complex than what is happening here. I would really really like to avoid exposing anything in a stat API about this behavior. I would also like to avoid adding a cluster setting too. The original proposal was "that we have cache size which is x% of heap size" instead of just a fixed 500KB threshold. The use case here is to simply memoize the latest snapshot metadata because there's a good chance it will need to be accessed again. I'm wondering if changing the threshold to a fixed percentage of the heap (I know settling on the specific value can be difficult) plus the new soft reference behavior as a backstop against OOMs is good enough? No need for any setting or stat or anything like that. @ashking94 @reta what do you think?
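A minimal sketch of the approach described in this comment: memoize only the latest serialized repository data, cap it at a fixed fraction of the heap, and let a SoftReference act as the backstop against OOMs. This is illustrative only; the class and member names are assumptions, not code from the PR.

import java.lang.ref.SoftReference;
import java.util.concurrent.atomic.AtomicReference;

final class LatestRepoDataMemo {
    // Illustrative cap: ~1% of the max heap, computed with plain JDK APIs.
    private static final long MAX_CACHED_BYTES = Runtime.getRuntime().maxMemory() / 100;

    private final AtomicReference<SoftReference<byte[]>> cached = new AtomicReference<>(new SoftReference<>(null));

    void maybeCache(byte[] serialized) {
        if (serialized.length <= MAX_CACHED_BYTES) {
            // The GC may clear this referent under memory pressure, bounding worst-case heap usage.
            cached.set(new SoftReference<>(serialized));
        }
    }

    byte[] getCached() {
        SoftReference<byte[]> ref = cached.get();
        return ref != null ? ref.get() : null;
    }
}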
To me this is the problem: what is the relationship between heap size and snapshot metadata? A fixed size is at least a predictable measure, but 1% of a 32GB heap is ~300MB. I think just keeping the weak / soft reference to the snapshot metadata should be sufficient; if the JVM is low on heap, those will be reclaimed.
Hello, @reta!
The snapshot metadata grows with the number of indices and snapshots. As a best practice, the number of indices should be limited based on the available heap memory. I’ve provided more details in my comment, including an estimate of how much the snapshot metadata cache size may increase according to AWS-recommended best practices.
Thank you for your review!
Yes, but to be clear, how often do we access all the snapshots? In my experience we are mostly concerned with the most recent one(s) (for restoring) or the oldest one(s) (which contain the data about indices that haven't changed much).
Both of these would be well served by an LRU / TTL model.
This alone is a good argument for making the value a fixed percentage of heap, but I'm not sure that multiplying by the number of snapshots is necessary beyond a day or three's worth.
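For concreteness, here is a rough sketch of the LRU + TTL alternative floated in this thread. It is purely illustrative: the entry count, TTL, and all names are assumptions, not anything the PR implements.

import java.util.LinkedHashMap;
import java.util.Map;

final class LruTtlCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long insertedAtMillis;

        Entry(T value, long insertedAtMillis) {
            this.value = value;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, Entry<V>> map;

    LruTtlCache(int maxEntries, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // accessOrder=true gives least-recently-used iteration order; removeEldestEntry caps the size.
        this.map = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    synchronized void put(K key, V value) {
        map.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    synchronized V get(K key) {
        Entry<V> e = map.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() - e.insertedAtMillis > ttlMillis) {
            map.remove(key); // expired by TTL
            return null;
        }
        return e.value;
    }
}

A cache like this, keyed by repository generation or snapshot id, would keep the most recently used metadata hot while bounding both entry count and staleness.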