Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphQL Reactive: RequestContext cleanup #27147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,40 @@ protected <O> O invokeAndTransform(
DataFetcherResult.Builder<Object> resultBuilder,
Object[] transformedArguments) throws Exception {

Uni<?> uni = handleUserMethodCall(dfe, transformedArguments);
return (O) uni
.onItemOrFailure()
.transformToUni((result, throwable, emitter) -> {
if (throwable != null) {
eventEmitter.fireOnDataFetchError(dfe.getExecutionId().toString(), throwable);
if (throwable instanceof GraphQLException) {
GraphQLException graphQLException = (GraphQLException) throwable;
errorResultHelper.appendPartialResult(resultBuilder, dfe, graphQLException);
} else if (throwable instanceof Exception) {
emitter.fail(SmallRyeGraphQLServerMessages.msg.dataFetcherException(operation, throwable));
return;
} else if (throwable instanceof Error) {
emitter.fail(throwable);
return;
}
} else {
try {
resultBuilder.data(fieldHelper.transformOrAdaptResponse(result, dfe));
} catch (AbstractDataFetcherException te) {
te.appendDataFetcherResult(resultBuilder, dfe);
ManagedContext requestContext = Arc.container().requestContext();
try {
RequestContextHelper.reactivate(requestContext, dfe);
Uni<?> uni = handleUserMethodCall(dfe, transformedArguments);
return (O) uni
.onItemOrFailure()
.transformToUni((result, throwable, emitter) -> {
if (throwable != null) {
eventEmitter.fireOnDataFetchError(dfe.getExecutionId().toString(), throwable);
if (throwable instanceof GraphQLException) {
GraphQLException graphQLException = (GraphQLException) throwable;
errorResultHelper.appendPartialResult(resultBuilder, dfe, graphQLException);
} else if (throwable instanceof Exception) {
emitter.fail(SmallRyeGraphQLServerMessages.msg.dataFetcherException(operation, throwable));
return;
} else if (throwable instanceof Error) {
emitter.fail(throwable);
return;
}
} else {
try {
resultBuilder.data(fieldHelper.transformOrAdaptResponse(result, dfe));
} catch (AbstractDataFetcherException te) {
te.appendDataFetcherResult(resultBuilder, dfe);
}
}
}

emitter.complete(resultBuilder.build());
})
.subscribe()
.asCompletionStage();
emitter.complete(resultBuilder.build());
})
.subscribe()
.asCompletionStage();
} finally {
requestContext.deactivate();
}
}

protected abstract Uni<?> handleUserMethodCall(DataFetchingEnvironment dfe, final Object[] transformedArguments)
Expand All @@ -76,7 +82,7 @@ protected <O> O invokeFailure(DataFetcherResult.Builder<Object> resultBuilder) {
protected CompletionStage<List<T>> invokeBatch(DataFetchingEnvironment dfe, Object[] arguments) {
ManagedContext requestContext = Arc.container().requestContext();
try {
BlockingHelper.reactivate(requestContext, dfe);
RequestContextHelper.reactivate(requestContext, dfe);
return handleUserBatchLoad(dfe, arguments)
.subscribe().asCompletionStage();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import java.util.concurrent.Callable;

import graphql.schema.DataFetchingEnvironment;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.ManagedContext;
import io.smallrye.graphql.schema.model.Execute;
import io.smallrye.graphql.schema.model.Operation;
import io.vertx.core.Context;
import io.vertx.core.Promise;

public class BlockingHelper {
public final class BlockingHelper {

private BlockingHelper() {
}

public static boolean blockingShouldExecuteNonBlocking(Operation operation, Context vc) {
// Rule is that by default this should execute blocking except if marked as non-blocking)
Expand All @@ -33,15 +33,4 @@ public static void runBlocking(Context vc, Callable<Object> contextualCallable,
}
}, result);
}

public static void reactivate(ManagedContext requestContext, DataFetchingEnvironment dfe) {
if (!requestContext.isActive()) {
Object maybeState = dfe.getGraphQlContext().getOrDefault("state", null);
if (maybeState != null) {
requestContext.activate((InjectableContext.ContextState) maybeState);
} else {
requestContext.activate();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,34 @@ public QuarkusDefaultDataFetcher(Operation operation, Type type) {
public <T> T invokeAndTransform(DataFetchingEnvironment dfe, DataFetcherResult.Builder<Object> resultBuilder,
Object[] transformedArguments) throws Exception {

Context vc = Vertx.currentContext();
if (runBlocking(dfe) || BlockingHelper.blockingShouldExecuteNonBlocking(operation, vc)) {
return super.invokeAndTransform(dfe, resultBuilder, transformedArguments);
} else {
return invokeAndTransformBlocking(dfe, resultBuilder, transformedArguments, vc);
ManagedContext requestContext = Arc.container().requestContext();
try {
RequestContextHelper.reactivate(requestContext, dfe);
Context vc = Vertx.currentContext();
if (runBlocking(dfe) || BlockingHelper.blockingShouldExecuteNonBlocking(operation, vc)) {
return super.invokeAndTransform(dfe, resultBuilder, transformedArguments);
} else {
return invokeAndTransformBlocking(dfe, resultBuilder, transformedArguments, vc);
}
} finally {
requestContext.deactivate();
}
}

@Override
public CompletionStage<List<T>> invokeBatch(DataFetchingEnvironment dfe, Object[] arguments) {

Context vc = Vertx.currentContext();
if (runBlocking(dfe) || BlockingHelper.blockingShouldExecuteNonBlocking(operation, vc)) {
return super.invokeBatch(dfe, arguments);
} else {
return invokeBatchBlocking(dfe, arguments, vc);
ManagedContext requestContext = Arc.container().requestContext();
try {
RequestContextHelper.reactivate(requestContext, dfe);
Context vc = Vertx.currentContext();
if (runBlocking(dfe) || BlockingHelper.blockingShouldExecuteNonBlocking(operation, vc)) {
return super.invokeBatch(dfe, arguments);
} else {
return invokeBatchBlocking(dfe, arguments, vc);
}
} finally {
requestContext.deactivate();
}
}

Expand Down Expand Up @@ -83,23 +95,19 @@ private <T> T invokeAndTransformBlocking(final DataFetchingEnvironment dfe, Data

@SuppressWarnings("unchecked")
private CompletionStage<List<T>> invokeBatchBlocking(DataFetchingEnvironment dfe, Object[] arguments, Context vc) {
ManagedContext requestContext = Arc.container().requestContext();
try {
BlockingHelper.reactivate(requestContext, dfe);
SmallRyeThreadContext threadContext = Arc.container().select(SmallRyeThreadContext.class).get();
final Promise<List<T>> result = Promise.promise();

// We need some make sure that we call given the context
Callable<Object> contextualCallable = threadContext.contextualCallable(() -> {
return (List<T>) operationInvoker.invokePrivileged(arguments);
});

// Here call blocking with context
BlockingHelper.runBlocking(vc, contextualCallable, result);
return result.future().toCompletionStage();
} finally {
requestContext.deactivate();
}

SmallRyeThreadContext threadContext = Arc.container().select(SmallRyeThreadContext.class).get();
final Promise<List<T>> result = Promise.promise();

// We need some make sure that we call given the context
Callable<Object> contextualCallable = threadContext.contextualCallable(() -> {
return (List<T>) operationInvoker.invokePrivileged(arguments);
});

// Here call blocking with context
BlockingHelper.runBlocking(vc, contextualCallable, result);
return result.future().toCompletionStage();

}

private boolean runBlocking(DataFetchingEnvironment dfe) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.smallrye.graphql.runtime.spi.datafetcher;

import graphql.schema.DataFetchingEnvironment;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.ManagedContext;

public final class RequestContextHelper {

private RequestContextHelper() {
}

public static void reactivate(ManagedContext requestContext, DataFetchingEnvironment dfe) {
if (!requestContext.isActive()) {
Object maybeState = dfe.getGraphQlContext().getOrDefault("state", null);
if (maybeState != null) {
requestContext.activate((InjectableContext.ContextState) maybeState);
} else {
requestContext.activate();
}
}
}
}