Skip to content

Commit

Permalink
Make index and global metadata upload timeout dynamic cluster settings (
Browse files Browse the repository at this point in the history
#10814)

* Make index and global metadata upload wait time dynamic

Signed-off-by: Rahul Karajgikar <[email protected]>
  • Loading branch information
rahulkarajgikar authored Oct 23, 2023
1 parent 218a2ef commit 8f13dee
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,23 @@ public class RemoteClusterStateService implements Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

// TODO make this two variable as dynamic setting [issue: #10688]
public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.global_metadata.upload_timeout",
GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
Expand Down Expand Up @@ -141,6 +155,9 @@ public class RemoteClusterStateService implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);

public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
Expand Down Expand Up @@ -171,7 +188,11 @@ public RemoteClusterStateService(
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
this.threadpool = threadPool;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -367,7 +388,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
);

try {
if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
Expand Down Expand Up @@ -422,7 +443,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
}

try {
if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
Expand Down Expand Up @@ -621,6 +642,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}

private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) {
this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout;
}

private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) {
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

public TimeValue getIndexMetadataUploadTimeout() {
return this.indexMetadataUploadTimeout;
}

public TimeValue getGlobalMetadataUploadTimeout() {
return this.globalMetadataUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
public class RemoteClusterStateServiceTests extends OpenSearchTestCase {

private RemoteClusterStateService remoteClusterStateService;
private ClusterSettings clusterSettings;
private Supplier<RepositoriesService> repositoriesServiceSupplier;
private RepositoriesService repositoriesService;
private BlobStoreRepository blobStoreRepository;
Expand Down Expand Up @@ -132,6 +133,7 @@ public void setup() {
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();

clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
Stream.of(
NetworkModule.getNamedXContents().stream(),
Expand All @@ -149,7 +151,7 @@ public void setup() {
"test-node-id",
repositoriesServiceSupplier,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
clusterSettings,
() -> 0L,
threadPool
);
Expand Down Expand Up @@ -1053,6 +1055,38 @@ public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Excepti
assertBusy(() -> assertEquals(1, callCount.get()));
}

public void testIndexMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getIndexMetadataUploadTimeout()
);

// verify update index metadata upload timeout
int indexMetadataUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.index_metadata.upload_timeout", indexMetadataUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds());
}

public void testGlobalMetadataUploadWaitTimeSetting() {
// verify default value
assertEquals(
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT,
remoteClusterStateService.getGlobalMetadataUploadTimeout()
);

// verify update global metadata upload timeout
int globalMetadataUploadTimeout = randomIntBetween(1, 10);
Settings newSettings = Settings.builder()
.put("cluster.remote_store.state.global_metadata.upload_timeout", globalMetadataUploadTimeout + "s")
.build();
clusterSettings.applySettings(newSettings);
assertEquals(globalMetadataUploadTimeout, remoteClusterStateService.getGlobalMetadataUploadTimeout().seconds());
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false);
}
Expand Down

0 comments on commit 8f13dee

Please sign in to comment.