Skip to content

Commit

Permalink
Segment Replication - Fix NoSuchFileException errors caused when comp…
Browse files Browse the repository at this point in the history
…uting metadata snapshot on primary shards.

This change fixes the errors that occur when computing metadata snapshots on primary shards from the latest in-memory SegmentInfos.  The error occurs when a segments_N file that is referenced by the in-memory infos is deleted as part of a concurrent commit.  The segments themselves are incref'd by IndexWriter.incRefDeleter but the commit file (Segments_N) is not.  This change resolves this by ignoring the segments_N file when computing metadata for CopyState and only sending incref'd segment files to replicas.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Sep 1, 2022
1 parent 689a2c4 commit 0410038
Show file tree
Hide file tree
Showing 16 changed files with 179 additions and 142 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add timeout on Mockito.verify to reduce flakyness in testReplicationOnDone test([#4314](https://github.com/opensearch-project/OpenSearch/pull/4314))
- Commit workflow for dependabot changelog helper ([#4331](https://github.com/opensearch-project/OpenSearch/pull/4331))
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- Bugs for dependabot changelog verifier workflow ([#4364](https://github.com/opensearch-project/OpenSearch/pull/4364))
- Bugs for dependabot changelog verifier workflow ([#4364](https://github.com/opensearch-project/OpenSearch/pull/4364))
- Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.index.SegmentInfos;
import org.junit.BeforeClass;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
Expand Down Expand Up @@ -516,13 +515,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException {
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
IndexShard indexShard = getIndexShard(replicaNode.getName());
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
indexShard.store().readLastCommittedSegmentsInfo();
}
}
}

public void testDropPrimaryDuringReplication() throws Exception {
final Settings settings = Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
// don't wait for replication to complete, stop the primary immediately.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertSegmentStats(6);
}
}

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception if assertion fails
Expand All @@ -541,10 +583,12 @@ private void waitForReplicaUpdate() throws Exception {
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
if (primaryShardSegments.getSegments().isEmpty() == false) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments());
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
Expand Down
38 changes: 30 additions & 8 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -122,6 +123,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;

/**
* A Store provides plain access to files written by an opensearch index shard. Each shard
Expand Down Expand Up @@ -334,6 +336,16 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Fetch a map of StoreFileMetadata for segments, ignoring commit points.
* @param segmentInfos {@link SegmentInfos} from which to compute metadata.
* @return {@link Map} map file name to {@link StoreFileMetadata}.
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
assert indexSettings.isSegRepEnabled();
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -709,31 +721,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* Segment Replication method -
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
* part of the latest on-disk commit point.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
|| (additionalFiles != null && additionalFiles.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
Expand Down Expand Up @@ -1033,6 +1048,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
return loadMetadata(segmentInfos, directory, logger, false);
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Expand Down Expand Up @@ -1067,8 +1087,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
if (ignoreSegmentsFile == false) {
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
}
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Set;
import java.util.Map;

/**
* Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos
Expand All @@ -28,52 +27,41 @@
public class CheckpointInfoResponse extends TransportResponse {

private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot snapshot;
private final Map<String, StoreFileMetadata> snapshot;
private final byte[] infosBytes;
// pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos
// but are still referenced by the latest commit point (Segments_N).
private final Set<StoreFileMetadata> pendingDeleteFiles;

public CheckpointInfoResponse(
final ReplicationCheckpoint checkpoint,
final Store.MetadataSnapshot snapshot,
final byte[] infosBytes,
final Set<StoreFileMetadata> additionalFiles
final Map<String, StoreFileMetadata> snapshot,
final byte[] infosBytes
) {
this.checkpoint = checkpoint;
this.snapshot = snapshot;
this.infosBytes = infosBytes;
this.pendingDeleteFiles = additionalFiles;
}

public CheckpointInfoResponse(StreamInput in) throws IOException {
this.checkpoint = new ReplicationCheckpoint(in);
this.snapshot = new Store.MetadataSnapshot(in);
this.snapshot = in.readMap(StreamInput::readString, StoreFileMetadata::new);
this.infosBytes = in.readByteArray();
this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
checkpoint.writeTo(out);
snapshot.writeTo(out);
out.writeMap(snapshot, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeByteArray(infosBytes);
out.writeCollection(pendingDeleteFiles);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}

public Store.MetadataSnapshot getSnapshot() {
public Map<String, StoreFileMetadata> getSnapshot() {
return snapshot;
}

public byte[] getInfosBytes() {
return infosBytes;
}

public Set<StoreFileMetadata> getPendingDeleteFiles() {
return pendingDeleteFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
);
final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter);
channel.sendResponse(
new CheckpointInfoResponse(
copyState.getCheckpoint(),
copyState.getMetadataSnapshot(),
copyState.getInfosBytes(),
copyState.getPendingDeleteFiles()
)
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
timer.stop();
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Collections;

/**
* Represents the target of a replication event.
Expand Down Expand Up @@ -174,7 +170,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
final Store.MetadataSnapshot snapshot = new Store.MetadataSnapshot(checkpointInfo.getSnapshot(), Collections.emptyMap(), 0L);
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
final Store.RecoveryDiff diff = snapshot.segmentReplicationDiff(localMetadata);
logger.trace("Replication diff {}", diff);
Expand All @@ -191,24 +187,14 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
)
);
}
final List<StoreFileMetadata> filesToFetch = new ArrayList<StoreFileMetadata>(diff.missing);

Set<String> storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll()));
final Set<StoreFileMetadata> pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles()
.stream()
.filter(f -> storeFiles.contains(f.name()) == false)
.collect(Collectors.toSet());

filesToFetch.addAll(pendingDeleteFiles);
logger.trace("Files to fetch {}", filesToFetch);

for (StoreFileMetadata file : filesToFetch) {
for (StoreFileMetadata file : diff.missing) {
state.getIndex().addFileDetail(file.name(), file.length(), false);
}
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), diff.missing, store, getFilesListener);
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
Expand All @@ -227,7 +213,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
responseCheckpoint.getSegmentsGen()
);
indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo());
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos));
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
Expand Down
Loading

0 comments on commit 0410038

Please sign in to comment.