Skip to content

Commit

Permalink
Adding some more java docs and fixing spot bugs, intellij inspections
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptoe committed Nov 13, 2022
1 parent c92b217 commit 58da3cf
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ public ControllerImpl(
{
this.task = task;
this.context = context;
this.isDurableStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context());
this.isDurableStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec()
.getQuery()
.context());

}

Expand Down Expand Up @@ -583,6 +585,10 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
return queryDef;
}

/**
* Adds the work orders for the worker to {@link ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel} determines that there
* are work orders which need reprocessing.
*/
private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault fault)
{
List<WorkOrder> retriableWorkOrders = kernel.getWorkInCaseWorkerElgibileForRetryElseThrow(worker, fault);
Expand All @@ -599,6 +605,11 @@ private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault
return workOrders;
}
});
} else {
log.info(
"Worker[%d] has no active workOrders that need relaunch therefore not relaunching",
worker
);
}
}

Expand Down Expand Up @@ -1031,14 +1042,14 @@ private Int2ObjectMap<List<SegmentIdWithShardSpec>> makeSegmentGeneratorWorkerFa
* @param queryKernel
* @param contactFn
* @param workers set of workers to contact
* @param succescCallBack on successfull api call, custom callback
* @param successCallBack custom callback invoked on a successful API call
* @param retryOnFailure if set to true, adds this worker to the retry queue. If false, cancels all the futures and propagates the exception to the caller.
*/
private void contactWorkersForStage(
final ControllerQueryKernel queryKernel,
final TaskContactFn<Void> contactFn,
final TaskContactFn contactFn,
final IntSet workers,
final OnSuccess succescCallBack,
final TaskContactSuccesss successCallBack,
final boolean retryOnFailure
)
{
Expand All @@ -1063,7 +1074,7 @@ private void contactWorkersForStage(
@Override
public void onSuccess(@Nullable Void result)
{
succescCallBack.onSuccess(taskId, workerNumber);
successCallBack.onSuccess(taskId, workerNumber);
settableFuture.set(true);
}

Expand Down Expand Up @@ -1964,7 +1975,7 @@ private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(

/**
* Performs a particular {@link SegmentTransactionalInsertAction}, publishing segments.
*
* <p>
* Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption.
*/
static void performSegmentPublish(
Expand Down Expand Up @@ -2101,13 +2112,17 @@ private Pair<ControllerQueryKernel, ListenableFuture<?>> run() throws IOExceptio
return Pair.of(queryKernel, workerTaskLauncherFuture);
}


private void retryFailedTasks() throws InterruptedException
{
// if there are no work orders to retry, skip
if (workOrdersToRetry.size() == 0) {
return;
}
Set<Integer> workersNeedToBeFullyStarted = new HashSet<>();

// transform work orders from Map<Worker,Set<WorkOrder>> to Map<StageId,Map<Worker,WorkOrder>>
// since we want the work orders to be processed per stage
Map<StageId, Map<Integer, WorkOrder>> stageWorkerOrders = new HashMap<>();

for (Map.Entry<Integer, Set<WorkOrder>> workerStages : workOrdersToRetry.entrySet()) {
Expand All @@ -2126,6 +2141,7 @@ private void retryFailedTasks() throws InterruptedException
}
}

// wait till the workers identified above are fully ready
workerTaskLauncher.waitUntilWorkersReady(workersNeedToBeFullyStarted);

for (Map.Entry<StageId, Map<Integer, WorkOrder>> stageWorkOrders : stageWorkerOrders.entrySet()) {
Expand All @@ -2139,6 +2155,8 @@ private void retryFailedTasks() throws InterruptedException
new IntArraySet(stageWorkOrders.getValue().keySet()),
(taskId, workerNumber) -> {
queryKernel.workOrdersSentForWorker(stageWorkOrders.getKey(), workerNumber);

// remove successfully contacted workOrders from workOrdersToRetry
workOrdersToRetry.compute(workerNumber, (task, workOrderSet) -> {
if (workOrderSet == null || workOrderSet.size() == 0 || !workOrderSet.remove(stageWorkOrders.getValue()
.get(
Expand Down Expand Up @@ -2280,10 +2298,12 @@ private void sendPartitionBoundaries()
&& queryKernel.doesStageHaveResultPartitions(stageId)) {
IntSet workersToSendPartitionBoundaries = queryKernel.getWorkersToSendPartitionBoundaries(stageId);
if (workersToSendPartitionBoundaries.isEmpty()) {
log.debug("No workers for stage[%s] ready to recieve partition boundaries", stageId);
return;
}
final ClusterByPartitions partitions = queryKernel.getResultPartitionBoundariesForStage(stageId);

if (log.isDebugEnabled()) {
final ClusterByPartitions partitions = queryKernel.getResultPartitionBoundariesForStage(stageId);
log.debug(
"Query [%s] sending out partition boundaries for stage %d: %s for workers %s",
stageId.getQueryId(),
Expand All @@ -2306,7 +2326,7 @@ private void sendPartitionBoundaries()
queryKernel,
queryDef,
stageId.getStageNumber(),
queryKernel.getResultPartitionBoundariesForStage(stageId),
partitions,
workersToSendPartitionBoundaries
);
}
Expand Down Expand Up @@ -2396,17 +2416,17 @@ private void throwKernelExceptionIfNotUnknown()
/**
* Interface used by {@link #contactWorkersForStage}.
*/
private interface TaskContactFn<Void>
private interface TaskContactFn
{
ListenableFuture<Void> contactTask(WorkerClient client, String taskId, int workerNumber);
}


private interface OnSuccess
/**
* Interface used when the future returned by {@link TaskContactFn#contactTask(WorkerClient, String, int)} completes successfully.
*/
private interface TaskContactSuccesss
{
void onSuccess(String taskId, int workerNumber);

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ public class Limits
/**
* Maximum worker retries. Initial run is not a retry. The worker will be spawned 1 + retryNumber of times before erroring out.
*/
public static int WORKER_RETRY_LIMIT = 2;
public static final int WORKER_RETRY_LIMIT = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public int getRetryCount()
return retryCount;
}

/**
* Creates a new retry {@link MSQWorkerTask} with the same context as the current task, but with the retry count
* incremented by 1
*/
public MSQWorkerTask getRetryTask()
{
return new MSQWorkerTask(controllerTaskId, getDataSource(), workerNumber, getContext(), retryCount + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private enum State
// retry worker set
private final Set<Integer> retryWorkerSet = ConcurrentHashMap.newKeySet();

private final Map<Integer, List<String>> workerToTaskIds = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new ConcurrentHashMap<>();
private final RetryTask retryTask;

public MSQWorkerTaskLauncher(
Expand Down Expand Up @@ -528,8 +528,6 @@ private void relaunchTasks()
}

MSQWorkerTask toRetry = tracker.msqWorkerTask;


MSQWorkerTask retryTask = toRetry.getRetryTask();

// check retry limits
Expand All @@ -541,6 +539,7 @@ private void relaunchTasks()
taskTrackers.put(retryTask.getId(), new TaskTracker(retryTask.getWorkerNumber(), retryTask));
context.workerManager().run(retryTask.getId(), retryTask);
taskHistory.add(retryTask.getId());

synchronized (taskIds) {
// replace taskId with the retry taskID for the same worker number
taskIds.set(toRetry.getWorkerNumber(), retryTask.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int workerNu
}

/**
* Gets all the stages currently being tracked and filtres out all effectively finished stages.
* Gets all the stages currently being tracked and filters out all effectively finished stages.
* <br/>
* From the remaining stages, checks if (stage,worker) needs to be retried.
* <br/>
Expand Down

0 comments on commit 58da3cf

Please sign in to comment.