Skip to content

Commit

Permalink
Add concurrency across segments, reducing total time from 300+ second…
Browse files Browse the repository at this point in the history
…s to 100+ seconds
  • Loading branch information
zacharymorn committed May 30, 2021
1 parent 09ae809 commit 138b72e
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testCheckOldIndex() throws IOException {
dir.setCheckIndexOnClose(false);

// ... because we check ourselves here:
TestUtil.checkIndex(dir, false, true, null);
TestUtil.checkIndex(dir, false, true, true, null);
dir.close();
}
}
213 changes: 159 additions & 54 deletions lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.NumberFormat;
Expand All @@ -31,9 +33,14 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.NormsProducer;
Expand Down Expand Up @@ -80,7 +87,6 @@
*/
public final class CheckIndex implements Closeable {

private static final int MAX_PER_SEGMENT_CONCURRENCY = 11;
private PrintStream infoStream;
private Directory dir;
private Lock writeLock;
Expand Down Expand Up @@ -521,22 +527,29 @@ public Status checkIndex() throws IOException {
* quite a long time to run.
*/
public Status checkIndex(List<String> onlySegments) throws IOException {
ExecutorService executorService =
Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index"));
ExecutorService executorService = null;

if (threadCount > 0) {
executorService =
Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index"));
}

msg(infoStream, "Checking index with async threadCount: " + threadCount);
try {
return checkIndex(onlySegments, executorService);
} finally {
executorService.shutdown();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
msg(
infoStream,
"ERROR: Interrupted exception occurred when shutting down executor service");
if (infoStream != null) e.printStackTrace(infoStream);
} finally {
executorService.shutdownNow();
if (executorService != null) {
executorService.shutdown();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
msg(
infoStream,
"ERROR: Interrupted exception occurred when shutting down executor service");
if (infoStream != null) e.printStackTrace(infoStream);
} finally {
executorService.shutdownNow();
}
}
}
}
Expand Down Expand Up @@ -667,34 +680,101 @@ public Status checkIndex(List<String> onlySegments, ExecutorService executorServ
result.newSegments.clear();
result.maxSegmentName = -1;

for (int i = 0; i < numSegments; i++) {
final SegmentCommitInfo info = sis.info(i);
long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
if (segmentName > result.maxSegmentName) {
result.maxSegmentName = segmentName;
// checks segments sequentially
if (executorService == null) {
for (int i = 0; i < numSegments; i++) {
final SegmentCommitInfo info = sis.info(i);
updateMaxSegmentName(result, info);
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
continue;
}

msg(
infoStream,
(1 + i)
+ " of "
+ numSegments
+ ": name="
+ info.info.name
+ " maxDoc="
+ info.info.maxDoc());
Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream);

processSegmentInfoStatusResult(result, info, segmentInfoStatus);
}
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
continue;
} else {
List<ByteArrayOutputStream> outputs = new ArrayList<>();
List<CompletableFuture<Status.SegmentInfoStatus>> futures = new ArrayList<>();

// checks segments concurrently
for (int i = 0; i < numSegments; i++) {
final SegmentCommitInfo info = sis.info(i);
updateMaxSegmentName(result, info);
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
continue;
}

SegmentInfos finalSis = sis;

ByteArrayOutputStream output = new ByteArrayOutputStream();
PrintStream stream;
if (i > 0) {
// buffer the messages for segment starting from the 2nd one so that they can later be
// printed in order
stream = new PrintStream(output, true, IOUtils.UTF_8);
} else {
// optimize for first segment to print real-time
stream = infoStream;
}
msg(
stream,
(1 + i)
+ " of "
+ numSegments
+ ": name="
+ info.info.name
+ " maxDoc="
+ info.info.maxDoc());

outputs.add(output);
futures.add(
runAsyncSegmentCheck(() -> testSegment(finalSis, info, stream), executorService));
}
msg(
infoStream,
(1 + i)
+ " of "
+ numSegments
+ ": name="
+ info.info.name
+ " maxDoc="
+ info.info.maxDoc());

Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream);

result.segmentInfos.add(segmentInfoStatus);
if (segmentInfoStatus.error != null) {
result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
result.numBadSegments++;
} else {
// Keeper
result.newSegments.add(info.clone());

for (int i = 0; i < numSegments; i++) {
SegmentCommitInfo info = sis.info(i);
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
continue;
}

ByteArrayOutputStream output = outputs.get(i);

// print segment results in order
Status.SegmentInfoStatus segmentInfoStatus = null;
try {
segmentInfoStatus = futures.get(i).get();
} catch (InterruptedException e) {
// the segment test output should come before interrupted exception message that follows,
// hence it's not emitted from finally clause
infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing());
msg(
infoStream,
"ERROR: Interrupted exception occurred when getting segment check result for segment "
+ info.info.name);
if (infoStream != null) e.printStackTrace(infoStream);
} catch (ExecutionException e) {
infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing());

assert failFast;
throw new CheckIndexException("Segment " + info.info.name + " check failed.", e);
}

if (i > 0) {
// first segment output already printed by infoStream
infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing());
}

processSegmentInfoStatusResult(result, info, segmentInfoStatus);
}
}

Expand Down Expand Up @@ -731,6 +811,42 @@ public Status checkIndex(List<String> onlySegments, ExecutorService executorServ
return result;
}

private void updateMaxSegmentName(Status result, SegmentCommitInfo info) {
long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
if (segmentName > result.maxSegmentName) {
result.maxSegmentName = segmentName;
}
}

private void processSegmentInfoStatusResult(
Status result, SegmentCommitInfo info, Status.SegmentInfoStatus segmentInfoStatus) {
result.segmentInfos.add(segmentInfoStatus);
if (segmentInfoStatus.error != null) {
result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
result.numBadSegments++;
} else {
// Keeper
result.newSegments.add(info.clone());
}
}

private <R> CompletableFuture<R> runAsyncSegmentCheck(
Callable<R> asyncCallable, ExecutorService executorService) {
return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), executorService);
}

private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable e) {
throw new CompletionException(e);
}
};
}

private Status.SegmentInfoStatus testSegment(
SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws IOException {
Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
Expand Down Expand Up @@ -923,8 +1039,6 @@ private Status.SegmentInfoStatus testSegment(
"Soft Deletes test failed", segInfoStat.softDeletesStatus.error);
}
}

msg(infoStream, "");
} catch (Throwable t) {
if (failFast) {
throw IOUtils.rethrowAlways(t);
Expand Down Expand Up @@ -3830,15 +3944,7 @@ public static Options parseOptions(String[] args) {
throw new IllegalArgumentException("-threadCount requires a following number");
}
i++;
int providedThreadCount = Integer.parseInt(args[i]);
// Current implementation supports up to 11 concurrent checks at any time, and no
// concurrency across segments.
// Capping the thread count to 11 to avoid unnecessary threads to be created.
if (providedThreadCount > MAX_PER_SEGMENT_CONCURRENCY) {
System.out.println(
"-threadCount currently only supports up to 11 threads. Value higher than that will be capped.");
}
opts.threadCount = Math.min(providedThreadCount, MAX_PER_SEGMENT_CONCURRENCY);
opts.threadCount = Integer.parseInt(args[i]);
} else {
if (opts.indexPath != null) {
throw new IllegalArgumentException("ERROR: unexpected extra argument '" + args[i] + "'");
Expand Down Expand Up @@ -3906,10 +4012,9 @@ public int doCheck(Options opts) throws IOException, InterruptedException {
setDoSlowChecks(opts.doSlowChecks);
setChecksumsOnly(opts.doChecksumsOnly);
setInfoStream(opts.out, opts.verbose);
// when threadCount was not provided via command line, don't overwrite the default
if (opts.threadCount > 0) {
setThreadCount(opts.threadCount);
}
// when threadCount was not provided via command line, override it with 0 to turn of concurrent
// check
setThreadCount(opts.threadCount);

Status result = checkIndex(opts.onlySegments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void truncateOneFile(Directory dir, String victim) throws IOException {
// CheckIndex should also fail:
expectThrowsAnyOf(
Arrays.asList(CorruptIndexException.class, EOFException.class),
() -> TestUtil.checkIndex(dirCopy, true, true, null));
() -> TestUtil.checkIndex(dirCopy, true, true, true, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testCheckIndexAllValid() throws Exception {
}

ByteArrayOutputStream output = new ByteArrayOutputStream();
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output);
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output);

assertEquals(1, status.segmentInfos.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ public void testCheckIndexIncludesPoints() throws Exception {
w.close();

ByteArrayOutputStream output = new ByteArrayOutputStream();
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output);
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output);
assertEquals(1, status.segmentInfos.size());
CheckIndex.Status.SegmentInfoStatus segStatus = status.segmentInfos.get(0);
// total 3 point values were index:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ private void swapOneFile(Directory dir1, Directory dir2, String victim) throws I
// CheckIndex should also fail:
expectThrowsAnyOf(
Arrays.asList(
CorruptIndexException.class, EOFException.class, IndexFormatTooOldException.class),
() -> TestUtil.checkIndex(dirCopy, true, true, null));
CorruptIndexException.class,
EOFException.class,
IndexFormatTooOldException.class,
CheckIndex.CheckIndexException.class),
() -> TestUtil.checkIndex(dirCopy, true, true, true, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ public void testCheckIndexIncludesVectors() throws Exception {
}

ByteArrayOutputStream output = new ByteArrayOutputStream();
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output);
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output);
assertEquals(1, status.segmentInfos.size());
CheckIndex.Status.SegmentInfoStatus segStatus = status.segmentInfos.get(0);
// total 3 vector values were indexed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,11 @@ public synchronized void close() throws IOException {
System.out.println("\nNOTE: MockDirectoryWrapper: now run CheckIndex");
}

TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true, null);
// Methods in MockDirectoryWrapper hold locks on this, which will cause deadlock when
// TestUtil#checkIndex checks segment concurrently using another thread, but making
// call back to synchronized methods such as MockDirectoryWrapper#fileLength.
// Hence passing concurrent = false to this method to turn off concurrent checks.
TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true, false, null);
}

// TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,19 @@ public static CheckIndex.Status checkIndex(Directory dir) throws IOException {

public static CheckIndex.Status checkIndex(Directory dir, boolean doSlowChecks)
throws IOException {
return checkIndex(dir, doSlowChecks, false, null);
return checkIndex(dir, doSlowChecks, false, true, null);
}

/**
* If failFast is true, then throw the first exception when index corruption is hit, instead of
* moving on to other fields/segments to look for any other corruption.
*/
public static CheckIndex.Status checkIndex(
Directory dir, boolean doSlowChecks, boolean failFast, ByteArrayOutputStream output)
Directory dir,
boolean doSlowChecks,
boolean failFast,
boolean concurrent,
ByteArrayOutputStream output)
throws IOException {
if (output == null) {
output = new ByteArrayOutputStream(1024);
Expand All @@ -322,7 +326,11 @@ public static CheckIndex.Status checkIndex(
checker.setDoSlowChecks(doSlowChecks);
checker.setFailFast(failFast);
checker.setInfoStream(new PrintStream(output, false, IOUtils.UTF_8), false);
checker.setThreadCount(RandomizedTest.randomIntBetween(1, 5));
if (concurrent) {
checker.setThreadCount(RandomizedTest.randomIntBetween(1, 5));
} else {
checker.setThreadCount(0);
}
CheckIndex.Status indexStatus = checker.checkIndex(null);

if (indexStatus == null || indexStatus.clean == false) {
Expand Down

0 comments on commit 138b72e

Please sign in to comment.