Skip to content

Commit

Permalink
Restore non-cancellable DispatchQueryCreationFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Jun 24, 2021
1 parent ea5d988 commit 2ee7c94
Showing 1 changed file with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.dispatcher;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.Session;
Expand Down Expand Up @@ -49,7 +50,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static io.trino.execution.QueryState.QUEUED;
import static io.trino.execution.QueryState.RUNNING;
import static io.trino.spi.StandardErrorCode.QUERY_TEXT_TOO_LARGE;
Expand Down Expand Up @@ -142,7 +142,19 @@ public ListenableFuture<Void> createQuery(QueryId queryId, Slug slug, SessionCon
checkArgument(!query.isEmpty(), "query must not be empty string");
checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

return nonCancellationPropagating(Futures.submit(() -> createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager), dispatchExecutor));
// It is important to return a future implementation which ignores cancellation request.
// Using NonCancellationPropagatingFuture is not enough; it does not propagate cancel to wrapped future
// but it would still return true on call to isCancelled() after cancel() is called on it.
DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
dispatchExecutor.execute(() -> {
try {
createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
}
finally {
queryCreationFuture.set(null);
}
});
return queryCreationFuture;
}

/**
Expand Down Expand Up @@ -312,4 +324,27 @@ public void failQuery(QueryId queryId, Throwable cause)
queryTracker.tryGetQuery(queryId)
.ifPresent(query -> query.fail(cause));
}

private static class DispatchQueryCreationFuture
extends AbstractFuture<Void>
{
@Override
protected boolean set(Void value)
{
return super.set(value);
}

@Override
protected boolean setException(Throwable throwable)
{
return super.setException(throwable);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
// query submission cannot be canceled
return false;
}
}
}

0 comments on commit 2ee7c94

Please sign in to comment.