From 138b72e9f2512df257c9acf01516bd071c9fb1d4 Mon Sep 17 00:00:00 2001 From: Zach Chen Date: Sat, 29 May 2021 21:23:47 -0700 Subject: [PATCH] Add concurrency across segments, reducing total time from 300+ seconds to 100+ seconds --- .../TestManyPointsInOldIndex.java | 2 +- .../org/apache/lucene/index/CheckIndex.java | 213 +++++++++++++----- .../index/TestAllFilesDetectTruncation.java | 2 +- .../apache/lucene/index/TestCheckIndex.java | 2 +- .../apache/lucene/index/TestPointValues.java | 2 +- .../lucene/index/TestSwappedIndexFiles.java | 7 +- .../index/BaseVectorFormatTestCase.java | 2 +- .../lucene/store/MockDirectoryWrapper.java | 6 +- .../java/org/apache/lucene/util/TestUtil.java | 14 +- 9 files changed, 185 insertions(+), 65 deletions(-) diff --git a/lucene/backward-codecs/src/test/org/apache/lucene/backward_index/TestManyPointsInOldIndex.java b/lucene/backward-codecs/src/test/org/apache/lucene/backward_index/TestManyPointsInOldIndex.java index 18e26ad3bc33..cae0742bb982 100644 --- a/lucene/backward-codecs/src/test/org/apache/lucene/backward_index/TestManyPointsInOldIndex.java +++ b/lucene/backward-codecs/src/test/org/apache/lucene/backward_index/TestManyPointsInOldIndex.java @@ -70,7 +70,7 @@ public void testCheckOldIndex() throws IOException { dir.setCheckIndexOnClose(false); // ... because we check ourselves here: - TestUtil.checkIndex(dir, false, true, null); + TestUtil.checkIndex(dir, false, true, true, null); dir.close(); } } diff --git a/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java b/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java index 3383af57dd6c..a901f9aebba9 100644 --- a/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java +++ b/lucene/core/src/java/org/apache/lucene/index/CheckIndex.java @@ -18,9 +18,11 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.text.NumberFormat; @@ -31,9 +33,14 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.NormsProducer; @@ -80,7 +87,6 @@ */ public final class CheckIndex implements Closeable { - private static final int MAX_PER_SEGMENT_CONCURRENCY = 11; private PrintStream infoStream; private Directory dir; private Lock writeLock; @@ -521,22 +527,29 @@ public Status checkIndex() throws IOException { * quite a long time to run. */ public Status checkIndex(List onlySegments) throws IOException { - ExecutorService executorService = - Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index")); + ExecutorService executorService = null; + + if (threadCount > 0) { + executorService = + Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index")); + } + msg(infoStream, "Checking index with async threadCount: " + threadCount); try { return checkIndex(onlySegments, executorService); } finally { - executorService.shutdown(); - try { - executorService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - msg( - infoStream, - "ERROR: Interrupted exception occurred when shutting down executor service"); - if (infoStream != null) e.printStackTrace(infoStream); - } finally { - executorService.shutdownNow(); + if (executorService != null) { + executorService.shutdown(); + try { + executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + msg( + infoStream, + "ERROR: Interrupted exception occurred when shutting down executor service"); + if (infoStream != null) e.printStackTrace(infoStream); + } finally { + executorService.shutdownNow(); + } } } } @@ -667,34 +680,101 @@ public Status checkIndex(List onlySegments, ExecutorService executorServ result.newSegments.clear(); result.maxSegmentName = -1; - for (int i = 0; i < numSegments; i++) { - final SegmentCommitInfo info = sis.info(i); - long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX); - if (segmentName > result.maxSegmentName) { - result.maxSegmentName = segmentName; + // checks segments sequentially + if (executorService == null) { + for (int i = 0; i < numSegments; i++) { + final SegmentCommitInfo info = sis.info(i); + updateMaxSegmentName(result, info); + if (onlySegments != null && !onlySegments.contains(info.info.name)) { + continue; + } + + msg( + infoStream, + (1 + i) + + " of " + + numSegments + + ": name=" + + info.info.name + + " maxDoc=" + + info.info.maxDoc()); + Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream); + + processSegmentInfoStatusResult(result, info, segmentInfoStatus); } - if (onlySegments != null && !onlySegments.contains(info.info.name)) { - continue; + } else { + List outputs = new ArrayList<>(); + List> futures = new ArrayList<>(); + + // checks segments concurrently + for (int i = 0; i < numSegments; i++) { + final SegmentCommitInfo info = sis.info(i); + updateMaxSegmentName(result, info); + if (onlySegments != null && !onlySegments.contains(info.info.name)) { + continue; + } + + SegmentInfos finalSis = sis; + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrintStream stream; + if (i > 0) { + // buffer the messages for segment starting from the 2nd one so that they can later be + // printed in order + stream = new PrintStream(output, true, IOUtils.UTF_8); + } else { + // optimize for first segment to print real-time + stream = infoStream; + } + msg( + stream, + (1 + i) + + " of " + + numSegments + + ": name=" + + info.info.name + + " maxDoc=" + + info.info.maxDoc()); + + outputs.add(output); + futures.add( + runAsyncSegmentCheck(() -> testSegment(finalSis, info, stream), executorService)); } - msg( - infoStream, - (1 + i) - + " of " - + numSegments - + ": name=" - + info.info.name - + " maxDoc=" - + info.info.maxDoc()); - - Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream); - - result.segmentInfos.add(segmentInfoStatus); - if (segmentInfoStatus.error != null) { - result.totLoseDocCount += segmentInfoStatus.toLoseDocCount; - result.numBadSegments++; - } else { - // Keeper - result.newSegments.add(info.clone()); + + for (int i = 0; i < numSegments; i++) { + SegmentCommitInfo info = sis.info(i); + if (onlySegments != null && !onlySegments.contains(info.info.name)) { + continue; + } + + ByteArrayOutputStream output = outputs.get(i); + + // print segment results in order + Status.SegmentInfoStatus segmentInfoStatus = null; + try { + segmentInfoStatus = futures.get(i).get(); + } catch (InterruptedException e) { + // the segment test output should come before interrupted exception message that follows, + // hence it's not emitted from finally clause + infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing()); + msg( + infoStream, + "ERROR: Interrupted exception occurred when getting segment check result for segment " + + info.info.name); + if (infoStream != null) e.printStackTrace(infoStream); + } catch (ExecutionException e) { + infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing()); + + assert failFast; + throw new CheckIndexException("Segment " + info.info.name + " check failed.", e); + } + + if (i > 0) { + // first segment output already printed by infoStream + infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing()); + } + + processSegmentInfoStatusResult(result, info, segmentInfoStatus); } } @@ -731,6 +811,42 @@ public Status checkIndex(List onlySegments, ExecutorService executorServ return result; } + private void updateMaxSegmentName(Status result, SegmentCommitInfo info) { + long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX); + if (segmentName > result.maxSegmentName) { + result.maxSegmentName = segmentName; + } + } + + private void processSegmentInfoStatusResult( + Status result, SegmentCommitInfo info, Status.SegmentInfoStatus segmentInfoStatus) { + result.segmentInfos.add(segmentInfoStatus); + if (segmentInfoStatus.error != null) { + result.totLoseDocCount += segmentInfoStatus.toLoseDocCount; + result.numBadSegments++; + } else { + // Keeper + result.newSegments.add(info.clone()); + } + } + + private CompletableFuture runAsyncSegmentCheck( + Callable asyncCallable, ExecutorService executorService) { + return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), executorService); + } + + private Supplier callableToSupplier(Callable callable) { + return () -> { + try { + return callable.call(); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new CompletionException(e); + } + }; + } + private Status.SegmentInfoStatus testSegment( SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws IOException { Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus(); @@ -923,8 +1039,6 @@ private Status.SegmentInfoStatus testSegment( "Soft Deletes test failed", segInfoStat.softDeletesStatus.error); } } - - msg(infoStream, ""); } catch (Throwable t) { if (failFast) { throw IOUtils.rethrowAlways(t); @@ -3830,15 +3944,7 @@ public static Options parseOptions(String[] args) { throw new IllegalArgumentException("-threadCount requires a following number"); } i++; - int providedThreadCount = Integer.parseInt(args[i]); - // Current implementation supports up to 11 concurrent checks at any time, and no - // concurrency across segments. - // Capping the thread count to 11 to avoid unnecessary threads to be created. - if (providedThreadCount > MAX_PER_SEGMENT_CONCURRENCY) { - System.out.println( - "-threadCount currently only supports up to 11 threads. Value higher than that will be capped."); - } - opts.threadCount = Math.min(providedThreadCount, MAX_PER_SEGMENT_CONCURRENCY); + opts.threadCount = Integer.parseInt(args[i]); } else { if (opts.indexPath != null) { throw new IllegalArgumentException("ERROR: unexpected extra argument '" + args[i] + "'"); @@ -3906,10 +4012,9 @@ public int doCheck(Options opts) throws IOException, InterruptedException { setDoSlowChecks(opts.doSlowChecks); setChecksumsOnly(opts.doChecksumsOnly); setInfoStream(opts.out, opts.verbose); - // when threadCount was not provided via command line, don't overwrite the default - if (opts.threadCount > 0) { - setThreadCount(opts.threadCount); - } + // when threadCount was not provided via command line, override it with 0 to turn of concurrent + // check + setThreadCount(opts.threadCount); Status result = checkIndex(opts.onlySegments); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestAllFilesDetectTruncation.java b/lucene/core/src/test/org/apache/lucene/index/TestAllFilesDetectTruncation.java index ed272b883b9c..8c94f87f3b46 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestAllFilesDetectTruncation.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestAllFilesDetectTruncation.java @@ -121,7 +121,7 @@ private void truncateOneFile(Directory dir, String victim) throws IOException { // CheckIndex should also fail: expectThrowsAnyOf( Arrays.asList(CorruptIndexException.class, EOFException.class), - () -> TestUtil.checkIndex(dirCopy, true, true, null)); + () -> TestUtil.checkIndex(dirCopy, true, true, true, null)); } } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestCheckIndex.java b/lucene/core/src/test/org/apache/lucene/index/TestCheckIndex.java index 2af80949ec83..df7b528518c5 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestCheckIndex.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestCheckIndex.java @@ -122,7 +122,7 @@ public void testCheckIndexAllValid() throws Exception { } ByteArrayOutputStream output = new ByteArrayOutputStream(); - CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output); + CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output); assertEquals(1, status.segmentInfos.size()); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java b/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java index 83c6ce4f4004..8ca1e1a917dd 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java @@ -726,7 +726,7 @@ public void testCheckIndexIncludesPoints() throws Exception { w.close(); ByteArrayOutputStream output = new ByteArrayOutputStream(); - CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output); + CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output); assertEquals(1, status.segmentInfos.size()); CheckIndex.Status.SegmentInfoStatus segStatus = status.segmentInfos.get(0); // total 3 point values were index: diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSwappedIndexFiles.java b/lucene/core/src/test/org/apache/lucene/index/TestSwappedIndexFiles.java index 659745462ff9..d11f50dc3754 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestSwappedIndexFiles.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestSwappedIndexFiles.java @@ -116,8 +116,11 @@ private void swapOneFile(Directory dir1, Directory dir2, String victim) throws I // CheckIndex should also fail: expectThrowsAnyOf( Arrays.asList( - CorruptIndexException.class, EOFException.class, IndexFormatTooOldException.class), - () -> TestUtil.checkIndex(dirCopy, true, true, null)); + CorruptIndexException.class, + EOFException.class, + IndexFormatTooOldException.class, + CheckIndex.CheckIndexException.class), + () -> TestUtil.checkIndex(dirCopy, true, true, true, null)); } } } diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseVectorFormatTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseVectorFormatTestCase.java index 3e0b18243964..cc8168d400a6 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseVectorFormatTestCase.java +++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseVectorFormatTestCase.java @@ -941,7 +941,7 @@ public void testCheckIndexIncludesVectors() throws Exception { } ByteArrayOutputStream output = new ByteArrayOutputStream(); - CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output); + CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output); assertEquals(1, status.segmentInfos.size()); CheckIndex.Status.SegmentInfoStatus segStatus = status.segmentInfos.get(0); // total 3 vector values were indexed: diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java index 7dcf36c5eb5c..bb1f5a639213 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java +++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java @@ -895,7 +895,11 @@ public synchronized void close() throws IOException { System.out.println("\nNOTE: MockDirectoryWrapper: now run CheckIndex"); } - TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true, null); + // Methods in MockDirectoryWrapper hold locks on this, which will cause deadlock when + // TestUtil#checkIndex checks segment concurrently using another thread, but making + // call back to synchronized methods such as MockDirectoryWrapper#fileLength. + // Hence passing concurrent = false to this method to turn off concurrent checks. + TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true, false, null); } // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java b/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java index 180f8ccfbc1d..f9fd2c016d29 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java +++ b/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java @@ -302,7 +302,7 @@ public static CheckIndex.Status checkIndex(Directory dir) throws IOException { public static CheckIndex.Status checkIndex(Directory dir, boolean doSlowChecks) throws IOException { - return checkIndex(dir, doSlowChecks, false, null); + return checkIndex(dir, doSlowChecks, false, true, null); } /** @@ -310,7 +310,11 @@ public static CheckIndex.Status checkIndex(Directory dir, boolean doSlowChecks) * moving on to other fields/segments to look for any other corruption. */ public static CheckIndex.Status checkIndex( - Directory dir, boolean doSlowChecks, boolean failFast, ByteArrayOutputStream output) + Directory dir, + boolean doSlowChecks, + boolean failFast, + boolean concurrent, + ByteArrayOutputStream output) throws IOException { if (output == null) { output = new ByteArrayOutputStream(1024); @@ -322,7 +326,11 @@ public static CheckIndex.Status checkIndex( checker.setDoSlowChecks(doSlowChecks); checker.setFailFast(failFast); checker.setInfoStream(new PrintStream(output, false, IOUtils.UTF_8), false); - checker.setThreadCount(RandomizedTest.randomIntBetween(1, 5)); + if (concurrent) { + checker.setThreadCount(RandomizedTest.randomIntBetween(1, 5)); + } else { + checker.setThreadCount(0); + } CheckIndex.Status indexStatus = checker.checkIndex(null); if (indexStatus == null || indexStatus.clean == false) {