Skip to content

Commit

Permalink
Merge branch 'main' into tokenManager
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Crawford <[email protected]>
  • Loading branch information
stephen-crawford authored May 16, 2023
2 parents ba38c92 + 6f65afb commit c2d4129
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 13 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466))
- SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394))
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
- Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.2.0
- Bump `com.google.protobuf:protobuf-java` from 3.22.2 to 3.22.3
- Bump `jackson` from 2.14.2 to 2.15.0 ([#7286](https://github.com/opensearch-project/OpenSearch/pull/7286))
Expand All @@ -110,6 +111,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump Apache Lucene to 9.6.0 ([#7505](https://github.com/opensearch-project/OpenSearch/pull/7505))
- Bump `com.google.cloud:google-cloud-core-http` from 1.93.3 to 2.17.0 (#7488)
- Bump `com.google.guava:guava` from 30.1.1-jre to 31.1-jre (#7565)
- Bump `com.azure:azure-storage-common` from 12.20.0 to 12.21.0 (#7566)
- Bump `org.apache.commons:commons-compress` from 1.22 to 1.23.0 (#7563)

### Changed
- Enable `./gradlew build` on MacOS by disabling bcw tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303))
Expand Down
4 changes: 2 additions & 2 deletions buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ dependencies {
api localGroovy()

api 'commons-codec:commons-codec:1.15'
api 'org.apache.commons:commons-compress:1.22'
api 'org.apache.commons:commons-compress:1.23.0'
api 'org.apache.ant:ant:1.10.13'
api 'com.netflix.nebula:gradle-extra-configurations-plugin:9.0.0'
api 'com.netflix.nebula:nebula-publishing-plugin:20.3.0'
api 'com.netflix.nebula:gradle-info-plugin:12.1.0'
api 'com.netflix.nebula:gradle-info-plugin:12.1.3'
api 'org.apache.rat:apache-rat:0.15'
api 'commons-io:commons-io:2.11.0'
api "net.java.dev.jna:jna:5.13.0"
Expand Down
2 changes: 1 addition & 1 deletion plugins/repository-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ opensearchplugin {
dependencies {
api 'com.azure:azure-core:1.39.0'
api 'com.azure:azure-json:1.0.1'
api 'com.azure:azure-storage-common:12.20.0'
api 'com.azure:azure-storage-common:12.21.0'
api 'com.azure:azure-core-http-netty:1.12.8'
api "io.netty:netty-codec-dns:${versions.netty}"
api "io.netty:netty-codec-socks:${versions.netty}"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0aeabde8faba0ab8b73eadbedfc3c4e52112f5c4
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,14 @@ public boolean isSegRepEnabled() {
return ReplicationType.SEGMENT.equals(replicationType);
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabled() && !isSegRepWithRemoteEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabled() && isRemoteStoreEnabled() && FeatureFlags.isEnabled(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL);
}

/**
* Returns if remote store is enabled for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3546,9 +3546,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
internalRefreshListener.add(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY
)
);
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -96,7 +98,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres

private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry;

public RemoteStoreRefreshListener(IndexShard indexShard) {
private final SegmentReplicationCheckpointPublisher checkpointPublisher;

public RemoteStoreRefreshListener(IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Expand All @@ -111,6 +115,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) {
}
}
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
}

@Override
Expand Down Expand Up @@ -151,6 +156,10 @@ private synchronized void syncSegments(boolean isRetry) {
deleteStaleCommits();
}

// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -190,9 +199,11 @@ private synchronized void syncSegments(boolean isRetry) {
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
OnSuccessfulSegmentsSync();
onSuccessfulSegmentsSync();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);

checkpointPublisher.publish(indexShard, checkpoint);
} else {
shouldRetry = true;
}
Expand Down Expand Up @@ -229,7 +240,7 @@ private void beforeSegmentsSync(boolean isRetry) {
}
}

private void OnSuccessfulSegmentsSync() {
private void onSuccessfulSegmentsSync() {
// Reset the backoffDelayIterator for the future failures
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void setup(boolean primary, int numberOfDocs) throws IOException {
indexDocs(1, numberOfDocs);
indexShard.refresh("test");

remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY);
}

private void indexDocs(int startDocId, int numberOfDocs) throws IOException {
Expand Down Expand Up @@ -316,7 +317,7 @@ private void mockIndexShardWithRetryAndScheduleRefresh(
return indexShard.getEngine();
}).when(shard).getEngine();

RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY);
refreshListener.afterRefresh(false);
}

Expand Down

0 comments on commit c2d4129

Please sign in to comment.