From b275ffed8d6138b4312b7fe2dc42980db5218f6a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Oct 2024 11:36:05 +0530 Subject: [PATCH] [Backport] Changes required for Dart (#17046) (#17047) (#17048) (#17066) (#17074) (#17076) (#17077) (#17193) (#17243) Backport the following patches for a clean backport of Dart changes 1. Add "targetPartitionsPerWorker" setting for MSQ. (#17048) 2. MSQ: Improved worker cancellation. (#17046) 3. Add "includeAllCounters()" to WorkerContext. (#17047) 4. MSQ: Include worker context maps in WorkOrders. (#17076) 5. TableInputSpecSlicer changes to support running on Brokers. (#17074) 6. Fix call to MemoryIntrospector in IndexerControllerContext. (#17066) 7. MSQ: Add QueryKitSpec to encapsulate QueryKit params. (#17077) 8. MSQ: Use task context flag useConcurrentLocks to determine task lock type (#17193) --- .../frame/FrameChannelMergerBenchmark.java | 8 +- .../druid/msq/exec/ControllerContext.java | 27 ++- .../apache/druid/msq/exec/ControllerImpl.java | 106 +++++----- .../apache/druid/msq/exec/RunWorkOrder.java | 137 ++++++++++-- .../druid/msq/exec/RunWorkOrderListener.java | 2 +- .../apache/druid/msq/exec/WorkerContext.java | 6 + .../org/apache/druid/msq/exec/WorkerImpl.java | 127 +++++++----- .../indexing/IndexerControllerContext.java | 119 ++++++++--- .../IndexerTableInputSpecSlicer.java} | 14 +- .../msq/indexing/IndexerWorkerContext.java | 11 +- .../druid/msq/indexing/MSQControllerTask.java | 24 ++- .../msq/input/table/SegmentsInputSlice.java | 2 +- .../apache/druid/msq/kernel/WorkOrder.java | 70 ++++++- .../controller/ControllerQueryKernel.java | 3 +- .../ControllerQueryKernelConfig.java | 62 ++++-- .../msq/querykit/DataSegmentProvider.java | 8 +- .../msq/querykit/DataSegmentTimelineView.java | 49 ----- .../druid/msq/querykit/DataSourcePlan.java | 117 ++++------- .../druid/msq/querykit/MultiQueryKit.java | 8 +- .../apache/druid/msq/querykit/QueryKit.java | 10 +- .../druid/msq/querykit/QueryKitSpec.java | 109 ++++++++++ .../msq/querykit/WindowOperatorQueryKit.java | 28 +-- .../msq/querykit/groupby/GroupByQueryKit.java | 31 +-- .../druid/msq/querykit/scan/ScanQueryKit.java | 17 +- .../msq/util/MultiStageQueryContext.java | 32 +-- .../druid/msq/exec/QueryValidatorTest.java | 3 +- .../apache/druid/msq/exec/WorkerImplTest.java | 88 ++++++++ .../msq/indexing/MSQControllerTaskTest.java | 196 +++++++++--------- ...a => IndexerTableInputSpecSlicerTest.java} | 7 +- .../ControllerQueryKernelConfigTest.java | 83 ++++++++ .../msq/querykit/FrameProcessorTestBase.java | 9 +- .../msq/test/MSQTestControllerContext.java | 43 +++- .../test/MSQTestOverlordServiceClient.java | 2 +- .../druid/msq/test/MSQTestWorkerContext.java | 6 + .../processor/FrameProcessorExecutor.java | 83 ++++++-- .../frame/processor/RunAllFullyWidget.java | 6 +- .../processor/RunnableFrameProcessor.java | 65 ++++++ .../druid/frame/processor/SuperSorter.java | 4 +- .../processor/FrameProcessorExecutorTest.java | 23 ++ .../processor/RunAllFullyWidgetTest.java | 4 +- 40 files changed, 1237 insertions(+), 512 deletions(-) rename extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/{input/table/TableInputSpecSlicer.java => indexing/IndexerTableInputSpecSlicer.java} (96%) delete mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java create mode 100644 
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
 rename extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/{TableInputSpecSlicerTest.java => IndexerTableInputSpecSlicerTest.java} (98%)
 create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java
 create mode 100644 processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java
index a57b7a116c4e..25f9015de2b9 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.guava.FutureUtils;
@@ -203,6 +204,7 @@ public int getChannelNumber(int rowNumber, int numRows, int numChannels)
   private final List<KeyColumn> sortKey = ImmutableList.of(new KeyColumn(KEY, KeyOrder.ASCENDING));
 
   private List<List<Frame>> channelFrames;
+  private ListeningExecutorService innerExec;
   private FrameProcessorExecutor exec;
   private List<BlockingQueueFrameChannel> channels;
@@ -226,7 +228,7 @@ public void setupTrial()
     frameReader = FrameReader.create(signature);
 
     exec = new FrameProcessorExecutor(
-        MoreExecutors.listeningDecorator(
+        innerExec = MoreExecutors.listeningDecorator(
             Execs.singleThreaded(StringUtils.encodeForFormat(getClass().getSimpleName()))
         )
     );
@@ -335,8 +337,8 @@ public void setupInvocation() throws IOException
   @TearDown(Level.Trial)
   public void tearDown() throws Exception
   {
-    exec.getExecutorService().shutdownNow();
-    if (!exec.getExecutorService().awaitTermination(1, TimeUnit.MINUTES)) {
+    innerExec.shutdownNow();
+    if (!innerExec.awaitTermination(1, TimeUnit.MINUTES)) {
       throw new ISE("Could not terminate executor after 1 minute");
     }
   }

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index 40b114511c28..687660ba750d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Injector;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -28,8 +29,10 @@
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.TableInputSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
+import org.apache.druid.msq.querykit.QueryKit;
+import org.apache.druid.msq.querykit.QueryKitSpec;
+import org.apache.druid.query.Query;
import org.apache.druid.server.DruidNode;
 
 /**
@@ -41,7 +44,7 @@ public interface ControllerContext
   /**
    * Configuration for {@link org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
    */
-  ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef);
+  ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec);
 
   /**
    * Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
@@ -73,7 +76,7 @@
   /**
    * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}.
    */
-  InputSpecSlicer newTableInputSpecSlicer();
+  InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager);
 
   /**
    * Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where
@@ -81,12 +84,17 @@
    */
   TaskActionClient taskActionClient();
 
+  /**
+   * Task lock type.
+   */
+  TaskLockType taskLockType();
+
   /**
    * Provides services about workers: starting, canceling, obtaining status.
    *
    * @param queryId query ID
    * @param querySpec query spec
-   * @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec, QueryDefinition)}
+   * @param queryKernelConfig config from {@link #queryKernelConfig(String, MSQSpec)}
    * @param workerFailureListener listener that receives callbacks when workers fail
    */
   WorkerManager newWorkerManager(
@@ -100,4 +108,15 @@ WorkerManager newWorkerManager(
   /**
    * Client for communicating with workers.
    */
   WorkerClient newWorkerClient();
+
+  /**
+   * Create a {@link QueryKitSpec}. This method provides controller contexts a way to customize parameters around the
+   * number of workers and partitions.
+   */
+  QueryKitSpec makeQueryKitSpec(
+      QueryKit<Query<?>> queryKit,
+      String queryId,
+      MSQSpec querySpec,
+      ControllerQueryKernelConfig queryKernelConfig
+  );
 }
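The two hooks added above are intentionally implementation-defined. For orientation, here is a minimal sketch of how a SELECT-only context might satisfy them — a hypothetical fragment, not part of this patch; the real Indexer implementation appears in IndexerControllerContext further down, and the QueryKitSpec argument order mirrors that implementation:

  // Hypothetical fragment of a ControllerContext implementation.
  @Override
  public TaskLockType taskLockType()
  {
    // Nothing is ingested, so no lock is required.
    return null;
  }

  @Override
  public QueryKitSpec makeQueryKitSpec(
      final QueryKit<Query<?>> queryKit,
      final String queryId,
      final MSQSpec querySpec,
      final ControllerQueryKernelConfig queryKernelConfig
  )
  {
    final int maxWorkers = querySpec.getTuningConfig().getMaxNumWorkers();
    // Last three arguments: worker counts and target partitions per worker; see
    // IndexerControllerContext#makeQueryKitSpec below for the production values.
    return new QueryKitSpec(queryKit, queryId, maxWorkers, maxWorkers, 1);
  }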
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 6858c57fe3d2..2d0a6212a0ad 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -151,6 +151,7 @@
 import org.apache.druid.msq.kernel.controller.WorkerInputs;
 import org.apache.druid.msq.querykit.MultiQueryKit;
 import org.apache.druid.msq.querykit.QueryKit;
+import org.apache.druid.msq.querykit.QueryKitSpec;
 import org.apache.druid.msq.querykit.QueryKitUtils;
 import org.apache.druid.msq.querykit.ShuffleSpecFactory;
 import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
@@ -224,6 +225,7 @@ public class ControllerImpl implements Controller
 {
   private static final Logger log = new Logger(ControllerImpl.class);
+  private static final String RESULT_READER_CANCELLATION_ID = "result-reader";
 
   private final String queryId;
   private final MSQSpec querySpec;
@@ -364,7 +366,7 @@ private void runInternal(final QueryListener queryListener, final Closer closer)
 
     // Execution-related: run the multi-stage QueryDefinition.
     final InputSpecSlicerFactory inputSpecSlicerFactory =
-        makeInputSpecSlicerFactory(context.newTableInputSpecSlicer());
+        makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager));
 
     final Pair<ControllerQueryKernel, ListenableFuture<?>> queryRunResult =
         new RunQueryUntilDone(
@@ -560,12 +562,12 @@ public void addToKernelManipulationQueue(Consumer kernelC
   private QueryDefinition initializeQueryDefAndState(final Closer closer)
   {
     this.selfDruidNode = context.selfNode();
-    this.netClient = new ExceptionWrappingWorkerClient(context.newWorkerClient());
-    closer.register(netClient);
+    this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient()));
+    this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec);
+    final QueryContext queryContext = querySpec.getQuery().context();
 
     final QueryDefinition queryDef = makeQueryDefinition(
-        queryId(),
-        makeQueryControllerToolKit(),
+        context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig),
         querySpec,
         context.jsonMapper(),
         resultsContext
@@ -587,7 +589,6 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
     QueryValidator.validateQueryDef(queryDef);
     queryDefRef.set(queryDef);
 
-    queryKernelConfig = context.queryKernelConfig(querySpec, queryDef);
     workerManager = context.newWorkerManager(
         queryId,
         querySpec,
@@ -612,7 +613,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
       );
     }
 
-    final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
+    final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext);
     this.faultsExceededChecker = new FaultsExceededChecker(
         ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions)
     );
@@ -624,7 +625,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
             stageDefinition.getId().getStageNumber(),
             finalizeClusterStatisticsMergeMode(
                 stageDefinition,
-                MultiStageQueryContext.getClusterStatisticsMergeMode(querySpec.getQuery().context())
+                MultiStageQueryContext.getClusterStatisticsMergeMode(queryContext)
             )
         )
     );
@@ -920,7 +921,7 @@ private List generateSegmentIdsWithShardSpecs(
           destination,
           partitionBoundaries,
           keyReader,
-          MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()), false),
+          context.taskLockType(),
           isStageOutputEmpty
       );
     }
@@ -1191,7 +1192,7 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage(
   }
 
   @SuppressWarnings("rawtypes")
-  private QueryKit makeQueryControllerToolKit()
+  private QueryKit<Query<?>> makeQueryControllerToolKit()
   {
     final Map<Class<? extends Query>, QueryKit> kitMap =
         ImmutableMap.<Class<? extends Query>, QueryKit>builder()
@@ -1328,10 +1329,7 @@ private void publishAllSegments(
         (DataSourceMSQDestination) querySpec.getDestination();
     final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
     int numTombstones = 0;
-    final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType(
-        QueryContext.of(querySpec.getQuery().getContext()),
-        destination.isReplaceTimeChunks()
-    );
+    final TaskLockType taskLockType = context.taskLockType();
 
     if (destination.isReplaceTimeChunks()) {
       final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
@@ -1715,8 +1713,7 @@ private void cleanUpDurableStorageIfNeeded()
 
   @SuppressWarnings("unchecked")
   private static QueryDefinition makeQueryDefinition(
-      final String queryId,
-      @SuppressWarnings("rawtypes") final QueryKit toolKit,
+      final QueryKitSpec queryKitSpec,
       final MSQSpec querySpec,
       final ObjectMapper jsonMapper,
       final ResultsContext resultsContext
   )
@@ -1725,11 +1722,11 @@ private static QueryDefinition makeQueryDefinition(
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
     final ColumnMappings columnMappings = querySpec.getColumnMappings();
     final Query<?> queryToPlan;
-    final ShuffleSpecFactory shuffleSpecFactory;
+    final ShuffleSpecFactory resultShuffleSpecFactory;
 
     if (MSQControllerTask.isIngestion(querySpec)) {
-      shuffleSpecFactory = querySpec.getDestination()
-                                    .getShuffleSpecFactory(tuningConfig.getRowsPerSegment());
+      resultShuffleSpecFactory = querySpec.getDestination()
+                                          .getShuffleSpecFactory(tuningConfig.getRowsPerSegment());
 
       if (!columnMappings.hasUniqueOutputColumnNames()) {
         // We do not expect to hit this case in production, because the SQL validator checks that column names
@@ -1753,7 +1750,7 @@
         queryToPlan = querySpec.getQuery();
       }
     } else {
-      shuffleSpecFactory =
+      resultShuffleSpecFactory =
           querySpec.getDestination()
                    .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
       queryToPlan = querySpec.getQuery();
@@ -1762,12 +1759,10 @@
     final QueryDefinition queryDef;
 
     try {
-      queryDef = toolKit.makeQueryDefinition(
-          queryId,
+      queryDef = queryKitSpec.getQueryKit().makeQueryDefinition(
+          queryKitSpec,
           queryToPlan,
-          toolKit,
-          shuffleSpecFactory,
-          tuningConfig.getMaxNumWorkers(),
+          resultShuffleSpecFactory,
           0
       );
     }
@@ -1796,7 +1791,7 @@
 
     // Add all query stages.
     // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage.
-    final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId);
+    final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId());
 
     for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
       if (stageDef.equals(finalShuffleStageDef)) {
@@ -1822,7 +1817,7 @@
     // attaching new query results stage if the final stage does sort during shuffle so that results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); if (finalShuffleStageDef.doesSortDuringShuffle()) { - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) @@ -1859,7 +1854,7 @@ private static QueryDefinition makeQueryDefinition( } final ResultFormat resultFormat = exportMSQDestination.getResultFormat(); - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) @@ -1867,7 +1862,7 @@ private static QueryDefinition makeQueryDefinition( .signature(queryDef.getFinalStageDefinition().getSignature()) .shuffleSpec(null) .processorFactory(new ExportResultsFrameProcessorFactory( - queryId, + queryKitSpec.getQueryId(), exportStorageProvider, resultFormat, columnMappings, @@ -2183,6 +2178,34 @@ private static void logKernelStatus(final String queryId, final ControllerQueryK } } + /** + * Create a result-reader executor for {@link RunQueryUntilDone#readQueryResults()}. + */ + private static FrameProcessorExecutor createResultReaderExec(final String queryId) + { + return new FrameProcessorExecutor( + MoreExecutors.listeningDecorator( + Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId + "]"))) + ); + } + + /** + * Cancel any currently-running work and shut down a result-reader executor, like one created by + * {@link #createResultReaderExec(String)}. + */ + private static void closeResultReaderExec(final FrameProcessorExecutor exec) + { + try { + exec.cancel(RESULT_READER_CANCELLATION_ID); + } + catch (Exception e) { + throw new RuntimeException(e); + } + finally { + exec.shutdownNow(); + } + } + private void stopExternalFetchers() { if (workerSketchFetcher != null) { @@ -2692,12 +2715,9 @@ private void startQueryResultsReader() inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> taskIds); } - final FrameProcessorExecutor resultReaderExec = new FrameProcessorExecutor( - MoreExecutors.listeningDecorator( - Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId() + "]"))) - ); + final FrameProcessorExecutor resultReaderExec = createResultReaderExec(queryId()); + resultReaderExec.registerCancellationId(RESULT_READER_CANCELLATION_ID); - final String cancellationId = "results-reader"; ReadableConcatFrameChannel resultsChannel = null; try { @@ -2707,7 +2727,7 @@ private void startQueryResultsReader() inputChannelFactory, () -> ArenaMemoryAllocator.createOnHeap(5_000_000), resultReaderExec, - cancellationId, + RESULT_READER_CANCELLATION_ID, null, MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context()) ); @@ -2741,7 +2761,7 @@ private void startQueryResultsReader() queryListener ); - queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, cancellationId); + queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, RESULT_READER_CANCELLATION_ID); // When results are done being read, kick the main thread. 
// Important: don't use FutureUtils.futureWithBaggage, because we need queryResultsReaderFuture to resolve @@ -2758,23 +2778,13 @@ private void startQueryResultsReader() e, () -> CloseableUtils.closeAll( finalResultsChannel, - () -> resultReaderExec.getExecutorService().shutdownNow() + () -> closeResultReaderExec(resultReaderExec) ) ); } // Result reader is set up. Register with the query-wide closer. - closer.register(() -> { - try { - resultReaderExec.cancel(cancellationId); - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - resultReaderExec.getExecutorService().shutdownNow(); - } - }); + closer.register(() -> closeResultReaderExec(resultReaderExec)); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 4d028147af02..3d31d7e2c3ee 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -56,6 +57,7 @@ import org.apache.druid.frame.processor.manager.ProcessorManagers; import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -67,6 +69,8 @@ import org.apache.druid.msq.indexing.CountingOutputChannelFactory; import org.apache.druid.msq.indexing.InputChannelFactory; import org.apache.druid.msq.indexing.InputChannelsImpl; +import org.apache.druid.msq.indexing.error.CanceledFault; +import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; @@ -94,7 +98,6 @@ import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; -import org.apache.druid.utils.CloseableUtils; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import javax.annotation.Nullable; @@ -104,7 +107,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -112,7 +116,29 @@ */ public class RunWorkOrder { - private final String controllerTaskId; + enum State + { + /** + * Initial state. Must be in this state to call {@link #startAsync()}. + */ + INIT, + + /** + * State entered upon calling {@link #startAsync()}. + */ + STARTED, + + /** + * State entered upon calling {@link #stop()}. + */ + STOPPING, + + /** + * State entered when a call to {@link #stop()} concludes. 
+   */
+    STOPPED
+  }
+
   private final WorkOrder workOrder;
   private final InputChannelFactory inputChannelFactory;
   private final CounterTracker counterTracker;
@@ -125,7 +151,9 @@ public class RunWorkOrder
   private final boolean reindex;
   private final boolean removeNullBytes;
   private final ByteTracker intermediateSuperSorterLocalStorageTracker;
-  private final AtomicBoolean started = new AtomicBoolean();
+  private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+  private final CountDownLatch stopLatch = new CountDownLatch(1);
+  private final AtomicReference<Either<Throwable, Object>> resultForListener = new AtomicReference<>();
 
   @MonotonicNonNull
   private InputSliceReader inputSliceReader;
@@ -141,7 +169,6 @@ public class RunWorkOrder
   private ListenableFuture<OutputChannels> stageOutputChannelsFuture;
 
   public RunWorkOrder(
-      final String controllerTaskId,
       final WorkOrder workOrder,
       final InputChannelFactory inputChannelFactory,
       final CounterTracker counterTracker,
@@ -154,7 +181,6 @@ public RunWorkOrder(
       final boolean removeNullBytes
   )
   {
-    this.controllerTaskId = controllerTaskId;
     this.workOrder = workOrder;
     this.inputChannelFactory = inputChannelFactory;
     this.counterTracker = counterTracker;
@@ -180,15 +206,16 @@ public RunWorkOrder(
    * Execution proceeds asynchronously after this method returns. The {@link RunWorkOrderListener} passed to the
    * constructor of this instance can be used to track progress.
    */
-  public void start() throws IOException
+  public void startAsync()
   {
-    if (started.getAndSet(true)) {
-      throw new ISE("Already started");
+    if (!state.compareAndSet(State.INIT, State.STARTED)) {
+      throw new ISE("Cannot start from state[%s]", state);
     }
 
     final StageDefinition stageDef = workOrder.getStageDefinition();
 
     try {
+      exec.registerCancellationId(cancellationId);
       makeInputSliceReader();
       makeWorkOutputChannelFactory();
       makeShuffleOutputChannelFactory();
@@ -205,16 +232,78 @@ public void start() throws IOException
       setUpCompletionCallbacks();
     }
     catch (Throwable t) {
-      // If start() has problems, cancel anything that was already kicked off, and close the FrameContext.
+      stopUnchecked();
+    }
+  }
+
+  /**
+   * Stops an execution that was previously initiated through {@link #startAsync()} and closes the {@link FrameContext}.
+   * May be called to cancel execution. Must also be called after successful execution in order to ensure that resources
+   * are all properly cleaned up.
+   *
+   * Blocks until execution is fully stopped.
+   */
+  public void stop() throws InterruptedException
+  {
+    if (state.compareAndSet(State.INIT, State.STOPPING)
+        || state.compareAndSet(State.STARTED, State.STOPPING)) {
+      // Initiate stopping.
+      Throwable e = null;
 
+      try {
         exec.cancel(cancellationId);
       }
-      catch (Throwable t2) {
-        t.addSuppressed(t2);
+      catch (Throwable e2) {
+        e = e2;
       }
 
-      CloseableUtils.closeAndSuppressExceptions(frameContext, t::addSuppressed);
-      throw t;
+      try {
+        frameContext.close();
+      }
+      catch (Throwable e2) {
+        if (e == null) {
+          e = e2;
+        } else {
+          e.addSuppressed(e2);
+        }
+      }
+
+      try {
+        // notifyListener will ignore this cancellation error if work has already succeeded.
+        notifyListener(Either.error(new MSQException(CanceledFault.instance())));
+      }
+      catch (Throwable e2) {
+        if (e == null) {
+          e = e2;
+        } else {
+          e.addSuppressed(e2);
+        }
+      }
+
+      stopLatch.countDown();
+
+      if (e != null) {
+        Throwables.throwIfInstanceOf(e, InterruptedException.class);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    stopLatch.await();
+  }
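The stop() protocol above is a small compare-and-set state machine: exactly one caller wins the transition to STOPPING and performs cleanup, and every other caller blocks on the latch until the cleanup is complete. A self-contained sketch of the same pattern, with illustrative names and simplified error handling:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class StartStopSketch
{
  enum State { INIT, STARTED, STOPPING, STOPPED }

  private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
  private final CountDownLatch stopLatch = new CountDownLatch(1);

  void startAsync()
  {
    if (!state.compareAndSet(State.INIT, State.STARTED)) {
      throw new IllegalStateException("Cannot start from state[" + state.get() + "]");
    }
    // ... kick off asynchronous work here ...
  }

  void stop() throws InterruptedException
  {
    if (state.compareAndSet(State.INIT, State.STOPPING)
        || state.compareAndSet(State.STARTED, State.STOPPING)) {
      try {
        // ... cancel work, close resources, notify listeners here ...
      }
      finally {
        state.set(State.STOPPED);
        stopLatch.countDown();
      }
    }

    // Both the winner and any concurrent callers wait here until cleanup completes.
    stopLatch.await();
  }
}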
+  /**
+   * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link InterruptedException}, this method sets
+   * the interrupt flag and throws an unchecked exception.
+   */
+  public void stopUnchecked()
+  {
+    try {
+      stop();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
@@ -459,19 +548,33 @@ public void onSuccess(final List workerResultAndOutputChannelsResolved)
               writeDurableStorageSuccessFile();
             }
 
-            listener.onSuccess(resultObject);
+            notifyListener(Either.value(resultObject));
           }
 
           @Override
           public void onFailure(final Throwable t)
           {
-            listener.onFailure(t);
+            notifyListener(Either.error(t));
           }
         },
         Execs.directExecutor()
     );
   }
 
+  /**
+   * Notify {@link RunWorkOrderListener} that the job is done, if not already notified.
+   */
+  private void notifyListener(final Either<Throwable, Object> result)
+  {
+    if (resultForListener.compareAndSet(null, result)) {
+      if (result.isError()) {
+        listener.onFailure(result.error());
+      } else {
+        listener.onSuccess(result.valueOrThrow());
+      }
+    }
+  }
+
   /**
    * Write {@link DurableStorageUtils#SUCCESS_MARKER_FILENAME} for a particular stage, if durable storage is enabled.
    */
@@ -561,7 +664,7 @@ private DurableStorageOutputChannelFactory makeDurableStorageOutputChannelFactor
   )
   {
     return DurableStorageOutputChannelFactory.createStandardImplementation(
-        controllerTaskId,
+        workerContext.queryId(),
         workOrder.getWorkerNumber(),
         workOrder.getStageNumber(),
         workerContext.workerId(),

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java
index 19c3c6570fe9..8bffd6f8179f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java
@@ -25,7 +25,7 @@
 import javax.annotation.Nullable;
 
 /**
- * Listener for various things that may happen during execution of {@link RunWorkOrder#start()}. Listener methods are
+ * Listener for various things that may happen during execution of {@link RunWorkOrder#startAsync()}. Listener methods are
  * fired in processing threads, so they must be thread-safe, and it is important that they run quickly.
  */
 public interface RunWorkOrderListener

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index 96c7ad20697d..90082fcf0dd0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -26,6 +26,7 @@
 import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.kernel.FrameProcessorFactory;
 import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.server.DruidNode;
 
 import java.io.File;
@@ -98,4 +99,9 @@ public interface WorkerContext
   DruidNode selfNode();
 
   DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
+
+  /**
+   * Whether to include all counters in reports. See {@link MultiStageQueryContext#CTX_INCLUDE_ALL_COUNTERS} for details.
+ */ + boolean includeAllCounters(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 1dc00946da41..702302f7ea1a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -48,7 +48,6 @@ import org.apache.druid.msq.counters.CounterTracker; import org.apache.druid.msq.indexing.InputChannelFactory; import org.apache.druid.msq.indexing.MSQWorkerTask; -import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.indexing.error.MSQErrorReport; @@ -367,28 +366,19 @@ private void handleNewWorkOrder( final WorkerStageKernel kernel = kernelHolder.kernel; final WorkOrder workOrder = kernel.getWorkOrder(); final StageDefinition stageDefinition = workOrder.getStageDefinition(); - final String cancellationId = cancellationIdFor(stageDefinition.getId()); + final String cancellationId = cancellationIdFor(stageDefinition.getId(), workOrder.getWorkerNumber()); log.info( - "Processing work order for stage[%s]%s", + "Starting work order for stage[%s], workerNumber[%d]%s", stageDefinition.getId(), + workOrder.getWorkerNumber(), (log.isDebugEnabled() ? StringUtils.format(", payload[%s]", context.jsonMapper().writeValueAsString(workOrder)) : "") ); - final FrameContext frameContext = kernelHolder.processorCloser.register(context.frameContext(workOrder)); - kernelHolder.processorCloser.register(() -> { - try { - workerExec.cancel(cancellationId); - } - catch (InterruptedException e) { - // Strange that cancellation would itself be interrupted. Log and suppress. - log.warn(e, "Cancellation interrupted for stage[%s]", stageDefinition.getId()); - Thread.currentThread().interrupt(); - } - }); + final FrameContext frameContext = context.frameContext(workOrder); - // Set up cleanup functions for this work order. + // Set up resultsCloser (called when we are done reading results). kernelHolder.resultsCloser.register(() -> FileUtils.deleteDirectory(frameContext.tempDir())); kernelHolder.resultsCloser.register(() -> removeStageOutputChannels(stageDefinition.getId())); @@ -397,13 +387,8 @@ private void handleNewWorkOrder( final InputChannelFactory inputChannelFactory = makeBaseInputChannelFactory(workOrder, controllerClient, kernelHolder.processorCloser); - // Start working on this stage immediately. - kernel.startReading(); - - final QueryContext queryContext = task != null ? 
QueryContext.of(task.getContext()) : QueryContext.empty(); - final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); + final boolean includeAllCounters = context.includeAllCounters(); final RunWorkOrder runWorkOrder = new RunWorkOrder( - task.getControllerTaskId(), workOrder, inputChannelFactory, stageCounters.computeIfAbsent( @@ -415,11 +400,16 @@ private void handleNewWorkOrder( context, frameContext, makeRunWorkOrderListener(workOrder, controllerClient, criticalWarningCodes, maxVerboseParseExceptions), - MultiStageQueryContext.isReindex(queryContext), - MultiStageQueryContext.removeNullBytes(queryContext) + MultiStageQueryContext.isReindex(workOrder.getWorkerContext()), + MultiStageQueryContext.removeNullBytes(workOrder.getWorkerContext()) ); - runWorkOrder.start(); + // Set up processorCloser (called when processing is done). + kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked); + + // Start working on this stage immediately. + kernel.startReading(); + runWorkOrder.startAsync(); kernelHolder.partitionBoundariesFuture = runWorkOrder.getStagePartitionBoundariesFuture(); } @@ -568,6 +558,13 @@ public ListenableFuture readStageOutput( return getOrCreateStageOutputHolder(stageId, partitionNumber).readRemotelyFrom(offset); } + /** + * Accept a new {@link WorkOrder} for execution. + * + * For backwards-compatibility purposes, this method populates {@link WorkOrder#getOutputChannelMode()} + * and {@link WorkOrder#getWorkerContext()} if the controller did not set them. (They are there for newer controllers, + * but not older ones.) + */ @Override public void postWorkOrder(final WorkOrder workOrder) { @@ -585,28 +582,11 @@ public void postWorkOrder(final WorkOrder workOrder) ); } - final OutputChannelMode outputChannelMode; - - // This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder. - // (It will be there for newer controllers; this is a backwards-compatibility thing.) - if (workOrder.hasOutputChannelMode()) { - outputChannelMode = workOrder.getOutputChannelMode(); - } else { - final MSQSelectDestination selectDestination = - task != null - ? MultiStageQueryContext.getSelectDestination(QueryContext.of(task.getContext())) - : MSQSelectDestination.TASKREPORT; - - outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode( - workOrder.getQueryDefinition(), - workOrder.getStageNumber(), - selectDestination, - task != null && MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())), - false - ); - } + final WorkOrder workOrderToUse = makeWorkOrderToUse( + workOrder, + task != null && task.getContext() != null ? QueryContext.of(task.getContext()) : QueryContext.empty() + ); - final WorkOrder workOrderToUse = workOrder.withOutputChannelMode(outputChannelMode); kernelManipulationQueue.add( kernelHolders -> kernelHolders.addKernel(WorkerStageKernel.create(workOrderToUse)) @@ -987,9 +967,9 @@ private StageOutputHolder getOrCreateStageOutputHolder(final StageId stageId, fi /** * Returns cancellation ID for a particular stage, to be used in {@link FrameProcessorExecutor#cancel(String)}. 
*/
-  private static String cancellationIdFor(final StageId stageId)
+  private static String cancellationIdFor(final StageId stageId, final int workerNumber)
   {
-    return stageId.toString();
+    return StringUtils.format("%s_%s", stageId, workerNumber);
   }
 
   /**
@@ -1017,6 +997,48 @@ private void doCancel()
     );
   }
 
+  /**
+   * Returns a work order based on the provided "originalWorkOrder", but where {@link WorkOrder#hasOutputChannelMode()}
+   * and {@link WorkOrder#hasWorkerContext()} are both true. If the original work order didn't have those fields, they
+   * are populated from the "taskContext". Otherwise the "taskContext" is ignored.
+   *
+   * This method can be removed once we can rely on these fields always being set in the WorkOrder.
+   * (They will be there for newer controllers; this is a backwards-compatibility method.)
+   *
+   * @param originalWorkOrder work order from controller
+   * @param taskContext       task context
+   */
+  static WorkOrder makeWorkOrderToUse(final WorkOrder originalWorkOrder, @Nullable final QueryContext taskContext)
+  {
+    // This condition can be removed once we can rely on QueryContext always being in the WorkOrder.
+    // (It will be there for newer controllers; this is a backwards-compatibility thing.)
+    final QueryContext queryContext;
+    if (originalWorkOrder.hasWorkerContext()) {
+      queryContext = originalWorkOrder.getWorkerContext();
+    } else if (taskContext != null) {
+      queryContext = taskContext;
+    } else {
+      queryContext = QueryContext.empty();
+    }
+
+    // This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder.
+    // (It will be there for newer controllers; this is a backwards-compatibility thing.)
+    final OutputChannelMode outputChannelMode;
+    if (originalWorkOrder.hasOutputChannelMode()) {
+      outputChannelMode = originalWorkOrder.getOutputChannelMode();
+    } else {
+      outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
+          originalWorkOrder.getQueryDefinition(),
+          originalWorkOrder.getStageNumber(),
+          MultiStageQueryContext.getSelectDestination(queryContext),
+          MultiStageQueryContext.isDurableStorageEnabled(queryContext),
+          false
+      );
+    }
+
+    return originalWorkOrder.withWorkerContext(queryContext).withOutputChannelMode(outputChannelMode);
+  }
+
   /**
    * Log (at DEBUG level) a string explaining the status of all work assigned to this worker.
    */
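Concretely, the shim behaves like this — a hypothetical fragment in which legacyWorkOrder stands for a WorkOrder posted by an older controller (no "context" or "output" fields); the WorkerImplTest added by this patch exercises similar cases:

    // Task context supplied by an older controller via Task#getContext.
    final QueryContext taskContext = QueryContext.of(
        ImmutableMap.of(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true)
    );

    final WorkOrder workOrderToUse = WorkerImpl.makeWorkOrderToUse(legacyWorkOrder, taskContext);

    // Both backwards-compatibility fields are now populated: the worker context is taken
    // from the task context, and the output channel mode is derived from it.
    // workOrderToUse.hasWorkerContext()     -> true
    // workOrderToUse.hasOutputChannelMode() -> true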
@@ -1244,9 +1266,18 @@ public void setDone()
   private static class KernelHolder
   {
     private final WorkerStageKernel kernel;
+    private SettableFuture<ClusterByPartitions> partitionBoundariesFuture;
+
+    /**
+     * Closer for processing. This is closed when all processing for a stage has completed.
+     */
     private final Closer processorCloser;
+
+    /**
+     * Closer for results. This is closed when results for a stage are no longer needed. Always closed
+     * *after* {@link #processorCloser} is done closing.
+     */
     private final Closer resultsCloser;
-    private SettableFuture<ClusterByPartitions> partitionBoundariesFuture;
 
     public KernelHolder(WorkerStageKernel kernel)
     {

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 1037aa6c2af0..ca93c673a4b9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -24,6 +24,7 @@
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -45,11 +46,13 @@
 import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.input.InputSpecSlicer;
-import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
+import org.apache.druid.msq.querykit.QueryKit;
+import org.apache.druid.msq.querykit.QueryKitSpec;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
@@ -66,6 +69,8 @@
  */
 public class IndexerControllerContext implements ControllerContext
 {
+  public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
+
   private static final Logger log = new Logger(IndexerControllerContext.class);
 
   private final MSQControllerTask task;
@@ -74,6 +79,7 @@ public class IndexerControllerContext implements ControllerContext
   private final ServiceClientFactory clientFactory;
   private final OverlordClient overlordClient;
   private final ServiceMetricEvent.Builder metricBuilder;
+  private final MemoryIntrospector memoryIntrospector;
 
   public IndexerControllerContext(
       final MSQControllerTask task,
@@ -89,27 +95,27 @@ public IndexerControllerContext(
     this.clientFactory = clientFactory;
     this.overlordClient = overlordClient;
     this.metricBuilder = new ServiceMetricEvent.Builder();
+    this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
     IndexTaskUtils.setTaskDimensions(metricBuilder, task);
   }
 
   @Override
   public ControllerQueryKernelConfig queryKernelConfig(
-      final MSQSpec querySpec,
-      final QueryDefinition queryDef
+      final String queryId,
+      final MSQSpec querySpec
   )
   {
-    final MemoryIntrospector memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
     final ControllerMemoryParameters memoryParameters =
         ControllerMemoryParameters.createProductionInstance(
             memoryIntrospector,
-            queryDef.getFinalStageDefinition().getMaxWorkerCount()
+            querySpec.getTuningConfig().getMaxNumWorkers()
         );
 
     final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters);
 
     log.debug(
         "Query[%s] using %s[%s], %s[%s], %s[%s].",
-        queryDef.getQueryId(),
+        queryId,
         MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
         config.isDurableStorage(),
        MultiStageQueryContext.CTX_FAULT_TOLERANCE,
@@ -146,11 +152,11 @@ public DruidNode selfNode()
   }
 
   @Override
-  public InputSpecSlicer newTableInputSpecSlicer()
+  public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager)
   {
     final SegmentSource includeSegmentSource =
         MultiStageQueryContext.getSegmentSources(task.getQuerySpec().getQuery().context());
-    return new TableInputSpecSlicer(
+    return new IndexerTableInputSpecSlicer(
         toolbox.getCoordinatorClient(),
         toolbox.getTaskActionClient(),
         includeSegmentSource
     );
   }
@@ -163,6 +169,12 @@ public TaskActionClient taskActionClient()
     return toolbox.getTaskActionClient();
   }
 
+  @Override
+  public TaskLockType taskLockType()
+  {
+    return task.getTaskLockType();
+  }
+
   @Override
   public WorkerClient newWorkerClient()
   {
@@ -200,8 +212,31 @@ public WorkerManager newWorkerManager(
     );
   }
 
+  @Override
+  public QueryKitSpec makeQueryKitSpec(
+      final QueryKit<Query<?>> queryKit,
+      final String queryId,
+      final MSQSpec querySpec,
+      final ControllerQueryKernelConfig queryKernelConfig
+  )
+  {
+    return new QueryKitSpec(
+        queryKit,
+        queryId,
+        querySpec.getTuningConfig().getMaxNumWorkers(),
+        querySpec.getTuningConfig().getMaxNumWorkers(),
+
+        // Assume tasks are symmetric: workers have the same number of processors available as a controller.
+        // Create one partition per processor per task, for maximum parallelism.
+        MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
+            querySpec.getQuery().context(),
+            memoryIntrospector.numProcessingThreads()
+        )
+    );
+  }
+
   /**
-   * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. Also used in tests.
+   * Helper method for {@link #queryKernelConfig(String, MSQSpec)}. Also used in tests.
    */
   public static ControllerQueryKernelConfig makeQueryKernelConfig(
       final MSQSpec querySpec,
@@ -209,7 +244,8 @@
   )
   {
     final QueryContext queryContext = querySpec.getQuery().context();
-    final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+    final int maxConcurrentStages =
+        MultiStageQueryContext.getMaxConcurrentStagesWithDefault(queryContext, DEFAULT_MAX_CONCURRENT_STAGES);
     final boolean isFaultToleranceEnabled = MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
     final boolean isDurableStorageEnabled;
@@ -247,9 +283,44 @@
            .destination(querySpec.getDestination())
            .maxConcurrentStages(maxConcurrentStages)
            .maxRetainedPartitionSketchBytes(memoryParameters.getPartitionStatisticsMaxRetainedBytes())
+           .workerContextMap(makeWorkerContextMap(querySpec, isDurableStorageEnabled, maxConcurrentStages))
            .build();
   }
 
+  /**
+   * Helper method for {@link #makeQueryKernelConfig} and {@link #makeTaskContext}. Makes the worker context map,
+   * i.e., the map that will become {@link WorkOrder#getWorkerContext()}.
+   */
+  public static Map<String, Object> makeWorkerContextMap(
+      final MSQSpec querySpec,
+      final boolean durableStorageEnabled,
+      final int maxConcurrentStages
+  )
+  {
+    final QueryContext queryContext = querySpec.getQuery().context();
+    final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext);
+    final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(queryContext);
+    final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext);
+    final ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+
+    builder
+        .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, durableStorageEnabled)
+        .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
+        .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
+        .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, maxConcurrentStages)
+        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
+        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters);
+
+    if (querySpec.getDestination().toSelectDestination() != null) {
+      builder.put(
+          MultiStageQueryContext.CTX_SELECT_DESTINATION,
+          querySpec.getDestination().toSelectDestination().getName()
+      );
+    }
+
+    return builder.build();
+  }
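Because the same map is written into both the WorkOrder (via ControllerQueryKernelConfig#workerContextMap) and the worker task context (via makeTaskContext below), workers can resolve these keys from either place. A fragment of the worker-side read path, mirroring the WorkerImpl change earlier in this patch (workOrder is assumed to be in scope and to come from a newer controller):

    // Worker-side: prefer the per-WorkOrder context over the task-level context.
    final QueryContext workerContext = workOrder.getWorkerContext();
    final boolean isReindex = MultiStageQueryContext.isReindex(workerContext);
    final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(workerContext);
    final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(workerContext);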
+
   /**
    * Helper method for {@link #newWorkerManager}, split out to be used in tests.
    *
@@ -262,17 +333,16 @@ public static Map makeTaskContext(
   )
   {
     final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
-    final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
-    final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());
-    final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context());
 
-    taskContextOverridesBuilder
-        .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, queryKernelConfig.isDurableStorage())
-        .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
-        .put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
-        .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages())
-        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
-        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters);
+    // Put worker context into the task context. That way, workers can get these context keys either from
+    // WorkOrder#getContext or Task#getContext.
+    taskContextOverridesBuilder.putAll(
+        makeWorkerContextMap(
+            querySpec,
+            queryKernelConfig.isDurableStorage(),
+            queryKernelConfig.getMaxConcurrentStages()
+        )
+    );
 
     // Put the lookup loading info in the task context to facilitate selective loading of lookups.
     if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) {
@@ -288,13 +358,6 @@
       );
     }
 
-    if (querySpec.getDestination().toSelectDestination() != null) {
-      taskContextOverridesBuilder.put(
-          MultiStageQueryContext.CTX_SELECT_DESTINATION,
-          querySpec.getDestination().toSelectDestination().getName()
-      );
-    }
-
     // propagate the controller's tags to the worker task for enhanced metrics reporting
     @SuppressWarnings("unchecked")
     Map<String, Object> tags = (Map<String, Object>) controllerTaskContext.get(DruidMetrics.TAGS);

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
similarity index 96%
rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
rename to extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
index 916dd3c1db38..48283bdd78a2 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.msq.input.table;
+package org.apache.druid.msq.indexing;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -35,6 +35,12 @@
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.input.table.DataSegmentWithLocation;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+import org.apache.druid.msq.input.table.DataServerSelector;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.DataSegment;
@@ -60,15 +66,15 @@
 /**
  * Slices {@link TableInputSpec} into {@link SegmentsInputSlice} in tasks.
*/ -public class TableInputSpecSlicer implements InputSpecSlicer +public class IndexerTableInputSpecSlicer implements InputSpecSlicer { - private static final Logger log = new Logger(TableInputSpecSlicer.class); + private static final Logger log = new Logger(IndexerTableInputSpecSlicer.class); private final CoordinatorClient coordinatorClient; private final TaskActionClient taskActionClient; private final SegmentSource includeSegmentSource; - public TableInputSpecSlicer( + public IndexerTableInputSpecSlicer( CoordinatorClient coordinatorClient, TaskActionClient taskActionClient, SegmentSource includeSegmentSource diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 595e62d9a7fc..fbb0bff95563 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -116,7 +116,10 @@ public IndexerWorkerContext( this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; final QueryContext queryContext = QueryContext.of(task.getContext()); - this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext); + this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault( + queryContext, + IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES + ); this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); } @@ -346,6 +349,12 @@ public DataServerQueryHandlerFactory dataServerQueryHandlerFactory() return dataServerQueryHandlerFactory; } + @Override + public boolean includeAllCounters() + { + return includeAllCounters; + } + private synchronized ServiceLocator makeControllerLocator(final String controllerId) { if (controllerLocator == null) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 4ddc8274b9d0..dc985e26fef7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -55,6 +55,7 @@ import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryContexts; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; @@ -234,8 +235,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception { // If we're in replace mode, acquire locks for all intervals before declaring the task ready. 
    if (isIngestion(querySpec) && ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()) {
-      final TaskLockType taskLockType =
-          MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()), true);
+      final TaskLockType taskLockType = getTaskLockType();
       final List<Interval> intervals =
           ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks();
       log.debug(
@@ -306,6 +306,26 @@ public int getPriority()
     return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
   }
 
+  @Nullable
+  public TaskLockType getTaskLockType()
+  {
+    if (isIngestion(querySpec)) {
+      return MultiStageQueryContext.validateAndGetTaskLockType(
+          QueryContext.of(
+              // Use the task context and override with the query context
+              QueryContexts.override(
+                  getContext(),
+                  querySpec.getQuery().getContext()
+              )
+          ),
+          ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()
+      );
+    } else {
+      // Locks need to be acquired only if data is being ingested into a DataSource
+      return null;
+    }
+  }
+
   private static String getDataSourceForTaskMetadata(final MSQSpec querySpec)
   {
     final MSQDestination destination = querySpec.getDestination();

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java
index dd59dfebd803..6c4ec10d6dfa 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSlice.java
@@ -32,7 +32,7 @@
 /**
  * Input slice representing a set of segments to read.
 *
- * Sliced from {@link TableInputSpec} by {@link TableInputSpecSlicer}. + * Sliced from {@link TableInputSpec}. *
* Similar to {@link org.apache.druid.query.spec.MultipleSpecificSegmentSpec} from native queries. *
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
index 0c8578702103..2a45605826be 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
@@ -26,9 +26,11 @@
 import org.apache.druid.msq.exec.ControllerClient;
 import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.query.QueryContext;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -51,9 +53,18 @@ public class WorkOrder
   @Nullable
   private final List<String> workerIds;
 
+  /**
+   * Always non-null for newer controllers. This is marked nullable for backwards-compatibility reasons.
+   */
   @Nullable
   private final OutputChannelMode outputChannelMode;
 
+  /**
+   * Always non-null for newer controllers. This is marked nullable for backwards-compatibility reasons.
+   */
+  @Nullable
+  private final QueryContext workerContext;
+
   @JsonCreator
   @SuppressWarnings("rawtypes")
   public WorkOrder(
@@ -63,7 +74,8 @@ public WorkOrder(
       @JsonProperty("input") final List<InputSlice> workerInputs,
       @JsonProperty("extra") @Nullable final ExtraInfoHolder extraInfoHolder,
       @JsonProperty("workers") @Nullable final List<String> workerIds,
-      @JsonProperty("output") @Nullable final OutputChannelMode outputChannelMode
+      @JsonProperty("output") @Nullable final OutputChannelMode outputChannelMode,
+      @JsonProperty("context") @Nullable final Map<String, Object> workerContext
   )
   {
     this.queryDefinition = Preconditions.checkNotNull(queryDefinition, "queryDefinition");
@@ -73,6 +85,7 @@
     this.extraInfoHolder = extraInfoHolder;
     this.workerIds = workerIds;
     this.outputChannelMode = outputChannelMode;
+    this.workerContext = workerContext != null ? QueryContext.of(workerContext) : null;
   }
 
   @JsonProperty("query")
@@ -124,6 +137,10 @@ public boolean hasOutputChannelMode()
     return outputChannelMode != null;
   }
 
+  /**
+   * Retrieves the output channel mode set by the controller. Null means the controller didn't set it, which means
+   * we're dealing with an older controller.
+   */
   @Nullable
   @JsonProperty("output")
   @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -132,6 +149,29 @@
     return outputChannelMode;
   }
 
+  public boolean hasWorkerContext()
+  {
+    return workerContext != null;
+  }
+
+  /**
+   * Retrieves the query context set by the controller. Null means the controller didn't set it, which means
+   * we're dealing with an older controller.
+   */
+  @Nullable
+  public QueryContext getWorkerContext()
+  {
+    return workerContext;
+  }
+
+  @Nullable
+  @JsonProperty("context")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<String, Object> getContextForSerialization()
+  {
+    return workerContext != null ? workerContext.asMap() : null;
+  }
+
   @Nullable
   public Object getExtraInfo()
   {
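The pairing of getWorkerContext() (typed view) with getContextForSerialization() (raw map, omitted from JSON when null) keeps the wire format compatible in both directions. A sketch of the round-trip, assuming jsonMapper is an ObjectMapper configured with the MSQ modules (hypothetical helper, not part of this patch):

  static WorkOrder roundTrip(final ObjectMapper jsonMapper, final WorkOrder workOrder) throws IOException
  {
    // For a WorkOrder whose workerContext is null, the serialized JSON contains no
    // "context" key at all, so older workers see exactly the payload they expect.
    final String json = jsonMapper.writeValueAsString(workOrder);

    // Conversely, a worker that deserializes JSON without "context" gets
    // hasWorkerContext() == false and falls back to the task context via
    // WorkerImpl#makeWorkOrderToUse.
    return jsonMapper.readValue(json, WorkOrder.class);
  }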
workerContext.asMap() : null + ); + } + } + + public WorkOrder withWorkerContext(final QueryContext newContext) + { + if (Objects.equals(newContext, this.workerContext)) { + return this; + } else { + return new WorkOrder( + queryDefinition, + stageNumber, + workerNumber, + workerInputs, + extraInfoHolder, + workerIds, + outputChannelMode, + newContext.asMap() ); } } @@ -176,7 +235,8 @@ public boolean equals(Object o) && Objects.equals(workerInputs, workOrder.workerInputs) && Objects.equals(extraInfoHolder, workOrder.extraInfoHolder) && Objects.equals(workerIds, workOrder.workerIds) - && Objects.equals(outputChannelMode, workOrder.outputChannelMode); + && Objects.equals(outputChannelMode, workOrder.outputChannelMode) + && Objects.equals(workerContext, workOrder.workerContext); } @Override @@ -189,7 +249,8 @@ public int hashCode() workerInputs, extraInfoHolder, workerIds, - outputChannelMode + outputChannelMode, + workerContext ); } @@ -204,6 +265,7 @@ public String toString() ", extraInfoHolder=" + extraInfoHolder + ", workerIds=" + workerIds + ", outputChannelMode=" + outputChannelMode + + ", context=" + workerContext + '}'; } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index 05e0f722ccd4..16ed68211d55 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -302,7 +302,8 @@ public Int2ObjectMap createWorkOrders( workerInputs.inputsForWorker(workerNumber), extraInfoHolder, config.getWorkerIds(), - outputChannelMode + outputChannelMode, + config.getWorkerContextMap() ); QueryValidator.validateWorkOrder(workOrder); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java index 5c754aedd4f4..f7516c63c929 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java @@ -21,9 +21,12 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.msq.indexing.destination.MSQDestination; +import org.apache.druid.msq.kernel.WorkOrder; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -37,22 +40,22 @@ public class ControllerQueryKernelConfig private final boolean durableStorage; private final boolean faultTolerance; private final MSQDestination destination; - @Nullable - private final String controllerId; - + private final String controllerHost; @Nullable private final List workerIds; + private final Map workerContextMap; - private ControllerQueryKernelConfig( + ControllerQueryKernelConfig( int maxRetainedPartitionSketchBytes, int maxConcurrentStages, boolean pipeline, boolean durableStorage, boolean faultTolerance, MSQDestination destination, - @Nullable String controllerId, - @Nullable List workerIds + @Nullable String controllerHost, + @Nullable List workerIds, + Map workerContextMap ) { if 
(maxRetainedPartitionSketchBytes <= 0) { @@ -85,8 +88,9 @@ private ControllerQueryKernelConfig( this.durableStorage = durableStorage; this.faultTolerance = faultTolerance; this.destination = destination; - this.controllerId = controllerId; + this.controllerHost = controllerHost; this.workerIds = workerIds; + this.workerContextMap = workerContextMap; } public static Builder builder() @@ -130,6 +134,14 @@ public List getWorkerIds() return workerIds; } + /** + * Map to include in {@link WorkOrder}, as {@link WorkOrder#getWorkerContext()}. + */ + public Map getWorkerContextMap() + { + return workerContextMap; + } + @Override public boolean equals(Object o) { @@ -145,8 +157,10 @@ public boolean equals(Object o) && pipeline == that.pipeline && durableStorage == that.durableStorage && faultTolerance == that.faultTolerance - && Objects.equals(controllerId, that.controllerId) - && Objects.equals(workerIds, that.workerIds); + && Objects.equals(destination, that.destination) + && Objects.equals(controllerHost, that.controllerHost) + && Objects.equals(workerIds, that.workerIds) + && Objects.equals(workerContextMap, that.workerContextMap); } @Override @@ -158,8 +172,10 @@ public int hashCode() pipeline, durableStorage, faultTolerance, - controllerId, - workerIds + destination, + controllerHost, + workerIds, + workerContextMap ); } @@ -171,9 +187,11 @@ public String toString() ", maxConcurrentStages=" + maxConcurrentStages + ", pipeline=" + pipeline + ", durableStorage=" + durableStorage + - ", faultTolerant=" + faultTolerance + - ", controllerId='" + controllerId + '\'' + + ", faultTolerance=" + faultTolerance + + ", destination=" + destination + + ", controllerHost='" + controllerHost + '\'' + ", workerIds=" + workerIds + + ", workerContextMap=" + workerContextMap + '}'; } @@ -185,8 +203,9 @@ public static class Builder private boolean durableStorage; private boolean faultTolerant; private MSQDestination destination; - private String controllerId; + private String controllerHost; private List workerIds; + private Map workerContextMap = Collections.emptyMap(); /** * Use {@link #builder()}. @@ -231,9 +250,9 @@ public Builder destination(final MSQDestination destination) return this; } - public Builder controllerId(final String controllerId) + public Builder controllerHost(final String controllerHost) { - this.controllerId = controllerId; + this.controllerHost = controllerHost; return this; } @@ -243,6 +262,12 @@ public Builder workerIds(final List workerIds) return this; } + public Builder workerContextMap(final Map workerContextMap) + { + this.workerContextMap = workerContextMap; + return this; + } + public ControllerQueryKernelConfig build() { return new ControllerQueryKernelConfig( @@ -252,8 +277,9 @@ public ControllerQueryKernelConfig build() durableStorage, faultTolerant, destination, - controllerId, - workerIds + controllerHost, + workerIds, + workerContextMap ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 232e85166a0f..488479407cfa 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -32,8 +32,14 @@ public interface DataSegmentProvider * Returns a supplier that fetches the segment corresponding to the provided segmentId from deep storage. 
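Together, the WorkOrder and ControllerQueryKernelConfig changes above let a controller ship a context map to every worker. A sketch of the round trip using the constructor shape from this patch; queryDef and inputs are assumed to be in scope, and the map key mirrors the one used by WorkerImplTest later in this patch:

    // Controller side: the map is serialized as the "context" field of the work order.
    final WorkOrder order = new WorkOrder(
        queryDef,                  // QueryDefinition
        0,                         // stage number
        0,                         // worker number
        inputs,                    // worker input slices
        null,                      // extra info
        null,                      // worker IDs
        OutputChannelMode.MEMORY,
        ImmutableMap.of(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true)
    );

    // Worker side: older controllers leave the field null, so probe before reading.
    if (order.hasWorkerContext()) {
      final QueryContext workerContext = order.getWorkerContext();
      // ... read settings such as durableShuffleStorage from workerContext ...
    }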
The segment * is not actually fetched until you call {@link Supplier#get()}. Once you call this, make sure to also call * {@link ResourceHolder#close()}. - *
+ * * It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. + * + * @param segmentId segment ID to fetch + * @param channelCounters counters to increment when the segment is closed + * @param isReindex true if this is a DML command (INSERT or REPLACE) writing into the same table it is + * reading from; false otherwise. When true, implementations must only allow reading from + * segments that are currently-used according to the Coordinator. */ Supplier> fetchSegment( SegmentId segmentId, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java deleted file mode 100644 index cc010a104c6c..000000000000 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentTimelineView.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.querykit; - -import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.TimelineLookup; -import org.joda.time.Interval; - -import java.util.List; -import java.util.Optional; - -public interface DataSegmentTimelineView -{ - /** - * Returns the timeline for a datasource, if it 'exists'. The analysis object passed in must represent a scan-based - * datasource of a single table. (i.e., {@link DataSourceAnalysis#getBaseTableDataSource()} must be present.) - * - * @param dataSource table data source name - * @param intervals relevant intervals. The returned timeline will *at least* include all segments that overlap - * these intervals. It may also include more. Empty means the timeline may not contain any - * segments at all. - * - * @return timeline, if it 'exists' - * - * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table - */ - Optional> getTimeline( - String dataSource, - List intervals - ); -} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index e6ddb4d723dc..21848813e5d3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -123,8 +123,7 @@ public class DataSourcePlan /** * Build a plan. 
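The laziness contract documented on fetchSegment() above is easy to misuse; here is a usage sketch under its stated rules. Generic type parameters are elided in this excerpt, so the Segment holder below is an assumption, and dataSegmentProvider, segmentId, channelCounters, and isReindex are assumed to be in scope:

    final Supplier<ResourceHolder<Segment>> supplier =
        dataSegmentProvider.fetchSegment(segmentId, channelCounters, isReindex);

    // Nothing has been fetched yet; get() performs the fetch and obligates a close().
    try (ResourceHolder<Segment> holder = supplier.get()) {
      final Segment segment = holder.get();
      // ... read from the segment ...
    }
    // If supplier.get() is never called, no close() is required.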
* - * @param queryKit query kit reference for recursive planning - * @param queryId query ID + * @param queryKitSpec reference for recursive planning * @param queryContext query context * @param dataSource datasource to plan * @param querySegmentSpec intervals for mandatory pruning. Must be {@link MultipleIntervalSegmentSpec}. The returned * @param filter filter for best-effort pruning. The returned plan may or may not be filtered to this * filter. Query processing must still apply the filter to generate correct results. * @param filterFields which fields from the filter to consider for pruning, or null to consider all fields. - * @param maxWorkerCount maximum number of workers for subqueries * @param minStageNumber starting stage number for subqueries * @param broadcast whether the plan should broadcast data for this datasource */ @SuppressWarnings("rawtypes") public static DataSourcePlan forDataSource( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final DataSource dataSource, final QuerySegmentSpec querySegmentSpec, @Nullable DimFilter filter, @Nullable Set filterFields, - final int maxWorkerCount, final int minStageNumber, final boolean broadcast ) @@ -180,47 +176,38 @@ public static DataSourcePlan forDataSource( return forLookup((LookupDataSource) dataSource, broadcast); } else if (dataSource instanceof FilteredDataSource) { return forFilteredDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, (FilteredDataSource) dataSource, querySegmentSpec, - maxWorkerCount, minStageNumber, broadcast ); } else if (dataSource instanceof UnnestDataSource) { return forUnnest( - queryKit, - queryId, + queryKitSpec, queryContext, (UnnestDataSource) dataSource, querySegmentSpec, - maxWorkerCount, minStageNumber, broadcast ); } else if (dataSource instanceof QueryDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource, - maxWorkerCount, minStageNumber, - broadcast, - queryContext + broadcast ); } else if (dataSource instanceof UnionDataSource) { return forUnion( - queryKit, - queryId, + queryKitSpec, queryContext, (UnionDataSource) dataSource, querySegmentSpec, filter, filterFields, - maxWorkerCount, minStageNumber, broadcast ); @@ -234,25 +221,21 @@ public static DataSourcePlan forDataSource( switch (deducedJoinAlgorithm) { case BROADCAST: return forBroadcastHashJoin( - queryKit, - queryId, + queryKitSpec, queryContext, (JoinDataSource) dataSource, querySegmentSpec, filter, filterFields, - maxWorkerCount, minStageNumber, broadcast ); case SORT_MERGE: return forSortMergeJoin( - queryKit, - queryId, + queryKitSpec, (JoinDataSource) dataSource, querySegmentSpec, - maxWorkerCount, minStageNumber, broadcast ); @@ -414,23 +397,18 @@ private static DataSourcePlan forLookup( } private static DataSourcePlan forQuery( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryDataSource dataSource, - final int maxWorkerCount, final int minStageNumber, - final boolean broadcast, - @Nullable final QueryContext parentContext + final boolean broadcast ) { - final QueryDefinition subQueryDef = queryKit.makeQueryDefinition( - queryId, + final QueryDefinition subQueryDef = queryKitSpec.getQueryKit().makeQueryDefinition( + queryKitSpec, // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context.
It's only used for the // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), - queryKit, - ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount), - maxWorkerCount, + ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()), minStageNumber ); @@ -445,25 +423,21 @@ private static DataSourcePlan forQuery( } private static DataSourcePlan forFilteredDataSource( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final FilteredDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, final int minStageNumber, final boolean broadcast ) { final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, dataSource.getBase(), querySegmentSpec, null, null, - maxWorkerCount, minStageNumber, broadcast ); @@ -485,26 +459,22 @@ private static DataSourcePlan forFilteredDataSource( * Build a plan for Unnest data source */ private static DataSourcePlan forUnnest( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final UnnestDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, final int minStageNumber, final boolean broadcast ) { // Find the plan for base data source by recursing final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, dataSource.getBase(), querySegmentSpec, null, null, - maxWorkerCount, minStageNumber, broadcast ); @@ -529,14 +499,12 @@ private static DataSourcePlan forUnnest( } private static DataSourcePlan forUnion( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final UnionDataSource unionDataSource, final QuerySegmentSpec querySegmentSpec, @Nullable DimFilter filter, @Nullable Set filterFields, - final int maxWorkerCount, final int minStageNumber, final boolean broadcast ) @@ -544,21 +512,19 @@ private static DataSourcePlan forUnion( // This is done to prevent loss of generality since MSQ can plan any type of DataSource. List children = unionDataSource.getDataSources(); - final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final List newChildren = new ArrayList<>(); final List inputSpecs = new ArrayList<>(); final IntSet broadcastInputs = new IntOpenHashSet(); for (DataSource child : children) { DataSourcePlan childDataSourcePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, child, querySegmentSpec, filter, filterFields, - maxWorkerCount, Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), broadcast ); @@ -582,30 +548,26 @@ private static DataSourcePlan forUnion( * Build a plan for broadcast hash-join. 
*/ private static DataSourcePlan forBroadcastHashJoin( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, - final int maxWorkerCount, final int minStageNumber, final boolean broadcast ) { - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourceAnalysis analysis = dataSource.getAnalysis(); final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, analysis.getBaseDataSource(), querySegmentSpec, filter, filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis), - maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast ); @@ -618,14 +580,12 @@ private static DataSourcePlan forBroadcastHashJoin( for (int i = 0; i < analysis.getPreJoinableClauses().size(); i++) { final PreJoinableClause clause = analysis.getPreJoinableClauses().get(i); final DataSourcePlan clausePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, clause.getDataSource(), new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY), null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly. null, - maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), true // Always broadcast right-hand side of the join. ); @@ -655,11 +615,9 @@ private static DataSourcePlan forBroadcastHashJoin( * Build a plan for sort-merge join. */ private static DataSourcePlan forSortMergeJoin( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, final int minStageNumber, final boolean broadcast ) @@ -672,19 +630,16 @@ private static DataSourcePlan forSortMergeJoin( SortMergeJoinFrameProcessorFactory.validateCondition(dataSource.getConditionAnalysis()) ); - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); // Plan the left input. // We're confident that we can cast dataSource.getLeft() to QueryDataSource, because DruidJoinQueryRel creates // subqueries when the join algorithm is sortMerge. final DataSourcePlan leftPlan = forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource.getLeft(), - maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - false, - null + false ); leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); @@ -692,13 +647,10 @@ private static DataSourcePlan forSortMergeJoin( // We're confident that we can cast dataSource.getRight() to QueryDataSource, because DruidJoinQueryRel creates // subqueries when the join algorithm is sortMerge. 
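Both sides of the sort-merge join below shuffle into the same hash partition count, which QueryKitSpec (added later in this patch) computes as maxNonLeafWorkerCount * targetPartitionsPerWorker. A worked sketch with illustrative values:

    // Illustrative values; the real ones come from controller config and query context.
    final int maxNonLeafWorkerCount = 4;
    final int targetPartitionsPerWorker = 2;

    // Equivalent to queryKitSpec.getNumPartitionsForShuffle() in this patch.
    final int hashPartitionCount = maxNonLeafWorkerCount * targetPartitionsPerWorker; // 8

    // Each join side hashes into 8 partitions rather than 4, so multi-threaded workers
    // get roughly targetPartitionsPerWorker partitions apiece.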
final DataSourcePlan rightPlan = forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource.getRight(), - maxWorkerCount, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - false, - null + false ); rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); @@ -707,8 +659,9 @@ private static DataSourcePlan forSortMergeJoin( ((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber() ); + final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle(); final List leftPartitionKey = partitionKeys.get(0); - leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), maxWorkerCount)); + leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount)); leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey)); // Build up the right stage. @@ -717,7 +670,7 @@ private static DataSourcePlan forSortMergeJoin( ); final List rightPartitionKey = partitionKeys.get(1); - rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), maxWorkerCount)); + rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), hashPartitionCount)); rightBuilder.signature(QueryKitUtils.sortableSignature(rightBuilder.getSignature(), rightPartitionKey)); // Compute join signature. @@ -745,7 +698,7 @@ private static DataSourcePlan forSortMergeJoin( Iterables.getOnlyElement(rightPlan.getInputSpecs()) ) ) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .signature(joinSignatureBuilder.build()) .processorFactory( new SortMergeJoinFrameProcessorFactory( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index a795f6496053..3129bbfacb9c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -41,11 +41,9 @@ public MultiQueryKit(final Map, QueryKit> toolKitMap) @Override public QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, Query query, - QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, int minStageNumber ) { @@ -54,11 +52,9 @@ public QueryDefinition makeQueryDefinition( if (specificToolKit != null) { //noinspection unchecked return specificToolKit.makeQueryDefinition( - queryId, + queryKitSpec, query, - this, resultShuffleSpecFactory, - maxWorkerCount, minStageNumber ); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index b259022bba5b..118091ccbd49 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -30,23 +30,17 @@ public interface QueryKit> /** * Creates a {@link QueryDefinition} from a {@link Query}. 
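With the revised signature in the hunk below, per-query planning parameters travel inside a QueryKitSpec rather than as separate arguments. A hedged call sketch; multiQueryKit, nativeQuery, and resultShuffleSpecFactory are assumed to be in scope, and the literals are illustrative:

    // QueryKitSpec arguments, in order: queryKit, queryId, maxLeafWorkerCount,
    // maxNonLeafWorkerCount, targetPartitionsPerWorker.
    final QueryKitSpec spec = new QueryKitSpec(multiQueryKit, "demo-query-id", 10, 4, 2);

    final QueryDefinition queryDef = spec.getQueryKit().makeQueryDefinition(
        spec,
        nativeQuery,
        resultShuffleSpecFactory,
        0 // minStageNumber
    );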
* - * @param queryId query ID of the resulting {@link QueryDefinition} + * @param queryKitSpec collection of parameters necessary for planning {@link QueryDefinition} * @param query native query to translate - * @param toolKitForSubQueries kit that is used to translate native subqueries; i.e., - * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. * @param resultShuffleSpecFactory shuffle spec factory for the final output of this query. - * @param maxWorkerCount maximum number of workers: becomes - * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. */ QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, QueryType query, - QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, int minStageNumber ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java new file mode 100644 index 000000000000..7cae4ed7d7bf --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecs; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.query.Query; + +import java.util.List; + +/** + * Collection of parameters for {@link QueryKit#makeQueryDefinition}. + */ +public class QueryKitSpec +{ + private final QueryKit> queryKit; + private final String queryId; + private final int maxLeafWorkerCount; + private final int maxNonLeafWorkerCount; + private final int targetPartitionsPerWorker; + + /** + * @param queryKit kit that is used to translate native subqueries; i.e., + * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. 
+ * @param queryId queryId of the resulting {@link QueryDefinition} + * @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} + * @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries + */ + public QueryKitSpec( + QueryKit> queryKit, + String queryId, + int maxLeafWorkerCount, + int maxNonLeafWorkerCount, + int targetPartitionsPerWorker + ) + { + this.queryId = queryId; + this.queryKit = queryKit; + this.maxLeafWorkerCount = maxLeafWorkerCount; + this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; + this.targetPartitionsPerWorker = targetPartitionsPerWorker; + } + + /** + * Instance of {@link QueryKit} for recursive calls. + */ + public QueryKit> getQueryKit() + { + return queryKit; + } + + /** + * Query ID to use when building {@link QueryDefinition}. + */ + public String getQueryId() + { + return queryId; + } + + /** + * Maximum worker count for a stage with the given inputs. Will use {@link #maxNonLeafWorkerCount} if there are + * any stage inputs, {@link #maxLeafWorkerCount} otherwise. + */ + public int getMaxWorkerCount(final List inputSpecs) + { + if (InputSpecs.getStageNumbers(inputSpecs).isEmpty()) { + return maxLeafWorkerCount; + } else { + return maxNonLeafWorkerCount; + } + } + + /** + * Maximum number of workers for non-leaf stages (where there are some stage inputs). + */ + public int getMaxNonLeafWorkerCount() + { + return maxNonLeafWorkerCount; + } + + /** + * Number of partitions to generate during a shuffle. + */ + public int getNumPartitionsForShuffle() + { + return maxNonLeafWorkerCount * targetPartitionsPerWorker; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index b3686359d2a4..02542f8e7366 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -35,7 +35,6 @@ import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.util.MultiStageQueryContext; -import org.apache.druid.query.Query; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; import org.apache.druid.query.operator.NaiveSortOperatorFactory; @@ -63,11 +62,9 @@ public WindowOperatorQueryKit(ObjectMapper jsonMapper) @Override public QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, WindowOperatorQuery originalQuery, - QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, int minStageNumber ) { @@ -89,20 +86,22 @@ public QueryDefinition makeQueryDefinition( log.info("Created operatorList with operator factories: [%s]", operatorList); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, minStageNumber, false ); - ShuffleSpec nextShuffleSpec = 
findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); - final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec); + ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow( + operatorList.get(0), + queryKitSpec.getNumPartitionsForShuffle() + ); + final QueryDefinitionBuilder queryDefBuilder = + makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); @@ -130,7 +129,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber) .inputs(new StageInputSpec(firstStageNumber - 1)) .signature(finalWindowStageRowSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec(finalWindowStageShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, @@ -192,7 +191,8 @@ public QueryDefinition makeQueryDefinition( stageRowSignature = finalWindowStageRowSignature; nextShuffleSpec = finalWindowStageShuffleSpec; } else { - nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount); + nextShuffleSpec = + findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -229,7 +229,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + i) .inputs(new StageInputSpec(firstStageNumber + i - 1)) .signature(stageRowSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec(nextShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, @@ -285,7 +285,7 @@ private List> getOperatorListFromQuery(WindowOperatorQuery return operatorList; } - private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int maxWorkerCount) + private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int partitionCount) { NaivePartitioningOperatorFactory partition = null; NaiveSortOperatorFactory sort = null; @@ -325,7 +325,7 @@ private ShuffleSpec findShuffleSpecForNextWindow(List operatorF keyColsOfWindow.add(kc); } - return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount); + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 7e4ebf5e7fab..db56bd02f742 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -34,12 +34,12 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import 
org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.query.DimensionComparisonUtils; -import org.apache.druid.query.Query; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.having.AlwaysHavingSpec; @@ -66,26 +66,22 @@ public GroupByQueryKit(ObjectMapper jsonMapper) @Override public QueryDefinition makeQueryDefinition( - final String queryId, + final QueryKitSpec queryKitSpec, final GroupByQuery originalQuery, - final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, - final int maxWorkerCount, final int minStageNumber ) { validateQuery(originalQuery); - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, minStageNumber, false ); @@ -139,9 +135,10 @@ public QueryDefinition makeQueryDefinition( // __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time) // For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' AS __time FROM bar PARTITIONED BY DAY - shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() - ? ShuffleSpecFactories.singlePartition() - : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); + shuffleSpecFactoryPreAggregation = + intermediateClusterBy.isEmpty() + ? ShuffleSpecFactories.singlePartition() + : ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()); if (doLimitOrOffset) { shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint); @@ -166,7 +163,10 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .signature(intermediateSignature) .shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true)) - .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) + .maxWorkerCount( + dataSourcePlan.isSingleWorker() + ? 1 + : queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs())) .processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun)) ); @@ -186,7 +186,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec( shuffleSpecFactoryPostAggregation != null ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) @@ -387,7 +387,10 @@ private static void validateQuery(final GroupByQuery query) for (final OrderByColumnSpec column : defaultLimitSpec.getColumns()) { final Optional type = resultSignature.getColumnType(column.getDimension()); - if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator(type.get().getType(), column.getDimensionComparator())) { + if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator( + type.get().getType(), + column.getDimensionComparator() + )) { throw new ISE( "Must use natural comparator for column [%s] of type [%s]", column.getDimension(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index f4f50106e813..8d23e289bb67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -33,13 +33,13 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.query.Order; import org.apache.druid.query.OrderBy; -import org.apache.druid.query.Query; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -86,24 +86,20 @@ public static RowSignature getAndValidateSignature(final ScanQuery scanQuery, fi // partition without a ClusterBy, we don't need to necessarily create it via the resultShuffleSpecFactory provided @Override public QueryDefinition makeQueryDefinition( - final String queryId, + final QueryKitSpec queryKitSpec, final ScanQuery originalQuery, - final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, - final int maxWorkerCount, final int minStageNumber ) { - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, minStageNumber, false ); @@ -177,7 +173,10 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .shuffleSpec(scanShuffleSpec) .signature(signatureToUse) - .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) + .maxWorkerCount( + dataSourcePlan.isSingleWorker() + ? 
1 + : queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs())) .processorFactory(new ScanQueryFrameProcessorFactory(queryToRun)) ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 5eb20c387ed0..03cec9d192fc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -120,7 +120,6 @@ public class MultiStageQueryContext public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE = SegmentSource.NONE; public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages"; - public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1; public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage"; private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false; public static final String CTX_SELECT_DESTINATION = "selectDestination"; @@ -194,6 +193,12 @@ public class MultiStageQueryContext public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification"; + /** + * Number of partitions to target per worker when creating shuffle specs that involve specific numbers of + * partitions. This helps us utilize more parallelism when workers are multi-threaded. + */ + public static final String CTX_TARGET_PARTITIONS_PER_WORKER = "targetPartitionsPerWorker"; + private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); public static String getMSQMode(final QueryContext queryContext) @@ -204,11 +209,14 @@ public static String getMSQMode(final QueryContext queryContext) ); } - public static int getMaxConcurrentStages(final QueryContext queryContext) + public static int getMaxConcurrentStagesWithDefault( + final QueryContext queryContext, + final int defaultMaxConcurrentStages + ) { return queryContext.getInt( CTX_MAX_CONCURRENT_STAGES, - DEFAULT_MAX_CONCURRENT_STAGES + defaultMaxConcurrentStages ); } @@ -343,16 +351,6 @@ public static MSQSelectDestination getSelectDestination(final QueryContext query ); } - @Nullable - public static MSQSelectDestination getSelectDestinationOrNull(final QueryContext queryContext) - { - return QueryContexts.getAsEnum( - CTX_SELECT_DESTINATION, - queryContext.getString(CTX_SELECT_DESTINATION), - MSQSelectDestination.class - ); - } - public static int getRowsInMemory(final QueryContext queryContext) { return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY); @@ -393,6 +391,14 @@ public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE); } + public static int getTargetPartitionsPerWorkerWithDefault( + final QueryContext queryContext, + final int defaultValue + ) + { + return queryContext.getInt(CTX_TARGET_PARTITIONS_PER_WORKER, defaultValue); + } + /** * See {@link #CTX_INCLUDE_ALL_COUNTERS}. 
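A sketch of how callers read the new context methods above; the defaults passed in are illustrative, since the defaults now come from the calling context rather than from constants (for example, an indexer-based controller and a Broker-based controller may supply different values):

    final QueryContext ctx = QueryContext.of(
        ImmutableMap.of(MultiStageQueryContext.CTX_TARGET_PARTITIONS_PER_WORKER, 2)
    );

    // Both methods fall back to the caller-supplied default when the key is absent.
    final int targetPartitionsPerWorker =
        MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(ctx, 1);
    final int maxConcurrentStages =
        MultiStageQueryContext.getMaxConcurrentStagesWithDefault(ctx, 1);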
*/ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java index d7364124483a..c1d1030fb08b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java @@ -108,6 +108,7 @@ public void testMoreInputFiles() Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles null, null, + null, null ); @@ -125,7 +126,7 @@ public void testMoreInputFiles() QueryValidator.validateWorkOrder(workOrder); } - private static QueryDefinition createQueryDefinition(int numColumns, int numWorkers) + public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers) { QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java new file mode 100644 index 000000000000..32cd36d09980 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.exec; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +public class WorkerImplTest +{ + @Test + public void test_makeWorkOrderToUse_nothingMissing() + { + final WorkOrder workOrder = new WorkOrder( + QueryValidatorTest.createQueryDefinition(10, 2), + 0, + 0, + Collections.singletonList(() -> 1), + null, + null, + OutputChannelMode.MEMORY, + ImmutableMap.of("foo", "bar") + ); + + Assert.assertSame( + workOrder, + WorkerImpl.makeWorkOrderToUse( + workOrder, + QueryContext.of(ImmutableMap.of("foo", "baz")) /* Conflicts with workOrder context; should be ignored */ + ) + ); + } + + @Test + public void test_makeWorkOrderToUse_missingOutputChannelModeAndWorkerContext() + { + final Map taskContext = + ImmutableMap.of("foo", "bar", MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true); + + final WorkOrder workOrder = new WorkOrder( + QueryValidatorTest.createQueryDefinition(10, 2), + 1, + 2, + Collections.singletonList(() -> 1), + null, + null, + null, + null + ); + + Assert.assertEquals( + new WorkOrder( + workOrder.getQueryDefinition(), + workOrder.getStageNumber(), + workOrder.getWorkerNumber(), + workOrder.getInputs(), + null, + null, + OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE, + taskContext + ), + WorkerImpl.makeWorkOrderToUse(workOrder, QueryContext.of(taskContext)) + ); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 8d974285fb57..467a5bab49fe 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -46,85 +47,56 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; public class MSQControllerTaskTest { - private final List INTERVALS = - Collections.singletonList(Intervals.of( - "2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")); - - private final MSQSpec MSQ_SPEC = MSQSpec - .builder() - .destination(new DataSourceMSQDestination( - "target", - Granularities.DAY, - null, - INTERVALS, - null, - null - )) - .query(new Druids.ScanQueryBuilder() - .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) - .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) - .dataSource("target") - .build() - ) - .columnMappings(new ColumnMappings(ImmutableList.of(new ColumnMapping("a0", "cnt")))) - .tuningConfig(MSQTuningConfig.defaultConfig()) - .build(); + private static final List INTERVALS = Collections.singletonList( + Intervals.of("2011-04-01/2011-04-03") + ); + + private static MSQSpec.Builder msqSpecBuilder() + { + return MSQSpec + .builder() + 
.destination( + new DataSourceMSQDestination("target", Granularities.DAY, null, INTERVALS, null, null) + ) + .query( + new Druids.ScanQueryBuilder() + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .build() + ) + .columnMappings(new ColumnMappings(ImmutableList.of(new ColumnMapping("a0", "cnt")))) + .tuningConfig(MSQTuningConfig.defaultConfig()); + } @Test public void testGetInputSourceResources() { - MSQControllerTask controllerTask = new MSQControllerTask( - null, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); - Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty()); + Assert.assertTrue(createControllerTask(msqSpecBuilder()).getInputSourceResources().isEmpty()); } @Test public void testGetDefaultLookupLoadingSpec() { - MSQControllerTask controllerTask = new MSQControllerTask( - null, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); } @Test public void testGetDefaultBroadcastDatasourceLoadingSpec() { - MSQControllerTask controllerTask = new MSQControllerTask( - null, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, controllerTask.getBroadcastDatasourceLoadingSpec()); } @Test public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() { - MSQSpec build = MSQSpec + MSQSpec.Builder builder = MSQSpec .builder() .query(new Druids.ScanQueryBuilder() .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) @@ -137,54 +109,83 @@ public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext() .build() ) .columnMappings(new ColumnMappings(Collections.emptyList())) - .tuningConfig(MSQTuningConfig.defaultConfig()) - .build(); - MSQControllerTask controllerTask = new MSQControllerTask( - null, - build, - null, - null, - null, - null, - null, - null - ); + .tuningConfig(MSQTuningConfig.defaultConfig()); - // Va;idate that MSQ Controller task doesn't load any lookups even if context has lookup info populated. - Assert.assertEquals(LookupLoadingSpec.NONE, controllerTask.getLookupLoadingSpec()); + // Validate that MSQ Controller task doesn't load any lookups even if context has lookup info populated. 
+ Assert.assertEquals(LookupLoadingSpec.NONE, createControllerTask(builder).getLookupLoadingSpec()); } @Test public void testGetTaskAllocatorId() { - final String taskId = "taskId"; - MSQControllerTask controllerTask = new MSQControllerTask( - taskId, - MSQ_SPEC, + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); + Assert.assertEquals(controllerTask.getId(), controllerTask.getTaskAllocatorId()); + } + + @Test + public void testGetTaskLockType() + { + final DataSourceMSQDestination appendDestination + = new DataSourceMSQDestination("target", Granularities.DAY, null, null, null, null); + Assert.assertEquals( + TaskLockType.SHARED, + createControllerTask(msqSpecBuilder().destination(appendDestination)).getTaskLockType() + ); + + final DataSourceMSQDestination replaceDestination + = new DataSourceMSQDestination("target", Granularities.DAY, null, INTERVALS, null, null); + Assert.assertEquals( + TaskLockType.EXCLUSIVE, + createControllerTask(msqSpecBuilder().destination(replaceDestination)).getTaskLockType() + ); + + // With 'useConcurrentLocks' in task context + final Map taskContext = Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS, true); + final MSQControllerTask appendTaskWithContext = new MSQControllerTask( + null, + msqSpecBuilder().destination(appendDestination).build(), null, null, null, null, null, - null + taskContext ); - Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId()); - } + Assert.assertEquals(TaskLockType.APPEND, appendTaskWithContext.getTaskLockType()); - @Test - public void testIsReady() throws Exception - { - final String taskId = "taskId"; - MSQControllerTask controllerTask = new MSQControllerTask( - taskId, - MSQ_SPEC, + final MSQControllerTask replaceTaskWithContext = new MSQControllerTask( + null, + msqSpecBuilder().destination(replaceDestination).build(), null, null, null, null, null, - null + taskContext ); + Assert.assertEquals(TaskLockType.REPLACE, replaceTaskWithContext.getTaskLockType()); + + // With 'useConcurrentLocks' in query context + final Map queryContext = Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS, true); + final ScanQuery query = new Druids.ScanQueryBuilder() + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .intervals(new MultipleIntervalSegmentSpec(INTERVALS)) + .dataSource("target") + .context(queryContext) + .build(); + Assert.assertEquals( + TaskLockType.APPEND, + createControllerTask(msqSpecBuilder().query(query).destination(appendDestination)).getTaskLockType() + ); + Assert.assertEquals( + TaskLockType.REPLACE, + createControllerTask(msqSpecBuilder().query(query).destination(replaceDestination)).getTaskLockType() + ); + } + + @Test + public void testIsReady() throws Exception + { TestTaskActionClient taskActionClient = new TestTaskActionClient( new TimeChunkLock( TaskLockType.REPLACE, @@ -195,24 +196,14 @@ public void testIsReady() throws Exception 0 ) ); - Assert.assertTrue(controllerTask.isReady(taskActionClient)); + Assert.assertTrue(createControllerTask(msqSpecBuilder()).isReady(taskActionClient)); } @Test public void testIsReadyWithRevokedLock() { - final String taskId = "taskId"; - MSQControllerTask controllerTask = new MSQControllerTask( - taskId, - MSQ_SPEC, - null, - null, - null, - null, - null, - null - ); - TestTaskActionClient taskActionClient = new TestTaskActionClient( + MSQControllerTask controllerTask = createControllerTask(msqSpecBuilder()); + TaskActionClient taskActionClient = new TestTaskActionClient( new TimeChunkLock( TaskLockType.REPLACE, "groupId", 
@@ -225,10 +216,17 @@ public void testIsReadyWithRevokedLock() ); DruidException exception = Assert.assertThrows( DruidException.class, - () -> controllerTask.isReady(taskActionClient)); + () -> controllerTask.isReady(taskActionClient) + ); Assert.assertEquals( "Lock of type[REPLACE] for interval[2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z] was revoked", - exception.getMessage()); + exception.getMessage() + ); + } + + private static MSQControllerTask createControllerTask(MSQSpec.Builder specBuilder) + { + return new MSQControllerTask("controller_1", specBuilder.build(), null, null, null, null, null, null, null); } private static class TestTaskActionClient implements TaskActionClient diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java similarity index 98% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index a27ae7d97804..ac864419abed 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -41,7 +42,7 @@ import java.util.Collections; -public class TableInputSpecSlicerTest extends InitializedNullHandlingTest +public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest { private static final String DATASOURCE = "test-ds"; private static final long BYTES_PER_SEGMENT = 1000; @@ -97,7 +98,7 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest BYTES_PER_SEGMENT ); private SegmentTimeline timeline; - private TableInputSpecSlicer slicer; + private IndexerTableInputSpecSlicer slicer; private TaskActionClient taskActionClient; @Before @@ -131,7 +132,7 @@ public RetType submit(TaskAction taskAction) } }; - slicer = new TableInputSpecSlicer( + slicer = new IndexerTableInputSpecSlicer( null /* not used for SegmentSource.NONE */, taskActionClient, SegmentSource.NONE diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java new file mode 100644 index 000000000000..765101359f66 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.kernel.controller; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; +import org.apache.druid.msq.indexing.destination.MSQDestination; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class ControllerQueryKernelConfigTest +{ + @Test + public void testBuilder() + { + int maxRetainedPartitionSketchBytes = 1; + int maxConcurrentStages = 2; + boolean pipeline = false; + boolean durableStorage = true; + boolean faultTolerance = true; + MSQDestination destination = DurableStorageMSQDestination.instance(); + String controllerHost = "controllerHost"; + List workerIds = ImmutableList.of("worker1", "worker2"); + Map workerContextMap = ImmutableMap.of("foo", "bar"); + + final ControllerQueryKernelConfig config1 = new ControllerQueryKernelConfig( + maxRetainedPartitionSketchBytes, + maxConcurrentStages, + pipeline, + durableStorage, + faultTolerance, + destination, + controllerHost, + workerIds, + workerContextMap + ); + + final ControllerQueryKernelConfig config2 = ControllerQueryKernelConfig + .builder() + .maxRetainedPartitionSketchBytes(maxRetainedPartitionSketchBytes) + .maxConcurrentStages(maxConcurrentStages) + .pipeline(pipeline) + .durableStorage(durableStorage) + .faultTolerance(faultTolerance) + .destination(destination) + .controllerHost(controllerHost) + .workerIds(workerIds) + .workerContextMap(workerContextMap) + .build(); + + Assert.assertEquals(config1, config2); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(ControllerQueryKernelConfig.class) + .usingGetClass() + .verify(); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java index 439aa148a84c..cde2b0ea4e9d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.querykit; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; @@ -47,19 +48,21 @@ public class FrameProcessorTestBase extends InitializedNullHandlingTest { protected static final StagePartition STAGE_PARTITION = new StagePartition(new StageId("q", 0), 0); + private ListeningExecutorService innerExec; protected FrameProcessorExecutor exec; @Before public void setUp() { - exec = new 
FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec"))); + innerExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")); + exec = new FrameProcessorExecutor(innerExec); } @After public void tearDown() throws Exception { - exec.getExecutorService().shutdownNow(); - exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES); + innerExec.shutdownNow(); + innerExec.awaitTermination(10, TimeUnit.MINUTES); } protected ReadableInput makeChannelFromCursorFactory( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index e65104302032..970d873c96c8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -37,6 +37,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -54,14 +55,17 @@ import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.indexing.IndexerControllerContext; +import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; +import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQWorkerTask; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.input.InputSpecSlicer; -import org.apache.druid.msq.input.table.TableInputSpecSlicer; -import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.DruidNode; @@ -105,6 +109,7 @@ public class MSQTestControllerContext implements ControllerContext private Controller controller; private final WorkerMemoryParameters workerMemoryParameters; private final QueryContext queryContext; + private final MSQControllerTask controllerTask; public MSQTestControllerContext( ObjectMapper mapper, @@ -112,7 +117,7 @@ public MSQTestControllerContext( TaskActionClient taskActionClient, WorkerMemoryParameters workerMemoryParameters, List loadedSegments, - QueryContext queryContext + MSQControllerTask controllerTask ) { this.mapper = mapper; @@ -132,7 +137,8 @@ public MSQTestControllerContext( .collect(Collectors.toList()) ); this.workerMemoryParameters = workerMemoryParameters; - this.queryContext = queryContext; + this.controllerTask = controllerTask; + this.queryContext = controllerTask.getQuerySpec().getQuery().context(); } OverlordClient overlordClient = new NoopOverlordClient() @@ -269,11 +275,28 @@ public ListenableFuture cancelTask(String workerId) }; @Override - public ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef) + public 
ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec) { return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000)); } + @Override + public QueryKitSpec makeQueryKitSpec( + final QueryKit> queryKit, + final String queryId, + final MSQSpec querySpec, + final ControllerQueryKernelConfig queryKernelConfig + ) + { + return new QueryKitSpec( + queryKit, + queryId, + querySpec.getTuningConfig().getMaxNumWorkers(), + querySpec.getTuningConfig().getMaxNumWorkers(), + 1 + ); + } + @Override public void emitMetric(String metric, Number value) { @@ -304,9 +327,15 @@ public TaskActionClient taskActionClient() } @Override - public InputSpecSlicer newTableInputSpecSlicer() + public TaskLockType taskLockType() + { + return controllerTask.getTaskLockType(); + } + + @Override + public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) { - return new TableInputSpecSlicer( + return new IndexerTableInputSpecSlicer( coordinatorClient, taskActionClient, MultiStageQueryContext.getSegmentSources(queryContext) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java index a565283154fd..6a7db8aa5b63 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java @@ -103,7 +103,7 @@ public ListenableFuture runTask(String taskId, Object taskObject) taskActionClient, workerMemoryParameters, loadedSegmentMetadata, - cTask.getQuerySpec().getQuery().context() + cTask ); inMemoryControllerTask.put(cTask.getId(), cTask); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java index 411fbab8266e..0902e978641b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java @@ -160,6 +160,12 @@ public DataServerQueryHandlerFactory dataServerQueryHandlerFactory() return injector.getInstance(DataServerQueryHandlerFactory.class); } + @Override + public boolean includeAllCounters() + { + return true; + } + class FrameContextImpl implements FrameContext { private final File tempDir; diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index c0f79d30e581..f255fbe13a6b 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -46,12 +46,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -61,7 +63,6 @@ * If you want single threaded execution, use {@code 
Execs.singleThreaded()}. It is not a good idea to use this with a * same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks. */ -@SuppressWarnings("CheckReturnValue") public class FrameProcessorExecutor { private static final Logger log = new Logger(FrameProcessorExecutor.class); @@ -70,6 +71,10 @@ public class FrameProcessorExecutor private final Object lock = new Object(); + // Currently-active cancellationIds. + @GuardedBy("lock") + private final Set activeCancellationIds = new HashSet<>(); + // Futures that are active and therefore cancelable. // Does not include return futures: those are in cancelableReturnFutures. @GuardedBy("lock") @@ -96,19 +101,12 @@ public FrameProcessorExecutor(final ListeningExecutorService exec) this.exec = exec; } - /** - * Returns the underlying executor service used by this executor. - */ - public ListeningExecutorService getExecutorService() - { - return exec; - } - /** * Runs a processor until it is done, and returns a future that resolves when execution is complete. * - * If "cancellationId" is provided, it can be used with the {@link #cancel(String)} method to cancel all processors - * currently running with the same cancellationId. + * If "cancellationId" is provided, it must have previously been registered with {@link #registerCancellationId}. + * Then, it can be used with the {@link #cancel(String)} method to cancel all processors with that + * same cancellationId. */ public ListenableFuture runFully(final FrameProcessor processor, @Nullable final String cancellationId) { @@ -116,6 +114,11 @@ public ListenableFuture runFully(final FrameProcessor processor, @Null final List outputChannels = processor.outputChannels(); final SettableFuture finished = registerCancelableFuture(SettableFuture.create(), true, cancellationId); + if (finished.isDone()) { + // Possibly due to starting life out being canceled. + return finished; + } + class ExecutorRunnable implements Runnable { private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels); @@ -152,7 +155,7 @@ public void run() final IntSet await = result.awaitSet(); if (await.isEmpty()) { - exec.submit(ExecutorRunnable.this); + exec.execute(ExecutorRunnable.this); } else if (result.isAwaitAll() || await.size() == 1) { final List> readabilityFutures = new ArrayList<>(); @@ -164,7 +167,7 @@ public void run() } if (readabilityFutures.isEmpty()) { - exec.submit(ExecutorRunnable.this); + exec.execute(ExecutorRunnable.this); } else { runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures)); } @@ -272,7 +275,7 @@ private void runProcessorAfterFutureResolves(final ListenableFuture futur public void onSuccess(final V ignored) { try { - exec.submit(ExecutorRunnable.this); + exec.execute(ExecutorRunnable.this); } catch (Throwable e) { fail(e); @@ -390,7 +393,7 @@ void doProcessorCleanup() throws IOException logProcessorStatusString(processor, finished, null); registerCancelableProcessor(processor, cancellationId); - exec.submit(runnable); + exec.execute(runnable); return finished; } @@ -423,8 +426,20 @@ public ListenableFuture runAllFully( } /** - * Cancels all processors associated with a given cancellationId. Waits for the processors to exit before - * returning. + * Registers a cancellationId, so it can be provided to {@link #runFully} or {@link #runAllFully}. To avoid the + * set of active cancellationIds growing without bound, callers must also call {@link #cancel(String)} on the + * same cancellationId when done using it. 
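+   * <p>
+   * A hypothetical lifecycle, with illustrative names ({@code processor} and the id are
+   * not part of this class):
+   * <pre>{@code
+   * exec.registerCancellationId("my-work");
+   * ListenableFuture<Long> future = exec.runFully(processor, "my-work");
+   * // ... later, possibly from another thread ...
+   * exec.cancel("my-work"); // cancels the future, interrupts the processor, frees the id
+   * }</pre>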
+ */ + public void registerCancellationId(final String cancellationId) + { + synchronized (lock) { + activeCancellationIds.add(cancellationId); + } + } + + /** + * Deregisters a cancellationId and cancels any currently-running processors associated with that cancellationId. + * Waits for any canceled processors to exit before returning. */ public void cancel(final String cancellationId) throws InterruptedException { @@ -435,6 +450,7 @@ public void cancel(final String cancellationId) throws InterruptedException final Set> returnFuturesToCancel; synchronized (lock) { + activeCancellationIds.remove(cancellationId); futuresToCancel = cancelableFutures.removeAll(cancellationId); processorsToCancel = cancelableProcessors.removeAll(cancellationId); returnFuturesToCancel = cancelableReturnFutures.removeAll(cancellationId); @@ -457,6 +473,33 @@ public void cancel(final String cancellationId) throws InterruptedException } } + /** + * Returns an {@link Executor} that executes using the same underlying service, and that is also connected to + * cancellation through {@link #cancel(String)}. + * + * @param cancellationId cancellation ID for the executor + */ + public Executor asExecutor(@Nullable final String cancellationId) + { + return command -> runFully(new RunnableFrameProcessor(command), cancellationId); + } + + /** + * Shuts down the underlying executor service immediately. + */ + public void shutdownNow() + { + exec.shutdownNow(); + } + + /** + * Returns the underlying executor service used by this executor. + */ + ListeningExecutorService getExecutorService() + { + return exec; + } + /** * Register a future that will be canceled when the provided {@code cancellationId} is canceled. * @@ -472,6 +515,12 @@ > FutureType registerCancelableFuture( { if (cancellationId != null) { synchronized (lock) { + if (!activeCancellationIds.contains(cancellationId)) { + // Cancel and return immediately. + future.cancel(true); + return future; + } + final SetMultimap> map = isReturn ? cancelableReturnFutures : cancelableFutures; map.put(cancellationId, future); future.addListener( diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java index a1a1c0f87120..7f79a319c280 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java @@ -306,9 +306,11 @@ public void onSuccess(T result) } if (isDone) { - finished.compareAndSet(null, Either.value(processorManager.result())); - synchronized (runAllFullyLock) { + if (finished.get() == null) { + finished.compareAndSet(null, Either.value(processorManager.result())); + } + cleanupIfNoMoreProcessors(); } } else { diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java b/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java new file mode 100644 index 000000000000..697879490e1e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; + +import java.util.Collections; +import java.util.List; + +/** + * Frame processor that simply runs a {@link Runnable}, once. + */ +public class RunnableFrameProcessor implements FrameProcessor +{ + private final Runnable runnable; + + public RunnableFrameProcessor(Runnable runnable) + { + this.runnable = runnable; + } + + @Override + public List inputChannels() + { + return Collections.emptyList(); + } + + @Override + public List outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait runIncrementally(IntSet readableInputs) + { + runnable.run(); + return ReturnOrAwait.returnObject(null); + } + + @Override + public void cleanup() + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java index e30f2e77b02b..b8b74a2b797e 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java @@ -297,7 +297,7 @@ public ListenableFuture run() setAllDoneIfPossible(); } }, - exec.getExecutorService() + exec.asExecutor(cancellationId) ); return FutureUtils.futureWithBaggage( @@ -813,7 +813,7 @@ public void onFailure(Throwable t) }, // Must run in exec, instead of in the same thread, to avoid running callback immediately if the // worker happens to finish super-quickly. 
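      // A hedged note on the switch below (not in the original sources): asExecutor(cancellationId)
      // wraps each submitted command in a RunnableFrameProcessor and routes it through runFully,
      // so callbacks scheduled here are canceled along with everything else registered under
      // this cancellationId.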
- exec.getExecutorService() + exec.asExecutor(cancellationId) ); } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java index 0f50624078b4..4ed2c610525e 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java @@ -222,6 +222,7 @@ public void test_registerCancelableFuture() throws InterruptedException final SettableFuture future = SettableFuture.create(); final String cancellationId = "xyzzy"; + exec.registerCancellationId(cancellationId); Assert.assertSame(future, exec.registerCancelableFuture(future, false, cancellationId)); exec.cancel(cancellationId); @@ -236,6 +237,8 @@ public void test_cancel_sleepy() throws Exception { final SleepyFrameProcessor processor = new SleepyFrameProcessor(); final String cancellationId = "xyzzy"; + + exec.registerCancellationId(cancellationId); final ListenableFuture future = exec.runFully(processor, cancellationId); processor.awaitRun(); @@ -254,6 +257,8 @@ public void test_futureCancel_sleepy() throws Exception { final SleepyFrameProcessor processor = new SleepyFrameProcessor(); final String cancellationId = "xyzzy"; + + exec.registerCancellationId(cancellationId); final ListenableFuture future = exec.runFully(processor, cancellationId); processor.awaitRun(); @@ -314,6 +319,8 @@ public void test_cancel_concurrency() throws Exception // Start up all systems at once. for (final String systemId : systemGeneratorsMap.keySet()) { + exec.registerCancellationId(systemId); + for (InfiniteFrameProcessor generator : systemGeneratorsMap.get(systemId)) { processorFutureMap.put(generator, exec.runFully(generator, systemId)); } @@ -391,6 +398,22 @@ public void test_cancel_nonexistentCancellationId() throws InterruptedException // Just making sure no error is thrown when we refer to a nonexistent cancellationId. exec.cancel("nonexistent"); } + + @Test + public void test_runFully_nonexistentCancellationId() + { + final SleepyFrameProcessor processor = new SleepyFrameProcessor(); + final String cancellationId = "xyzzy"; + + // Don't registerCancellationId(cancellationId). + final ListenableFuture future = exec.runFully(processor, cancellationId); + + // Future should be immediately canceled, without running the processor. 
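+    // (Why: runFully registers its return future through registerCancelableFuture, which
+    // cancels the future up front when the cancellationId was never registered.)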
+ Assert.assertTrue(future.isDone()); + Assert.assertTrue(future.isCancelled()); + Assert.assertFalse(processor.didGetInterrupt()); + Assert.assertFalse(processor.didCleanup()); + } } public abstract static class BaseFrameProcessorExecutorTestSuite extends InitializedNullHandlingTest diff --git a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java index 7cd1e980428e..d0ae5a986a00 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java @@ -409,6 +409,8 @@ public void test_runAllFully_futureCancel() throws InterruptedException .mapToObj(i -> new SleepyFrameProcessor()) .collect(Collectors.toList()); + final String cancellationId = "xyzzy"; + exec.registerCancellationId(cancellationId); final ListenableFuture future = exec.runAllFully( possiblyDelay( ensureClose( @@ -418,7 +420,7 @@ public void test_runAllFully_futureCancel() throws InterruptedException ), maxOutstandingProcessors, bouncer, - "xyzzy" + cancellationId ); for (int i = 0; i < expectedRunningProcessors; i++) {