Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LUCENE-9662: CheckIndex should be concurrent - parallelizing index check across segments #128

Merged
Merged
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bbd0250
LUCENE-9662: Parallelize checking each index part within each segment
zacharymorn May 6, 2021
1ac1305
Support passing in executorService through checkIndex, and awaits shu…
zacharymorn May 7, 2021
9679cea
Callable to supplier wrapper.
dweiss May 7, 2021
e87d891
Tidy.
dweiss May 7, 2021
2009c70
Use callable and function to reduce the repetitive section of code
zacharymorn May 8, 2021
ed04964
Add segment and part ids
zacharymorn May 18, 2021
b7f3c0f
"Simplify" failFast logic a bit as RuntimeException will be thrown re…
zacharymorn May 18, 2021
22c87c2
Make soft deletes check follows the same style with others
zacharymorn May 18, 2021
99cea46
Remove no longer used failFast
zacharymorn May 18, 2021
c5c1e56
Remove nocommit
zacharymorn May 18, 2021
6ab669c
Address comments
zacharymorn May 19, 2021
a97de22
Add happy path test case
zacharymorn May 25, 2021
9883192
LUCENE-9974: The test-framework module should apply the test ruleset …
dweiss May 25, 2021
1bc1f8a
Use RandomizedTest.randomIntBetween for reproducibility
zacharymorn May 26, 2021
dadfe80
Fix bugs for segment file name and -threadCount, and cap default and …
zacharymorn May 26, 2021
57f542f
Use per part buffer for synchronization
zacharymorn May 26, 2021
71b5279
Remove no longer needed constants (they didnt seems to fail precommit)
zacharymorn May 26, 2021
cad119e
Remove unneeded newline
zacharymorn May 27, 2021
9c88f2f
fix typo
zacharymorn May 27, 2021
8f949dc
Move 11 to constant
zacharymorn May 28, 2021
51bcaa2
Revert changes for concurrency within segment
zacharymorn May 29, 2021
b39a856
Small refactoring to prep for method extraction
zacharymorn May 29, 2021
b892b21
IDE automatic method extraction
zacharymorn May 29, 2021
09ae809
Refactor
zacharymorn May 29, 2021
138b72e
Add concurrency across segments, reducing total time from 300+ second…
zacharymorn May 30, 2021
6cd6a46
Add back spacing between segment output
zacharymorn May 30, 2021
d1b19ea
sync on NumberFormat
zacharymorn May 30, 2021
bae7e7a
Fix bug to work with -segment flag
zacharymorn May 31, 2021
70dc71c
Enhance console output so that result from smaller segments get print…
zacharymorn May 31, 2021
817c050
Start larger segment earlier
zacharymorn Jun 1, 2021
2ef3e8d
Remove uneeded assignment
zacharymorn Aug 26, 2021
034f209
Fix typo
zacharymorn Aug 26, 2021
c7e3d4d
Merge branch 'main' into LUCENE-9662-CheckIndex-Concurrent
zacharymorn Aug 27, 2021
68fe4f5
Fix test to use the latest vector format
zacharymorn Aug 27, 2021
afea6a7
Change threadCount handling per feedback
zacharymorn Aug 29, 2021
6400304
Add change entry
zacharymorn Aug 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 218 additions & 12 deletions lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.NormsProducer;
Expand Down Expand Up @@ -60,6 +67,7 @@
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LongBitSet;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.Version;
Expand Down Expand Up @@ -450,6 +458,13 @@ public void setChecksumsOnly(boolean v) {

private boolean checksumsOnly;

/** Set threadCount used for parallelizing index integrity checking. */
public void setThreadCount(int tc) {
threadCount = tc;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe validate the argument? It must be >= 1 at least?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

private int threadCount = Runtime.getRuntime().availableProcessors();

/**
* Set infoStream where messages should go. If null, no messages are printed. If verbose is true
* then more details are printed.
Expand Down Expand Up @@ -488,8 +503,35 @@ public Status checkIndex() throws IOException {
* quite a long time to run.
*/
public Status checkIndex(List<String> onlySegments) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't overload it here. force callers to pass executor.

I am still worried that tests are using multiple threads here which must not happen: on an 8 core machine we don't want to use 8 jvms * 8 threads each.

I am also concerned about newly-created synchronization issues here (e.g. with output). If checkindex fails it is really important that we can read this output.

All these changes leave my concerned that this is the right way to go. At the minimal, if we are going to make these changes to CheckIndex, then it needs to be done in a more cautious/defensive way (e.g. default threadcount to 1 instead of numcpus).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't overload it here. force callers to pass executor.

I took a quick look at these methods' current usage. checkIndex() has 4 references in tests and 1 in luke module, and checkIndex(onlySegments) has 6 references in tests and 1 in CheckIndex main, so it should be straightforward to remove these convenience methods and have all callers to invoke checkIndex(onlySegments, executorService). Having said that, it does seems to be a burden to put on each caller to handle executor creation and shutdown each time this method gets called though. Maybe we could keep these methods and use the 1 default threadcount executor like you suggested if one is not explicitly passed in (I also updated the default threadCount to be 1 in my latest commit) ?

I am still worried that tests are using multiple threads here which must not happen: on an 8 core machine we don't want to use 8 jvms * 8 threads each.

Oh I didn't realize it would spawn multiple jvm processes, or are you suggesting that if this gets run in multiple jvm processes for some reasons then the overall overhead will be high (which I agree!) ? Shall we do something like cpu / constant to reduce it a bit, or any other mechanism we can use to better control the number of threads based on whole system's overhead?

I am also concerned about newly-created synchronization issues here (e.g. with output). If checkindex fails it is really important that we can read this output.

Yeah definitely agree with this. For the output messages synchronization issue, I put in some nocommit comment earlier and proposed to also include segment / part id in these messages as a cheap way to allow manual sorting / grep-ing after the messages were written, to essentially work around this issue. Not sure if this will be good enough?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe post the CheckIndex output on a real-ish (multi-segment) index as an example? Actually I will do that on the nightly benchmark (English Wikipedia) index!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I didn't realize it would spawn multiple jvm processes,

This is just how Lucene's test infrastructure runs tests -- it spawns multiple JVMs, each of which is running one Lucene test at a time, but that test may use (often uses?) its own threads, including here in CheckIndex if we make it concurrent by default.

ExecutorService executorService =
Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index"));
try {
return checkIndex(onlySegments, executorService);
} finally {
executorService.shutdown();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (
@SuppressWarnings("unused")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ignore? Seems like something went wrong there - log it at least?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed. Updated.

InterruptedException e) {
} finally {
executorService.shutdownNow();
}
}
}

/**
* Returns a {@link Status} instance detailing the state of the index.
*
* <p>This method allows caller to pass in customized ExecutorService to speed up the check.
*
* <p><b>WARNING</b>: make sure you only call this when the index is not opened by any writer.
*/
public Status checkIndex(List<String> onlySegments, ExecutorService executorService)
throws IOException {
ensureOpen();
long startNS = System.nanoTime();

NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
SegmentInfos sis = null;
Status result = new Status();
Expand Down Expand Up @@ -605,6 +647,11 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
result.newSegments.clear();
result.maxSegmentName = -1;

// nocommit the msg statements with infoStream inside this loop (as well as inside each
// index part checking methods such as testLiveDocs) may be
// interleaved when this section of code run concurrently. Is it ok to not synchronize for
// these msg statements and instead have each msg content to include segment id information for
// identification?
for (int i = 0; i < numSegments; i++) {
final SegmentCommitInfo info = sis.info(i);
long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
Expand Down Expand Up @@ -639,6 +686,8 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
SegmentReader reader = null;

try {
// nocommit these msg statements may require synchronization if printed without segment
// identifier
msg(infoStream, " version=" + (version == null ? "3.0" : version));
msg(infoStream, " id=" + StringHelper.idToString(info.info.getId()));
final Codec codec = info.info.getCodec();
Expand Down Expand Up @@ -731,40 +780,170 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
}

if (checksumsOnly == false) {
// This redundant assignment is done to make compiler happy
SegmentReader finalReader = reader;

// Test Livedocs
segInfoStat.liveDocStatus = testLiveDocs(reader, infoStream, failFast);
CompletableFuture<Void> testliveDocs =
CompletableFuture.supplyAsync(
callableToSupplier(() -> testLiveDocs(finalReader, infoStream, failFast)),
executorService)
.thenAccept(
liveDocStatus -> {
segInfoStat.liveDocStatus = liveDocStatus;
});

// Test Fieldinfos
segInfoStat.fieldInfoStatus = testFieldInfos(reader, infoStream, failFast);
CompletableFuture<Void> testFieldInfos =
CompletableFuture.supplyAsync(
callableToSupplier(() -> testFieldInfos(finalReader, infoStream, failFast)),
executorService)
.thenAccept(
fieldInfoStatus -> {
segInfoStat.fieldInfoStatus = fieldInfoStatus;
});

// Test Field Norms
segInfoStat.fieldNormStatus = testFieldNorms(reader, infoStream, failFast);
CompletableFuture<Void> testFieldNorms =
CompletableFuture.supplyAsync(
callableToSupplier(() -> testFieldNorms(finalReader, infoStream, failFast)),
executorService)
.thenAccept(
fieldNormStatus -> {
segInfoStat.fieldNormStatus = fieldNormStatus;
});

// Test the Term Index
segInfoStat.termIndexStatus =
testPostings(reader, infoStream, verbose, doSlowChecks, failFast);
CompletableFuture<Void> testTermIndex =
CompletableFuture.supplyAsync(
() -> {
try {
return testPostings(
finalReader, infoStream, verbose, doSlowChecks, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
termIndexStatus -> {
segInfoStat.termIndexStatus = termIndexStatus;
});

// Test Stored Fields
segInfoStat.storedFieldStatus = testStoredFields(reader, infoStream, failFast);
CompletableFuture<Void> testStoredFields =
CompletableFuture.supplyAsync(
() -> {
try {
return testStoredFields(finalReader, infoStream, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
storedFieldStatus -> {
segInfoStat.storedFieldStatus = storedFieldStatus;
});

// Test Term Vectors
segInfoStat.termVectorStatus =
testTermVectors(reader, infoStream, verbose, doSlowChecks, failFast);
CompletableFuture<Void> testTermVectors =
CompletableFuture.supplyAsync(
() -> {
try {
return testTermVectors(
finalReader, infoStream, verbose, doSlowChecks, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
termVectorStatus -> {
segInfoStat.termVectorStatus = termVectorStatus;
});

// Test Docvalues
segInfoStat.docValuesStatus = testDocValues(reader, infoStream, failFast);
CompletableFuture<Void> testDocValues =
CompletableFuture.supplyAsync(
() -> {
try {
return testDocValues(finalReader, infoStream, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
docValuesStatus -> {
segInfoStat.docValuesStatus = docValuesStatus;
});

// Test PointValues
segInfoStat.pointsStatus = testPoints(reader, infoStream, failFast);
CompletableFuture<Void> testPointvalues =
CompletableFuture.supplyAsync(
() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a utility method that accepts a callable and wraps in completion exception? This is a very repetitive block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also feel this is quite repetitive and cumbersome to look at. I gave it some more tries but it seems to be a bit difficult actually. The main issue here is the testXXX methods all throw checked IOException, and thus any callable I define that wrap around the method will need to handle the IOException there as well, instead of handling it inside the utility method, or the compiler may not be happy:

utilityMethod(() -> {
      try {
           testXXX
      } catch (IOException) {
           handle IOE
      })

Maybe some refactoring in those testXXX methods can work around the issue?

For the code in thenAccept, we should be able to simplify with generics and creating a new CheckIndexStatus that all status classes extend (right now they don't share a super class).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a commit that shows what I meant (and replaces three calls). Sorry for not replacing all of them. Maybe it can be made even less verbose if you fold all those blocks into a single function that accepts executor, first callable, follow-up callable and just returns completable future from the composition of these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok I see. I think I encountered the issue earlier when I tried to use Supplier directly instead of Callable there. I've updated it according to your suggestion (I used a Function for the follow-up lambda as it takes one argument)

try {
return testPoints(finalReader, infoStream, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
pointsStatus -> {
segInfoStat.pointsStatus = pointsStatus;
});

// Test VectorValues
segInfoStat.vectorValuesStatus = testVectors(reader, infoStream, failFast);
CompletableFuture<Void> testVectors =
CompletableFuture.supplyAsync(
() -> {
try {
return testVectors(finalReader, infoStream, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
vectorValuesStatus -> {
segInfoStat.vectorValuesStatus = vectorValuesStatus;
});

// Test index sort
segInfoStat.indexSortStatus = testSort(reader, indexSort, infoStream, failFast);
CompletableFuture<Void> testSort =
CompletableFuture.supplyAsync(
() -> {
try {
return testSort(finalReader, indexSort, infoStream, failFast);
} catch (IOException e) {
throw new CompletionException(e);
}
},
executorService)
.thenAccept(
indexSortStatus -> {
segInfoStat.indexSortStatus = indexSortStatus;
});

// Rethrow the first exception we encountered
// This will cause stats for failed segments to be incremented properly
// nocommit The error != null check ordering below requires sequencing the above async
// calls.
// Does the order really matter here or can be done differently?
CompletableFuture.allOf(
testliveDocs,
testFieldInfos,
testFieldNorms,
testTermIndex,
testStoredFields,
testTermVectors,
testDocValues,
testPointvalues,
testVectors,
testSort)
.join();
if (segInfoStat.liveDocStatus.error != null) {
throw new RuntimeException("Live docs test failed");
} else if (segInfoStat.fieldInfoStatus.error != null) {
Expand All @@ -783,6 +962,8 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
throw new RuntimeException("Points test failed");
}
}

// nocommit parallelize this as well?
final String softDeletesField = reader.getFieldInfos().getSoftDeletesField();
if (softDeletesField != null) {
checkSoftDeletes(softDeletesField, info, reader, infoStream, failFast);
Expand Down Expand Up @@ -843,6 +1024,18 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
return result;
}

private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable e) {
throw new CompletionException(e);
}
};
}

/**
* Tests index sort order.
*
Expand Down Expand Up @@ -931,6 +1124,8 @@ public static Status.LiveDocStatus testLiveDocs(
final Status.LiveDocStatus status = new Status.LiveDocStatus();

try {
// nocommit these msg statements may require synchronization if printed without segment
// identifier
if (infoStream != null) infoStream.print(" test: check live docs.....");
final int numDocs = reader.numDocs();
if (reader.hasDeletions()) {
Expand Down Expand Up @@ -969,6 +1164,8 @@ public static Status.LiveDocStatus testLiveDocs(
}
}
}
// nocommit these msg statements may require synchronization if printed without segment
// identifier
msg(
infoStream,
String.format(
Expand Down Expand Up @@ -3622,6 +3819,7 @@ public static class Options {
boolean doSlowChecks = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Pre-existing] We could remove all these = false and = null. Hmm maybe there is a static ecj checker for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created a spin-off issue available here https://issues.apache.org/jira/browse/LUCENE-10074.

boolean verbose = false;
boolean doChecksumsOnly = false;
int threadCount;
List<String> onlySegments = new ArrayList<>();
String indexPath = null;
String dirImpl = null;
Expand Down Expand Up @@ -3720,6 +3918,12 @@ public static Options parseOptions(String[] args) {
}
i++;
opts.dirImpl = args[i];
} else if ("-threadCount".equals(arg)) {
if (i == args.length - 1) {
throw new IllegalArgumentException("-threadCount requires a following number");
}
i++;
opts.threadCount = Integer.parseInt(args[i]);
} else {
if (opts.indexPath != null) {
throw new IllegalArgumentException("ERROR: unexpected extra argument '" + args[i] + "'");
Expand Down Expand Up @@ -3787,8 +3991,10 @@ public int doCheck(Options opts) throws IOException, InterruptedException {
setDoSlowChecks(opts.doSlowChecks);
setChecksumsOnly(opts.doChecksumsOnly);
setInfoStream(opts.out, opts.verbose);
setThreadCount(opts.threadCount);

Status result = checkIndex(opts.onlySegments);

if (result.missingSegments) {
return 1;
}
Expand Down