Skip to content

Commit

Permalink
MSQ: Include worker context maps in WorkOrders. (apache#17076)
Browse files Browse the repository at this point in the history
* MSQ: Include worker context maps in WorkOrders.

This provides a mechanism to send contexts to workers in long-lived,
shared JVMs that are not part of the task system.

* Style, coverage.
  • Loading branch information
gianm authored and kfaraz committed Oct 4, 2024
1 parent 5bdf288 commit 3e43050
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.util.MultiStageQueryContext;
Expand All @@ -43,7 +42,7 @@ public interface ControllerContext
/**
* Configuration for {@link org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
*/
ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef);
ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec);

/**
* Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
Expand Down Expand Up @@ -88,7 +87,7 @@ public interface ControllerContext
*
* @param queryId query ID
* @param querySpec query spec
* @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec, QueryDefinition)}
* @param queryKernelConfig config from {@link #queryKernelConfig(String, MSQSpec)}
* @param workerFailureListener listener that receives callbacks when workers fail
*/
WorkerManager newWorkerManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ public void addToKernelManipulationQueue(Consumer<ControllerQueryKernel> kernelC
private QueryDefinition initializeQueryDefAndState(final Closer closer)
{
this.selfDruidNode = context.selfNode();
this.netClient = new ExceptionWrappingWorkerClient(context.newWorkerClient());
closer.register(netClient);
this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient()));
this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec);

final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
Expand Down Expand Up @@ -593,7 +593,6 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
QueryValidator.validateQueryDef(queryDef);
queryDefRef.set(queryDef);

queryKernelConfig = context.queryKernelConfig(querySpec, queryDef);
workerManager = context.newWorkerManager(
queryId,
querySpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
Expand Down Expand Up @@ -388,7 +387,6 @@ private void handleNewWorkOrder(
final InputChannelFactory inputChannelFactory =
makeBaseInputChannelFactory(workOrder, controllerClient, kernelHolder.processorCloser);

final QueryContext queryContext = task != null ? QueryContext.of(task.getContext()) : QueryContext.empty();
final boolean includeAllCounters = context.includeAllCounters();
final RunWorkOrder runWorkOrder = new RunWorkOrder(
workOrder,
Expand All @@ -402,8 +400,8 @@ private void handleNewWorkOrder(
context,
frameContext,
makeRunWorkOrderListener(workOrder, controllerClient, criticalWarningCodes, maxVerboseParseExceptions),
MultiStageQueryContext.isReindex(queryContext),
MultiStageQueryContext.removeNullBytes(queryContext)
MultiStageQueryContext.isReindex(workOrder.getWorkerContext()),
MultiStageQueryContext.removeNullBytes(workOrder.getWorkerContext())
);

// Set up processorCloser (called when processing is done).
Expand Down Expand Up @@ -560,6 +558,13 @@ public ListenableFuture<InputStream> readStageOutput(
return getOrCreateStageOutputHolder(stageId, partitionNumber).readRemotelyFrom(offset);
}

/**
* Accept a new {@link WorkOrder} for execution.
*
* For backwards-compatibility purposes, this method populates {@link WorkOrder#getOutputChannelMode()}
* and {@link WorkOrder#getWorkerContext()} if the controller did not set them. (They are there for newer controllers,
* but not older ones.)
*/
@Override
public void postWorkOrder(final WorkOrder workOrder)
{
Expand All @@ -577,28 +582,11 @@ public void postWorkOrder(final WorkOrder workOrder)
);
}

final OutputChannelMode outputChannelMode;

// This stack of conditions can be removed once we can rely on OutputChannelMode always being in the WorkOrder.
// (It will be there for newer controllers; this is a backwards-compatibility thing.)
if (workOrder.hasOutputChannelMode()) {
outputChannelMode = workOrder.getOutputChannelMode();
} else {
final MSQSelectDestination selectDestination =
task != null
? MultiStageQueryContext.getSelectDestination(QueryContext.of(task.getContext()))
: MSQSelectDestination.TASKREPORT;

outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
workOrder.getQueryDefinition(),
workOrder.getStageNumber(),
selectDestination,
task != null && MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())),
false
);
}
final WorkOrder workOrderToUse = makeWorkOrderToUse(
workOrder,
task != null && task.getContext() != null ? QueryContext.of(task.getContext()) : QueryContext.empty()
);

final WorkOrder workOrderToUse = workOrder.withOutputChannelMode(outputChannelMode);
kernelManipulationQueue.add(
kernelHolders ->
kernelHolders.addKernel(WorkerStageKernel.create(workOrderToUse))
Expand Down Expand Up @@ -1009,6 +997,48 @@ private void doCancel()
);
}

/**
 * Builds the work order that the worker should actually run, starting from "originalWorkOrder". In the result,
 * {@link WorkOrder#hasWorkerContext()} and {@link WorkOrder#hasOutputChannelMode()} are both true. Any field that
 * the original work order already carries is kept as-is; a missing field is derived from "taskContext" (or from an
 * empty context when "taskContext" is null).
 *
 * This is a backwards-compatibility shim: newer controllers set both fields themselves, older ones do not. It can be
 * deleted once every controller is guaranteed to populate them.
 *
 * @param originalWorkOrder work order from controller
 * @param taskContext       task context; only consulted when the work order has no worker context
 */
static WorkOrder makeWorkOrderToUse(final WorkOrder originalWorkOrder, @Nullable final QueryContext taskContext)
{
  // Worker context precedence: work order first, then task context, then an empty context.
  // Removable once the worker context is always present in the WorkOrder (it is for newer controllers).
  final QueryContext effectiveContext =
      originalWorkOrder.hasWorkerContext()
      ? originalWorkOrder.getWorkerContext()
      : (taskContext != null ? taskContext : QueryContext.empty());

  // Output channel mode: use the one from the work order if present, otherwise compute it from the effective
  // context. Removable once OutputChannelMode is always present in the WorkOrder (it is for newer controllers).
  final OutputChannelMode effectiveMode =
      originalWorkOrder.hasOutputChannelMode()
      ? originalWorkOrder.getOutputChannelMode()
      : ControllerQueryKernelUtils.getOutputChannelMode(
          originalWorkOrder.getQueryDefinition(),
          originalWorkOrder.getStageNumber(),
          MultiStageQueryContext.getSelectDestination(effectiveContext),
          MultiStageQueryContext.isDurableStorageEnabled(effectiveContext),
          false
      );

  return originalWorkOrder.withWorkerContext(effectiveContext).withOutputChannelMode(effectiveMode);
}

/**
* Log (at DEBUG level) a string explaining the status of all work assigned to this worker.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpecSlicer;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidMetrics;
Expand All @@ -66,6 +66,8 @@
*/
public class IndexerControllerContext implements ControllerContext
{
public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;

private static final Logger log = new Logger(IndexerControllerContext.class);

private final MSQControllerTask task;
Expand Down Expand Up @@ -96,21 +98,21 @@ public IndexerControllerContext(

@Override
public ControllerQueryKernelConfig queryKernelConfig(
final MSQSpec querySpec,
final QueryDefinition queryDef
final String queryId,
final MSQSpec querySpec
)
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
queryDef.getFinalStageDefinition().getMaxWorkerCount()
querySpec.getTuningConfig().getMaxNumWorkers()
);

final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters);

log.debug(
"Query[%s] using %s[%s], %s[%s], %s[%s].",
queryDef.getQueryId(),
queryId,
MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
config.isDurableStorage(),
MultiStageQueryContext.CTX_FAULT_TOLERANCE,
Expand Down Expand Up @@ -210,15 +212,16 @@ public int defaultTargetPartitionsPerWorker()
}

/**
* Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. Also used in tests.
* Helper method for {@link #queryKernelConfig(String, MSQSpec)}. Also used in tests.
*/
public static ControllerQueryKernelConfig makeQueryKernelConfig(
final MSQSpec querySpec,
final ControllerMemoryParameters memoryParameters
)
{
final QueryContext queryContext = querySpec.getQuery().context();
final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext);
final int maxConcurrentStages =
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(queryContext, DEFAULT_MAX_CONCURRENT_STAGES);
final boolean isFaultToleranceEnabled = MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
final boolean isDurableStorageEnabled;

Expand Down Expand Up @@ -256,9 +259,44 @@ public static ControllerQueryKernelConfig makeQueryKernelConfig(
.destination(querySpec.getDestination())
.maxConcurrentStages(maxConcurrentStages)
.maxRetainedPartitionSketchBytes(memoryParameters.getPartitionStatisticsMaxRetainedBytes())
.workerContextMap(makeWorkerContextMap(querySpec, isDurableStorageEnabled, maxConcurrentStages))
.build();
}

/**
 * Builds the worker context map, i.e., the map that will become {@link WorkOrder#getWorkerContext()}. Shared by
 * {@link #makeQueryKernelConfig} and {@link #makeTaskContext} so both paths hand workers the same keys.
 *
 * The map always contains the durable-shuffle-storage flag, the max allowed parse exceptions, the reindex flag,
 * the max concurrent stages, the remove-null-bytes flag and the include-all-counters flag. The select destination
 * name is added only when the query destination maps to a non-null select destination.
 *
 * @param querySpec             query spec; its query context and destination are read
 * @param durableStorageEnabled value stored under {@link MultiStageQueryContext#CTX_DURABLE_SHUFFLE_STORAGE}
 * @param maxConcurrentStages   value stored under {@link MultiStageQueryContext#CTX_MAX_CONCURRENT_STAGES}
 *
 * @return immutable map of worker context keys to values
 */
public static Map<String, Object> makeWorkerContextMap(
    final MSQSpec querySpec,
    final boolean durableStorageEnabled,
    final int maxConcurrentStages
)
{
  final QueryContext ctx = querySpec.getQuery().context();

  // Values read from the query context; kept in typed locals so they are boxed from their declared types.
  final long parseExceptionLimit = MultiStageQueryContext.getMaxParseExceptions(ctx);
  final boolean stripNullBytes = MultiStageQueryContext.removeNullBytes(ctx);
  final boolean allCounters = MultiStageQueryContext.getIncludeAllCounters(ctx);

  final ImmutableMap.Builder<String, Object> workerContext = ImmutableMap.builder();
  workerContext.put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, durableStorageEnabled);
  workerContext.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, parseExceptionLimit);
  workerContext.put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec));
  workerContext.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, maxConcurrentStages);
  workerContext.put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, stripNullBytes);
  workerContext.put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, allCounters);

  // Only destinations that map to a select destination contribute this key.
  if (querySpec.getDestination().toSelectDestination() != null) {
    workerContext.put(
        MultiStageQueryContext.CTX_SELECT_DESTINATION,
        querySpec.getDestination().toSelectDestination().getName()
    );
  }

  return workerContext.build();
}

/**
* Helper method for {@link #newWorkerManager}, split out to be used in tests.
*
Expand All @@ -271,17 +309,16 @@ public static Map<String, Object> makeTaskContext(
)
{
final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = ImmutableMap.builder();
final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
final boolean removeNullBytes = MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());
final boolean includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context());

taskContextOverridesBuilder
.put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, queryKernelConfig.isDurableStorage())
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
.put(MultiStageQueryContext.CTX_IS_REINDEX, MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, queryKernelConfig.getMaxConcurrentStages())
.put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
.put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, includeAllCounters);
// Put worker context into the task context. That way, workers can get these context keys either from
// WorkOrder#getWorkerContext or Task#getContext.
taskContextOverridesBuilder.putAll(
makeWorkerContextMap(
querySpec,
queryKernelConfig.isDurableStorage(),
queryKernelConfig.getMaxConcurrentStages()
)
);

// Put the lookup loading info in the task context to facilitate selective loading of lookups.
if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) != null) {
Expand All @@ -297,13 +334,6 @@ public static Map<String, Object> makeTaskContext(
);
}

if (querySpec.getDestination().toSelectDestination() != null) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
querySpec.getDestination().toSelectDestination().getName()
);
}

// propagate the controller's tags to the worker task for enhanced metrics reporting
@SuppressWarnings("unchecked")
Map<String, Object> tags = (Map<String, Object>) controllerTaskContext.get(DruidMetrics.TAGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ public IndexerWorkerContext(
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;

final QueryContext queryContext = QueryContext.of(task.getContext());
this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStages(queryContext);
this.maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
queryContext,
IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
);
this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext);
}

Expand Down
Loading

0 comments on commit 3e43050

Please sign in to comment.