Skip to content

Commit

Permalink
[Snapshot Interop] Add Changes in Create Snapshot Flow for remote sto…
Browse files Browse the repository at this point in the history
…re interoperability. (#7118)

Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
harishbhakuni authored Jun 8, 2023
1 parent f25f61b commit 1803fd9
Show file tree
Hide file tree
Showing 26 changed files with 1,535 additions and 107 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836))
- Add min, max, average and thread info to resource stats in tasks API ([#7673](https://github.com/opensearch-project/OpenSearch/pull/7673))
- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) {
Map.of(),
null,
SnapshotInfoTests.randomUserMetadata(),
randomVersion(random())
randomVersion(random()),
false
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public static Entry startedEntry(
long repositoryStateId,
final Map<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
return new SnapshotsInProgress.Entry(
snapshot,
Expand All @@ -127,7 +128,8 @@ public static Entry startedEntry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -164,7 +166,8 @@ public static Entry startClone(
Collections.emptyMap(),
version,
source,
Map.of()
Map.of(),
false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create.
);
}

Expand All @@ -177,6 +180,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean remoteStoreIndexShallowCopy;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
Expand Down Expand Up @@ -219,7 +223,8 @@ public Entry(
final Map<ShardId, ShardSnapshotStatus> shards,
String failure,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -235,7 +240,8 @@ public Entry(
userMetadata,
version,
null,
Map.of()
Map.of(),
remoteStoreIndexShallowCopy
);
}

Expand All @@ -253,7 +259,8 @@ private Entry(
final Map<String, Object> userMetadata,
Version version,
@Nullable SnapshotId source,
@Nullable final Map<RepositoryShardId, ShardSnapshotStatus> clones
@Nullable final Map<RepositoryShardId, ShardSnapshotStatus> clones,
boolean remoteStoreIndexShallowCopy
) {
this.state = state;
this.snapshot = snapshot;
Expand All @@ -274,6 +281,7 @@ private Entry(
} else {
this.clones = Collections.unmodifiableMap(clones);
}
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

Expand All @@ -292,6 +300,11 @@ private Entry(StreamInput in) throws IOException {
dataStreams = in.readStringList();
source = in.readOptionalWriteable(SnapshotId::new);
clones = in.readMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
remoteStoreIndexShallowCopy = in.readBoolean();
} else {
remoteStoreIndexShallowCopy = false;
}
}

private static boolean assertShardsConsistent(
Expand Down Expand Up @@ -346,7 +359,8 @@ public Entry(
long repositoryStateId,
final Map<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata,
Version version
Version version,
boolean remoteStoreIndexShallowCopy
) {
this(
snapshot,
Expand All @@ -360,7 +374,8 @@ public Entry(
shards,
null,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -385,7 +400,8 @@ public Entry(
shards,
failure,
entry.userMetadata,
version
version,
entry.remoteStoreIndexShallowCopy
);
}

Expand All @@ -409,7 +425,8 @@ public Entry withRepoGen(long newRepoGen) {
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -431,7 +448,8 @@ public Entry withClones(final Map<RepositoryShardId, ShardSnapshotStatus> update
userMetadata,
version,
source,
updatedClones
updatedClones,
remoteStoreIndexShallowCopy
);
}

Expand Down Expand Up @@ -486,7 +504,8 @@ public Entry fail(final Map<ShardId, ShardSnapshotStatus> shards, State state, S
userMetadata,
version,
source,
clones
clones,
remoteStoreIndexShallowCopy
);
}

Expand All @@ -512,7 +531,8 @@ public Entry withShardStates(final Map<ShardId, ShardSnapshotStatus> shards) {
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
}
return withStartedShards(shards);
Expand All @@ -535,7 +555,8 @@ public Entry withStartedShards(final Map<ShardId, ShardSnapshotStatus> shards) {
shards,
failure,
userMetadata,
version
version,
remoteStoreIndexShallowCopy
);
assert updated.state().completed() == false && completed(updated.shards().values()) == false
: "Only running snapshots allowed but saw [" + updated + "]";
Expand Down Expand Up @@ -567,6 +588,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public Map<String, Object> userMetadata() {
return userMetadata;
}
Expand Down Expand Up @@ -630,7 +655,7 @@ public boolean equals(Object o) {
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;

if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false;
return true;
}

Expand All @@ -647,6 +672,7 @@ public int hashCode() {
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -710,6 +736,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(dataStreams);
out.writeOptionalWriteable(source);
out.writeMap(clones, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(remoteStoreIndexShallowCopy);
}
}

@Override
Expand Down
40 changes: 40 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,46 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

public GatedCloseable<IndexCommit> acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException {
GatedCloseable<IndexCommit> indexCommit = acquireLastIndexCommit(flushFirst);
getEngine().refresh("Snapshot for Remote Store based Shard");
return indexCommit;
}

/**
*
* @param snapshotId Snapshot UUID.
* @param primaryTerm current primary term.
* @param generation Snapshot Commit Generation.
* @throws IOException if there is some failure in acquiring lock in remote store.
*/
public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId);
}

/**
*
* @param snapshotId Snapshot UUID.
* @param primaryTerm current primary term.
* @param generation Snapshot Commit Generation.
* @throws IOException if there is some failure in releasing lock in remote store.
*/
public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard();
remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId);
}

private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() {
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
Expand Down
Loading

0 comments on commit 1803fd9

Please sign in to comment.