Respect generational files in recoveryDiff #55239

Closed
153 changes: 91 additions & 62 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -34,6 +34,7 @@
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.BufferedChecksum;
Expand All @@ -53,6 +54,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -67,7 +69,6 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.RefCounted;
-import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
@@ -101,6 +102,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;

Expand Down Expand Up @@ -910,8 +912,6 @@ public Map<String, StoreFileMetadata> asMap() {
return metadata;
}

private static final String DEL_FILE_EXTENSION = "del"; // legacy delete file
private static final String LIV_FILE_EXTENSION = "liv"; // lucene 5 delete file
private static final String SEGMENT_INFO_EXTENSION = "si";

/**
@@ -922,80 +922,110 @@ public Map<String, StoreFileMetadata> asMap() {
      * <li>different: they exist in both snapshots but they are not identical</li>
      * <li>missing: files that exist in the source but not in the target</li>
      * </ul>
-     * This method groups files into per-segment files and per-commit files. A file is treated as
-     * identical if and only if all files in its group are identical. On a per-segment level, files for a segment are treated
-     * as identical iff:
-     * <ul>
-     * <li>all files in this segment have the same checksum</li>
-     * <li>all files in this segment have the same length</li>
-     * <li>the segment's {@code .si} file hashes are byte-identical. Note: this uses a perfect hash function;
-     * the metadata transfers the {@code .si} file content as its hash</li>
-     * </ul>
-     * <p>
-     * The {@code .si} file contains a lot of diagnostics, including a timestamp etc. In the future there might be
-     * unique segment identifiers in there, hardening this method further.
-     * <p>
-     * Per-commit files are handled very similarly. A commit is composed of the {@code segments_N} file as well as generational files
-     * like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level, files for a commit are treated
-     * as identical iff:
-     * <ul>
-     * <li>all files belonging to this commit have the same checksum</li>
-     * <li>all files belonging to this commit have the same length</li>
-     * <li>the segments file {@code segments_N} hashes are byte-identical. Note: this uses a perfect hash function;
-     * the metadata transfers the {@code segments_N} file content as its hash</li>
-     * </ul>
-     * <p>
-     * NOTE: this diff will not contain the {@code segments.gen} file. This file is omitted on recovery.
+     * Individual files are compared by name, length and checksum. The segment info ({@code *.si}) files and the segments file
+     * ({@code segments_N}) are also checked to be a byte-for-byte match.
+     * <p>
+     * Files are collected together into a group for each segment plus one group of "per-commit" ({@code segments_N}) files. Each
+     * per-segment group is subdivided into a nongenerational group (most of them) and a generational group (e.g. {@code *.liv},
+     * {@code *.fnm}, {@code *.dvm}, {@code *.dvd} files that have been updated by subsequent commits).
+     * <p>
+     * For each segment, if any nongenerational files are different then the whole segment is considered to be different and will be
+     * recovered in full. If all the nongenerational files are the same but any generational files are different then all the
+     * generational files are considered to be different and will be recovered in full, but the nongenerational files are left alone.
+     * Finally, if any file is different then all the per-commit files are recovered too.
      */
+    /* Future work: the {@code *.si} file includes {@link SegmentInfo#getId()} which is a globally unique identifier for the
+     * nongenerational files in the segment, so we could compare that instead of using the files' lengths and checksums. We may also
+     * get a similar ID for the generational files in https://issues.apache.org/jira/browse/LUCENE-9324.
+     * TODO follow up once this Lucene discussion closes.
+     */
-    public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
+    public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) {
+        final List<StoreFileMetadata> perCommitSourceFiles = new ArrayList<>();
+        final Map<String, Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>>> perSegmentSourceFiles = new HashMap<>();
+        // per segment, a tuple of <<non-generational files, generational files>>
+
+        for (StoreFileMetadata sourceFile : this) {
+            if (sourceFile.name().startsWith("_")) {
+                final String segmentId = IndexFileNames.parseSegmentName(sourceFile.name());
+                final long generation = IndexFileNames.parseGeneration(sourceFile.name());
+                final Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> perSegmentTuple = perSegmentSourceFiles
+                    .computeIfAbsent(segmentId, k -> Tuple.tuple(new ArrayList<>(), new ArrayList<>()));
+                (generation == 0 ? perSegmentTuple.v1() : perSegmentTuple.v2()).add(sourceFile);
+            } else {
+                assert sourceFile.name().startsWith(IndexFileNames.SEGMENTS + "_") : "unexpected " + sourceFile;
+                perCommitSourceFiles.add(sourceFile);
+            }
+        }
+
         final List<StoreFileMetadata> identical = new ArrayList<>();
         final List<StoreFileMetadata> different = new ArrayList<>();
         final List<StoreFileMetadata> missing = new ArrayList<>();
-        final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
-        final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();
-
-        for (StoreFileMetadata meta : this) {
-            if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacy
-                continue; // we don't need that file at all
-            }
-            final String segmentId = IndexFileNames.parseSegmentName(meta.name());
-            final String extension = IndexFileNames.getExtension(meta.name());
-            if (IndexFileNames.SEGMENTS.equals(segmentId) ||
-                DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {
-                // only treat del files as per-commit files; fnm files are generational but only for upgradable DV
-                perCommitStoreFiles.add(meta);
-            } else {
-                perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);
-            }
-        }
-        final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
-        for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
-            identicalFiles.clear();
-            boolean consistent = true;
-            for (StoreFileMetadata meta : segmentFiles) {
-                StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
-                if (storeFileMetadata == null) {
-                    consistent = false;
-                    missing.add(meta);
-                } else if (storeFileMetadata.isSame(meta) == false) {
-                    consistent = false;
-                    different.add(meta);
-                } else {
-                    identicalFiles.add(meta);
-                }
-            }
-            if (consistent) {
-                identical.addAll(identicalFiles);
-            } else {
-                // make sure all files are added - this can happen if only the deletes are different
-                different.addAll(identicalFiles);
-            }
-        }
-        RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),
-            Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
-        assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
-            : "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +
-            this.metadata.size() + "] contains segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";
+
+        final List<StoreFileMetadata> tmpIdentical = new ArrayList<>(); // confirm whole group is identical before adding to 'identical'
+        final Predicate<List<StoreFileMetadata>> groupComparer = sourceGroup -> {
+            assert tmpIdentical.isEmpty() : "not cleaned up: " + tmpIdentical;
+            boolean groupIdentical = true;
+            for (StoreFileMetadata sourceFile : sourceGroup) {
+                final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
+                if (targetFile == null) {
+                    groupIdentical = false;
+                    missing.add(sourceFile);
+                } else if (groupIdentical && targetFile.isSame(sourceFile)) {
+                    tmpIdentical.add(sourceFile);
+                } else {
+                    groupIdentical = false;
+                    different.add(sourceFile);
+                }
+            }
+            if (groupIdentical) {
+                identical.addAll(tmpIdentical);
+            } else {
+                different.addAll(tmpIdentical);
+            }
+            tmpIdentical.clear();
+            return groupIdentical;
+        };
+        final Consumer<List<StoreFileMetadata>> allDifferent = sourceGroup -> {
+            for (StoreFileMetadata sourceFile : sourceGroup) {
+                final StoreFileMetadata targetFile = targetSnapshot.get(sourceFile.name());
+                if (targetFile == null) {
+                    missing.add(sourceFile);
+                } else {
+                    different.add(sourceFile);
+                }
+            }
+        };
+
+        boolean segmentsIdentical = true;
+
+        for (Tuple<List<StoreFileMetadata>, List<StoreFileMetadata>> segmentFiles : perSegmentSourceFiles.values()) {
+            final List<StoreFileMetadata> nonGenerationalFiles = segmentFiles.v1();
+            final List<StoreFileMetadata> generationalFiles = segmentFiles.v2();
+
+            if (groupComparer.test(nonGenerationalFiles)) {
+                // non-generational files are identical, now check the generational files
+                segmentsIdentical = groupComparer.test(generationalFiles) && segmentsIdentical;
+            } else {
+                // non-generational files were different, so consider the whole segment as different
+                segmentsIdentical = false;
+                allDifferent.accept(generationalFiles);
+            }
+        }
+
+        if (segmentsIdentical) {
+            // segments were the same, check the per-commit files
+            groupComparer.test(perCommitSourceFiles);
+        } else {
+            // at least one segment was different, so treat all the per-commit files as different too
+            allDifferent.accept(perCommitSourceFiles);
+        }
+
+        final RecoveryDiff recoveryDiff = new RecoveryDiff(
+            Collections.unmodifiableList(identical),
+            Collections.unmodifiableList(different),
+            Collections.unmodifiableList(missing));
+        assert recoveryDiff.size() == metadata.size() : "some files are missing: recoveryDiff is [" + recoveryDiff
+            + "] comparing: [" + metadata + "] to [" + targetSnapshot.metadata + "]";
         return recoveryDiff;
     }

[Inline review comment from the contributor author, on the deleted OLD_SEGMENTS_GEN check: "I think this is no longer relevant but looking for confirmation from someone who knows the history here."]
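The new grouping loop keys everything off Lucene's generational file naming: a file written with the segment itself parses to generation 0, while a file rewritten by a later commit (live docs, updated field infos, updated doc values) carries a generation suffix. As a quick illustration, here is a minimal standalone sketch (not part of this change; the file names are made up) of how IndexFileNames.parseSegmentName and IndexFileNames.parseGeneration split such names:

import org.apache.lucene.index.IndexFileNames;

public class GenerationalNamesDemo {
    public static void main(String[] args) {
        // "_0.cfs"   -> segment "_0", generation 0: written with the segment (nongenerational)
        // "_0_1.liv" -> segment "_0", generation 1: live-docs file from a later commit (generational)
        // "_0_2.fnm" -> segment "_0", generation 2: field-infos file from a later commit (generational)
        for (String name : new String[] { "_0.cfs", "_0_1.liv", "_0_2.fnm" }) {
            System.out.println(name
                + " -> segment=" + IndexFileNames.parseSegmentName(name)
                + ", generation=" + IndexFileNames.parseGeneration(name));
        }
    }
}

This is exactly the distinction the tuple of lists captures: v1() collects the generation-0 files, v2() the generational ones.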

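For context, a hypothetical sketch of how the three buckets of the resulting RecoveryDiff might drive a recovery. The RecoveryPlanSketch class and the printed actions are assumptions for illustration; the real consumer is the peer-recovery source handler, not this code:

import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;

class RecoveryPlanSketch {
    static void plan(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
        final Store.RecoveryDiff diff = source.recoveryDiff(target);
        for (StoreFileMetadata file : diff.identical) {
            System.out.println("reuse  " + file.name()); // byte-identical on the target: no transfer needed
        }
        for (StoreFileMetadata file : diff.different) {
            System.out.println("resend " + file.name()); // present on the target but not identical: recover in full
        }
        for (StoreFileMetadata file : diff.missing) {
            System.out.println("send   " + file.name()); // absent from the target: recover in full
        }
    }
}

The point of this PR is visible in which bucket files land: with generational files respected, an unchanged segment whose live-docs file was merely updated contributes only its generational files to "different", instead of dragging the whole segment into a full re-copy.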
@@ -1099,7 +1129,6 @@ public String toString() {
         }
     }
 
-
     /**
      * Returns true if the file is auto-generated by the store and shouldn't be deleted during cleanup.
      * This includes write lock and checksum files