diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b672e21372f8..a962953ecaa6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,10 +89,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466)) +- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394)) - Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452)) ### Dependencies -- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0 +- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564) - Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.2.0 - Bump `com.google.protobuf:protobuf-java` from 3.22.2 to 3.22.3 - Bump `jackson` from 2.14.2 to 2.15.0 ([#7286](https://github.com/opensearch-project/OpenSearch/pull/7286)) @@ -110,6 +111,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump Apache Lucene to 9.6.0 ([#7505](https://github.com/opensearch-project/OpenSearch/pull/7505)) - Bump `com.google.cloud:google-cloud-core-http` from 1.93.3 to 2.17.0 (#7488) - Bump `com.google.guava:guava` from 30.1.1-jre to 31.1-jre (#7565) +- Bump `com.azure:azure-storage-common` from 12.20.0 to 12.21.0 (#7566) +- Bump `org.apache.commons:commons-compress` from 1.22 to 1.23.0 (#7563) ### Changed - Enable `./gradlew build` on MacOS by disabling bcw tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 19a0b951a9f56..32020f00ef7ae 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -103,11 +103,11 @@ dependencies { api localGroovy() api 'commons-codec:commons-codec:1.15' - api 'org.apache.commons:commons-compress:1.22' + api 'org.apache.commons:commons-compress:1.23.0' api 'org.apache.ant:ant:1.10.13' api 'com.netflix.nebula:gradle-extra-configurations-plugin:9.0.0' api 'com.netflix.nebula:nebula-publishing-plugin:20.3.0' - api 'com.netflix.nebula:gradle-info-plugin:12.1.0' + api 'com.netflix.nebula:gradle-info-plugin:12.1.3' api 'org.apache.rat:apache-rat:0.15' api 'commons-io:commons-io:2.11.0' api "net.java.dev.jna:jna:5.13.0" diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 033a78060fde9..7b18facadcb30 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -46,7 +46,7 @@ opensearchplugin { dependencies { api 'com.azure:azure-core:1.39.0' api 'com.azure:azure-json:1.0.1' - api 'com.azure:azure-storage-common:12.20.0' + api 'com.azure:azure-storage-common:12.21.0' api 'com.azure:azure-core-http-netty:1.12.8' api "io.netty:netty-codec-dns:${versions.netty}" api "io.netty:netty-codec-socks:${versions.netty}" diff --git a/plugins/repository-azure/licenses/azure-storage-common-12.20.0.jar.sha1 b/plugins/repository-azure/licenses/azure-storage-common-12.20.0.jar.sha1 deleted file mode 100644 index 1fd3f8066411d..0000000000000 --- a/plugins/repository-azure/licenses/azure-storage-common-12.20.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -69b962bbeea787c6aca83115472791aacc2ae94c \ No newline at end of file diff --git a/plugins/repository-azure/licenses/azure-storage-common-12.21.0.jar.sha1 b/plugins/repository-azure/licenses/azure-storage-common-12.21.0.jar.sha1 new file mode 100644 index 0000000000000..7bfa9273776e8 --- /dev/null +++ b/plugins/repository-azure/licenses/azure-storage-common-12.21.0.jar.sha1 @@ -0,0 +1 @@ +0aeabde8faba0ab8b73eadbedfc3c4e52112f5c4 \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 04c97a2f41aaa..746cbb46aaaca 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1023,6 +1023,14 @@ public boolean isSegRepEnabled() { return ReplicationType.SEGMENT.equals(replicationType); } + public boolean isSegRepLocalEnabled() { + return isSegRepEnabled() && !isSegRepWithRemoteEnabled(); + } + + public boolean isSegRepWithRemoteEnabled() { + return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL); + } + /** * Returns if remote store is enabled for this index. */ diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 66d095878d123..4254586f3d70e 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,10 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) { + if (didRefresh + && shard.state() == IndexShardState.STARTED + && shard.getReplicationTracker().isPrimaryMode() + && !shard.indexSettings.isSegRepWithRemoteEnabled()) { publisher.publish(shard, shard.getLatestReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f2aa886aed374..a6225569d86b4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3546,9 +3546,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (isRemoteStoreEnabled()) { - internalRefreshListener.add(new RemoteStoreRefreshListener(this)); + internalRefreshListener.add( + new RemoteStoreRefreshListener( + this, + // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. + indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY + ) + ); } - if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { + + if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 8672ba6c59a13..ac9c35aaee6b5 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -29,6 +29,8 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import java.io.IOException; import java.util.Collection; @@ -96,7 +98,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry; - public RemoteStoreRefreshListener(IndexShard indexShard) { + private final SegmentReplicationCheckpointPublisher checkpointPublisher; + + public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher) { this.indexShard = indexShard; this.storeDirectory = indexShard.store().directory(); this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -111,6 +115,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) { } } resetBackOffDelayIterator(); + this.checkpointPublisher = checkpointPublisher; } @Override @@ -151,6 +156,10 @@ private synchronized void syncSegments(boolean isRetry) { deleteStaleCommits(); } + // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can + // move. + ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); + String segmentInfoSnapshotFilename = null; try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); @@ -190,9 +199,11 @@ private synchronized void syncSegments(boolean isRetry) { .filter(file -> !localSegmentsPostRefresh.contains(file)) .collect(Collectors.toSet()) .forEach(localSegmentChecksumMap::remove); - OnSuccessfulSegmentsSync(); + onSuccessfulSegmentsSync(); final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + + checkpointPublisher.publish(indexShard, checkpoint); } else { shouldRetry = true; } @@ -229,7 +240,7 @@ private void beforeSegmentsSync(boolean isRetry) { } } - private void OnSuccessfulSegmentsSync() { + private void onSuccessfulSegmentsSync() { // Reset the backoffDelayIterator for the future failures resetBackOffDelayIterator(); // Cancel the scheduled cancellable retry if possible and set it to null diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 84848bb87d634..7ae18771ee0fa 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -24,6 +24,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -52,7 +53,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { indexDocs(1, numberOfDocs); indexShard.refresh("test"); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -316,7 +317,7 @@ private void mockIndexShardWithRetryAndScheduleRefresh( return indexShard.getEngine(); }).when(shard).getEngine(); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY); refreshListener.afterRefresh(false); }