Skip to content

Commit

Permalink
Bring retry queries to the beginning of the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
highker committed Apr 23, 2021
1 parent 2a2ac84 commit a34cf43
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ public Optional<ErrorCode> getErrorCode()
return Optional.ofNullable(basicQueryInfo.getErrorCode());
}

@Override
public boolean isRetry()
{
return false;
}

@Override
public void recordHeartbeat() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,24 @@ public class LocalDispatchQuery
private final Consumer<QueryExecution> querySubmitter;
private final SettableFuture<?> submitted = SettableFuture.create();

private final boolean retry;

public LocalDispatchQuery(
QueryStateMachine stateMachine,
QueryMonitor queryMonitor,
ListenableFuture<QueryExecution> queryExecutionFuture,
ClusterSizeMonitor clusterSizeMonitor,
Executor queryExecutor,
Consumer<QueryExecution> querySubmitter)
Consumer<QueryExecution> querySubmitter,
boolean retry)
{
this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
this.queryExecutionFuture = requireNonNull(queryExecutionFuture, "queryExecutionFuture is null");
this.clusterSizeMonitor = requireNonNull(clusterSizeMonitor, "clusterSizeMonitor is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
this.querySubmitter = requireNonNull(querySubmitter, "querySubmitter is null");
this.retry = retry;

addExceptionCallback(queryExecutionFuture, throwable -> {
if (stateMachine.transitionToFailed(throwable)) {
Expand Down Expand Up @@ -261,6 +265,12 @@ public Optional<ErrorCode> getErrorCode()
return stateMachine.getFailureInfo().map(ExecutionFailureInfo::getErrorCode);
}

@Override
public boolean isRetry()
{
return retry;
}

@Override
public void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public DispatchQuery createDispatchQuery(
queryExecutionFuture,
clusterSizeMonitor,
executor,
queryManager::createQuery);
queryManager::createQuery,
retryCount > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ public interface ManagedQueryExecution
* @return Returns non-empty value iff error has occurred and query failed state is visible.
*/
Optional<ErrorCode> getErrorCode();

boolean isRetry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class InternalResourceGroup
@GuardedBy("root")
private final Set<InternalResourceGroup> dirtySubGroups = new HashSet<>();
@GuardedBy("root")
private UpdateablePriorityQueue<ManagedQueryExecution> queuedQueries = new FifoQueue<>();
private TieredQueue<ManagedQueryExecution> queuedQueries = new TieredQueue<>(FifoQueue::new);
@GuardedBy("root")
private final Set<ManagedQueryExecution> runningQueries = new HashSet<>();
@GuardedBy("root")
Expand Down Expand Up @@ -522,27 +522,27 @@ public void setSchedulingPolicy(SchedulingPolicy policy)

// Switch to the appropriate queue implementation to implement the desired policy
Queue<InternalResourceGroup> queue;
UpdateablePriorityQueue<ManagedQueryExecution> queryQueue;
TieredQueue<ManagedQueryExecution> queryQueue;
switch (policy) {
case FAIR:
queue = new FifoQueue<>();
queryQueue = new FifoQueue<>();
queryQueue = new TieredQueue<>(FifoQueue::new);
break;
case WEIGHTED:
queue = new StochasticPriorityQueue<>();
queryQueue = new StochasticPriorityQueue<>();
queryQueue = new TieredQueue<>(StochasticPriorityQueue::new);
break;
case WEIGHTED_FAIR:
queue = new WeightedFairQueue<>();
queryQueue = new IndexedPriorityQueue<>();
queryQueue = new TieredQueue<>(IndexedPriorityQueue::new);
break;
case QUERY_PRIORITY:
// Sub groups must use query priority to ensure ordering
for (InternalResourceGroup group : subGroups.values()) {
group.setSchedulingPolicy(QUERY_PRIORITY);
}
queue = new IndexedPriorityQueue<>();
queryQueue = new IndexedPriorityQueue<>();
queryQueue = new TieredQueue<>(IndexedPriorityQueue::new);
break;
default:
throw new UnsupportedOperationException("Unsupported scheduling policy: " + policy);
Expand Down Expand Up @@ -659,8 +659,15 @@ public void run(ManagedQueryExecution query)
private void enqueueQuery(ManagedQueryExecution query)
{
checkState(Thread.holdsLock(root), "Must hold lock to enqueue a query");

synchronized (root) {
queuedQueries.addOrUpdate(query, getQueryPriority(query.getSession()));
int priority = getQueryPriority(query.getSession());
if (query.isRetry()) {
queuedQueries.prioritize(query, priority);
}
else {
queuedQueries.addOrUpdate(query, priority);
}
InternalResourceGroup group = this;
while (group.parent.isPresent()) {
group.parent.get().descendantQueuedQueries++;
Expand Down Expand Up @@ -877,7 +884,7 @@ private int getHighestQueryPriority()
{
checkState(Thread.holdsLock(root), "Must hold lock");
synchronized (root) {
checkState(queuedQueries instanceof IndexedPriorityQueue, "Queued queries not ordered");
checkState(queuedQueries.getLowPriorityQueue() instanceof IndexedPriorityQueue, "Queued queries not ordered");
if (queuedQueries.isEmpty()) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.resourceGroups;

import java.util.Iterator;
import java.util.function.Supplier;

import static com.google.common.collect.Iterators.concat;
import static java.util.Objects.requireNonNull;

final class TieredQueue<E>
implements UpdateablePriorityQueue<E>
{
private final UpdateablePriorityQueue<E> highPriorityQueue;
private final UpdateablePriorityQueue<E> lowPriorityQueue;

public TieredQueue(UpdateablePriorityQueue<E> highPriorityQueue, UpdateablePriorityQueue<E> lowPriorityQueue)
{
this.highPriorityQueue = requireNonNull(highPriorityQueue, "highPriorityQueue is null");
this.lowPriorityQueue = requireNonNull(lowPriorityQueue, "lowPriorityQueue is null");
}

public TieredQueue(Supplier<UpdateablePriorityQueue<E>> supplier)
{
this(supplier.get(), supplier.get());
}

@Override
public boolean addOrUpdate(E element, long priority)
{
return lowPriorityQueue.addOrUpdate(element, priority);
}

public boolean prioritize(E element, long priority)
{
return highPriorityQueue.addOrUpdate(element, priority);
}

@Override
public boolean contains(E element)
{
return highPriorityQueue.contains(element) || lowPriorityQueue.contains(element);
}

@Override
public boolean remove(E element)
{
boolean highPriorityRemoved = highPriorityQueue.remove(element);
boolean lowPriorityRemoved = lowPriorityQueue.remove(element);
return highPriorityRemoved || lowPriorityRemoved;
}

@Override
public E poll()
{
Iterator<E> iterator = iterator();
if (!iterator.hasNext()) {
return null;
}
E element = iterator.next();
iterator.remove();
return element;
}

@Override
public E peek()
{
Iterator<E> iterator = iterator();
if (!iterator.hasNext()) {
return null;
}
return iterator.next();
}

@Override
public int size()
{
return highPriorityQueue.size() + lowPriorityQueue.size();
}

@Override
public boolean isEmpty()
{
return highPriorityQueue.isEmpty() && lowPriorityQueue.isEmpty();
}

@Override
public Iterator<E> iterator()
{
return concat(highPriorityQueue.iterator(), lowPriorityQueue.iterator());
}

public UpdateablePriorityQueue<E> getLowPriorityQueue()
{
return lowPriorityQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void testSimpleExecutionCreationFailure()
immediateFailedFuture(new IllegalStateException("abc")),
createClusterSizeMonitor(0),
directExecutor(),
execution -> {});
execution -> {},
false);

assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), GENERIC_INTERNAL_ERROR.toErrorCode());
Expand All @@ -106,7 +107,8 @@ public void testQueryQueuedExceptionBeforeDispatch()
immediateFailedFuture(new IllegalStateException("abc")),
createClusterSizeMonitor(0),
directExecutor(),
execution -> {});
execution -> {},
false);

assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), QUERY_QUEUE_FULL.toErrorCode());
Expand All @@ -127,7 +129,8 @@ public void testErrorInQuerySubmitter()
directExecutor(),
execution -> {
throw new AccessDeniedException("sdf");
});
},
false);

assertEquals(query.getBasicQueryInfo().getState(), QUEUED);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
Expand All @@ -154,7 +157,8 @@ public void testTimeOutWaitingForClusterResources()
immediateFuture(null),
createClusterSizeMonitor(1),
directExecutor(),
execution -> {});
execution -> {},
false);

assertEquals(query.getBasicQueryInfo().getState(), QUEUED);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
Expand Down Expand Up @@ -182,7 +186,8 @@ public void testQueryCancellation()
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
execution -> {});
execution -> {},
false);

assertEquals(query.getBasicQueryInfo().getState(), QUEUED);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
Expand All @@ -208,7 +213,8 @@ public void testQueryDispatched()
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
execution -> {});
execution -> {},
false);

assertEquals(query.getBasicQueryInfo().getState(), QUEUED);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public Optional<ErrorCode> getErrorCode()
return Optional.empty();
}

@Override
public boolean isRetry()
{
return false;
}

@Override
public BasicQueryInfo getBasicQueryInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ public void testStochasticPriorityQueue()
assertTrue(populateAndExtract(new StochasticPriorityQueue<>()).size() == 3);
}

@Test
public void testTieredQueue()
{
TieredQueue<Integer> queue = new TieredQueue<>(FifoQueue::new);
assertEquals(populateAndExtract(queue), ImmutableList.of(1, 2, 3));

queue.prioritize(4, 0);
queue.prioritize(5, 0);
assertEquals(populateAndExtract(queue), ImmutableList.of(4, 5, 1, 2, 3));
}

private static List<Integer> populateAndExtract(UpdateablePriorityQueue<Integer> queue)
{
queue.addOrUpdate(1, 1);
Expand Down

0 comments on commit a34cf43

Please sign in to comment.