-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LUCENE-9662: CheckIndex should be concurrent - parallelizing index check across segments #128
Changes from 1 commit
bbd0250
1ac1305
9679cea
e87d891
2009c70
ed04964
b7f3c0f
22c87c2
99cea46
c5c1e56
6ab669c
a97de22
9883192
1bc1f8a
dadfe80
57f542f
71b5279
cad119e
9c88f2f
8f949dc
51bcaa2
b39a856
b892b21
09ae809
138b72e
6cd6a46
d1b19ea
bae7e7a
70dc71c
817c050
2ef3e8d
034f209
c7e3d4d
68fe4f5
afea6a7
6400304
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,10 @@ | |
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import org.apache.lucene.codecs.Codec; | ||
import org.apache.lucene.codecs.DocValuesProducer; | ||
import org.apache.lucene.codecs.NormsProducer; | ||
|
@@ -60,6 +64,7 @@ | |
import org.apache.lucene.util.FixedBitSet; | ||
import org.apache.lucene.util.IOUtils; | ||
import org.apache.lucene.util.LongBitSet; | ||
import org.apache.lucene.util.NamedThreadFactory; | ||
import org.apache.lucene.util.StringHelper; | ||
import org.apache.lucene.util.SuppressForbidden; | ||
import org.apache.lucene.util.Version; | ||
|
@@ -605,6 +610,15 @@ public Status checkIndex(List<String> onlySegments) throws IOException { | |
result.newSegments.clear(); | ||
result.maxSegmentName = -1; | ||
|
||
// nocommit number of threads should be set dynamically | ||
ExecutorService executor = | ||
Executors.newFixedThreadPool(10, new NamedThreadFactory("async-check-index")); | ||
|
||
// nocommit the msg statements with infoStream inside this loop (as well as inside each | ||
// index part checking methods such as testLiveDocs) may be | ||
// interleaved when this section of code run concurrently. Is it ok to not synchronize for | ||
// these msg statements and instead have each msg content to include segment id information for | ||
// identification? | ||
for (int i = 0; i < numSegments; i++) { | ||
final SegmentCommitInfo info = sis.info(i); | ||
long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX); | ||
|
@@ -639,6 +653,8 @@ public Status checkIndex(List<String> onlySegments) throws IOException { | |
SegmentReader reader = null; | ||
|
||
try { | ||
// nocommit these msg statements may require synchronization if printed without segment | ||
// identifier | ||
msg(infoStream, " version=" + (version == null ? "3.0" : version)); | ||
msg(infoStream, " id=" + StringHelper.idToString(info.info.getId())); | ||
final Codec codec = info.info.getCodec(); | ||
|
@@ -731,40 +747,188 @@ public Status checkIndex(List<String> onlySegments) throws IOException { | |
} | ||
|
||
if (checksumsOnly == false) { | ||
// This redundant assignment is done to make compiler happy | ||
SegmentReader finalReader = reader; | ||
|
||
// Test Livedocs | ||
segInfoStat.liveDocStatus = testLiveDocs(reader, infoStream, failFast); | ||
CompletableFuture<Void> testliveDocs = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testLiveDocs(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
liveDocStatus -> { | ||
segInfoStat.liveDocStatus = liveDocStatus; | ||
}); | ||
|
||
// Test Fieldinfos | ||
segInfoStat.fieldInfoStatus = testFieldInfos(reader, infoStream, failFast); | ||
CompletableFuture<Void> testFieldInfos = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testFieldInfos(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
fieldInfoStatus -> { | ||
segInfoStat.fieldInfoStatus = fieldInfoStatus; | ||
}); | ||
|
||
// Test Field Norms | ||
segInfoStat.fieldNormStatus = testFieldNorms(reader, infoStream, failFast); | ||
CompletableFuture<Void> testFieldNorms = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testFieldNorms(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
fieldNormStatus -> { | ||
segInfoStat.fieldNormStatus = fieldNormStatus; | ||
}); | ||
|
||
// Test the Term Index | ||
segInfoStat.termIndexStatus = | ||
testPostings(reader, infoStream, verbose, doSlowChecks, failFast); | ||
CompletableFuture<Void> testTermIndex = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testPostings( | ||
finalReader, infoStream, verbose, doSlowChecks, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
termIndexStatus -> { | ||
segInfoStat.termIndexStatus = termIndexStatus; | ||
}); | ||
|
||
// Test Stored Fields | ||
segInfoStat.storedFieldStatus = testStoredFields(reader, infoStream, failFast); | ||
CompletableFuture<Void> testStoredFields = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testStoredFields(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
storedFieldStatus -> { | ||
segInfoStat.storedFieldStatus = storedFieldStatus; | ||
}); | ||
|
||
// Test Term Vectors | ||
segInfoStat.termVectorStatus = | ||
testTermVectors(reader, infoStream, verbose, doSlowChecks, failFast); | ||
CompletableFuture<Void> testTermVectors = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testTermVectors( | ||
finalReader, infoStream, verbose, doSlowChecks, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
termVectorStatus -> { | ||
segInfoStat.termVectorStatus = termVectorStatus; | ||
}); | ||
|
||
// Test Docvalues | ||
segInfoStat.docValuesStatus = testDocValues(reader, infoStream, failFast); | ||
CompletableFuture<Void> testDocValues = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testDocValues(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
docValuesStatus -> { | ||
segInfoStat.docValuesStatus = docValuesStatus; | ||
}); | ||
|
||
// Test PointValues | ||
segInfoStat.pointsStatus = testPoints(reader, infoStream, failFast); | ||
CompletableFuture<Void> testPointvalues = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe add a utility method that accepts a callable and wraps in completion exception? This is a very repetitive block. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also feel this is quite repetitive and cumbersome to look at. I gave it some more tries but it seems to be a bit difficult actually. The main issue here is the
Maybe some refactoring in those For the code in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a commit that shows what I meant (and replaces three calls). Sorry for not replacing all of them. Maybe it can be made even less verbose if you fold all those blocks into a single function that accepts executor, first callable, follow-up callable and just returns completable future from the composition of these. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah ok I see. I think I encountered the issue earlier when I tried to use |
||
try { | ||
return testPoints(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
pointsStatus -> { | ||
segInfoStat.pointsStatus = pointsStatus; | ||
}); | ||
|
||
// Test VectorValues | ||
segInfoStat.vectorValuesStatus = testVectors(reader, infoStream, failFast); | ||
CompletableFuture<Void> testVectors = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testVectors(finalReader, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
vectorValuesStatus -> { | ||
segInfoStat.vectorValuesStatus = vectorValuesStatus; | ||
}); | ||
|
||
// Test index sort | ||
segInfoStat.indexSortStatus = testSort(reader, indexSort, infoStream, failFast); | ||
CompletableFuture<Void> testSort = | ||
CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return testSort(finalReader, indexSort, infoStream, failFast); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} | ||
}, | ||
executor) | ||
.thenAccept( | ||
indexSortStatus -> { | ||
segInfoStat.indexSortStatus = indexSortStatus; | ||
}); | ||
|
||
// Rethrow the first exception we encountered | ||
// This will cause stats for failed segments to be incremented properly | ||
// nocommit The error != null check ordering below requires sequencing the above async | ||
// calls. | ||
// Does the order really matter here or can be done differently? | ||
CompletableFuture.allOf( | ||
testliveDocs, | ||
testFieldInfos, | ||
testFieldNorms, | ||
testTermIndex, | ||
testStoredFields, | ||
testTermVectors, | ||
testDocValues, | ||
testPointvalues, | ||
testVectors, | ||
testSort) | ||
.join(); | ||
if (segInfoStat.liveDocStatus.error != null) { | ||
throw new RuntimeException("Live docs test failed"); | ||
} else if (segInfoStat.fieldInfoStatus.error != null) { | ||
|
@@ -783,6 +947,8 @@ public Status checkIndex(List<String> onlySegments) throws IOException { | |
throw new RuntimeException("Points test failed"); | ||
} | ||
} | ||
|
||
// nocommit parallelize this as well? | ||
final String softDeletesField = reader.getFieldInfos().getSoftDeletesField(); | ||
if (softDeletesField != null) { | ||
checkSoftDeletes(softDeletesField, info, reader, infoStream, failFast); | ||
|
@@ -810,6 +976,8 @@ public Status checkIndex(List<String> onlySegments) throws IOException { | |
result.newSegments.add(info.clone()); | ||
} | ||
|
||
executor.shutdown(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add await; shutdown is non-blocking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops sorry. Thought it would be ok with the |
||
|
||
if (0 == result.numBadSegments) { | ||
result.clean = true; | ||
} else | ||
|
@@ -931,6 +1099,8 @@ public static Status.LiveDocStatus testLiveDocs( | |
final Status.LiveDocStatus status = new Status.LiveDocStatus(); | ||
|
||
try { | ||
// nocommit these msg statements may require synchronization if printed without segment | ||
// identifier | ||
if (infoStream != null) infoStream.print(" test: check live docs....."); | ||
final int numDocs = reader.numDocs(); | ||
if (reader.hasDeletions()) { | ||
|
@@ -969,6 +1139,8 @@ public static Status.LiveDocStatus testLiveDocs( | |
} | ||
} | ||
} | ||
// nocommit these msg statements may require synchronization if printed without segment | ||
// identifier | ||
msg( | ||
infoStream, | ||
String.format( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say: set dynamically based on CPU count, but provide an option (command line) to override?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can the checkIndex() method just take ExecutorService as a parameter? This way, e.g. the calling
main
method could configure the ExecutorService (possibly based on commandline options), unit tests can use a single thread, etc.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great suggestions! I've updated it to work for both
checkIndex
invoked frommain
, as well as invoked from other classes / methods.