Skip to content

Commit

Permalink
Merge pull request #14697 from gaetancollaud/feature/grpc-context-blo…
Browse files Browse the repository at this point in the history
…cking-simple

Forward gRPC context when using @Blocking
  • Loading branch information
gsmet authored Feb 8, 2021
2 parents 3121bac + ca056c3 commit b4ab068
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 3 deletions.
5 changes: 5 additions & 0 deletions extensions/grpc/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.function.Consumer;
import java.util.function.Function;

import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
Expand Down Expand Up @@ -96,11 +97,18 @@ synchronized void setDelegate(ServerCall.Listener<ReqT> delegate) {

private synchronized void executeOnContextOrEnqueue(Consumer<ServerCall.Listener<ReqT>> consumer) {
if (this.delegate != null) {
final Context grpcContext = Context.current();
vertx.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> f) {
consumer.accept(delegate);
f.complete();
final Context previous = Context.current();
grpcContext.attach();
try {
consumer.accept(delegate);
f.complete();
} finally {
grpcContext.detach(previous);
}
}
}, true, null);
} else {
Expand Down Expand Up @@ -139,4 +147,4 @@ public void onReady() {
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.quarkus.grpc.runtime.supports;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.vertx.core.Vertx;

class BlockingServerInterceptorTest {
public static final Context.Key<String> USERNAME = Context.key("username");

BlockingServerInterceptor blockingServerInterceptor;
Vertx vertx;

@BeforeEach
void setup() {
vertx = Vertx.vertx();
blockingServerInterceptor = new BlockingServerInterceptor(vertx, Arrays.asList("blocking"));
}

@Test
@Timeout(10)
void testContextPropagation() throws Exception {
final ServerCall serverCall = mock(ServerCall.class);
final BlockingServerCallHandler serverCallHandler = new BlockingServerCallHandler();
final MethodDescriptor methodDescriptor = mock(MethodDescriptor.class);
when(methodDescriptor.getFullMethodName()).thenReturn("my-service/blocking");
when(serverCall.getMethodDescriptor()).thenReturn(methodDescriptor);

// setting grpc context
final Context context = Context.current().withValue(USERNAME, "my-user");

final ServerCall.Listener listener = blockingServerInterceptor.interceptCall(serverCall, null, serverCallHandler);
serverCallHandler.awaitSetup();

// simulate GRPC call
context.wrap(() -> listener.onMessage("hello")).run();

// await for the message to be received
serverCallHandler.await();

// check that the thread is a worker thread
assertThat(serverCallHandler.threadName).contains("vert.x").contains("worker");

// check that the context was propagated correctly
assertThat(serverCallHandler.contextUserName).isEqualTo("my-user");
}

static class BlockingServerCallHandler implements ServerCallHandler {
String threadName;
String contextUserName;
private CountDownLatch latch = new CountDownLatch(1);
private CountDownLatch setupLatch = new CountDownLatch(1);

@Override
public ServerCall.Listener startCall(ServerCall serverCall, Metadata metadata) {
final ServerCall.Listener listener = new ServerCall.Listener() {
@Override
public void onMessage(Object message) {
threadName = Thread.currentThread().getName();
contextUserName = USERNAME.get();
super.onMessage(message);
latch.countDown();
}
};
setupLatch.countDown();
return listener;
}

public void awaitSetup() throws InterruptedException {
setupLatch.await();
Thread.sleep(100);
}

public void await() throws InterruptedException {
latch.await();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline

0 comments on commit b4ab068

Please sign in to comment.