Skip to content

Commit

Permalink
fix(ServerCalls): Fix regression in Status cause for exceptions throw…
Browse files Browse the repository at this point in the history
…n by implementations (#456)

* fix regression in status cause for status exceptions thrown by implementations

* remove output

* rename tests

---------

Co-authored-by: James Ward <[email protected]>
  • Loading branch information
andrewparmet and jamesward authored Nov 16, 2023
1 parent 852257d commit 11c517d
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 7 deletions.
4 changes: 2 additions & 2 deletions stub/src/main/java/io/grpc/kotlin/ServerCalls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,20 @@ import io.grpc.ServerCallHandler
import io.grpc.ServerMethodDefinition
import io.grpc.Status
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.channels.onFailure
import io.grpc.Metadata as GrpcMetadata

/**
Expand Down Expand Up @@ -262,6 +261,7 @@ object ServerCalls {
val closeStatus = when (failure) {
null -> Status.OK
is CancellationException -> Status.CANCELLED.withCause(failure)
is StatusException, is StatusRuntimeException -> Status.fromThrowable(failure)
else -> Status.fromThrowable(failure).withCause(failure)
}
val trailers = failure?.let { Status.trailersFromThrowable(it) } ?: GrpcMetadata()
Expand Down
115 changes: 110 additions & 5 deletions stub/src/test/java/io/grpc/kotlin/ServerCallsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import com.google.common.truth.Truth.assertThat
import com.google.common.truth.extensions.proto.ProtoTruth.assertThat
import io.grpc.*
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.GreeterGrpcKt
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.examples.helloworld.GreeterGrpcKt.GreeterCoroutineStub
import io.grpc.examples.helloworld.GreeterGrpcKt.GreeterCoroutineImplBase
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -1015,15 +1013,18 @@ class ServerCallsTest : AbstractCallsTest() {
}

@Test
fun testStatusExceptionPropagatesStack() = runBlocking {
fun testPropagateStackTraceForStatusException() = runBlocking {
val thrownStatusCause = CompletableDeferred<Throwable?>()

val serverImpl = object : GreeterCoroutineImplBase() {
override suspend fun sayHello(request: HelloRequest): HelloReply {
internalServerCall()
}

private fun internalServerCall(): Nothing {
throw StatusException(Status.INTERNAL)
val exception = Exception("causal exception")
thrownStatusCause.complete(exception)
throw Status.INTERNAL.withCause(exception).asException()
}
}

Expand Down Expand Up @@ -1059,7 +1060,111 @@ class ServerCallsTest : AbstractCallsTest() {
assertThat(clientException.status.code).isEqualTo(Status.Code.INTERNAL)
val statusCause = receivedStatusCause.await()
// but the exception should propagate to server interceptors, with stack trace intact
assertThat(statusCause).isNotNull()
assertThat(statusCause).isEqualTo(thrownStatusCause.await())
assertThat(statusCause!!.stackTraceToString()).contains("internalServerCall")
}

@Test
fun testPropagateStackTraceForStatusRuntimeException() = runBlocking {
val thrownStatusCause = CompletableDeferred<Throwable?>()

val serverImpl = object : GreeterCoroutineImplBase() {
override suspend fun sayHello(request: HelloRequest): HelloReply {
internalServerCall()
}

private fun internalServerCall(): Nothing {
val exception = Exception("causal exception")
thrownStatusCause.complete(exception)
throw Status.INTERNAL.withCause(exception).asRuntimeException()
}
}

val receivedStatusCause = CompletableDeferred<Throwable?>()

val interceptor = object : ServerInterceptor {
override fun <ReqT, RespT> interceptCall(
call: ServerCall<ReqT, RespT>,
requestHeaders: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> =
next.startCall(
object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
override fun close(status: Status, trailers: Metadata) {
receivedStatusCause.complete(status.cause)
super.close(status, trailers)
}
},
requestHeaders
)
}

val channel = makeChannel(serverImpl, interceptor)

val stub = GreeterGrpc.newBlockingStub(channel)
val clientException = assertThrows<StatusRuntimeException> {
stub.sayHello(helloRequest(""))
}

// the exception should not propagate to the client
assertThat(clientException.cause).isNull()

assertThat(clientException.status.code).isEqualTo(Status.Code.INTERNAL)
val statusCause = receivedStatusCause.await()
// but the exception should propagate to server interceptors, with stack trace intact
assertThat(statusCause).isEqualTo(thrownStatusCause.await())
assertThat(statusCause!!.stackTraceToString()).contains("internalServerCall")
}

@Test
fun testPropagateStackTraceForNonStatusException() = runBlocking {
val thrownStatusCause = CompletableDeferred<Throwable?>()

val serverImpl = object : GreeterCoroutineImplBase() {
override suspend fun sayHello(request: HelloRequest): HelloReply {
internalServerCall()
}

private fun internalServerCall(): Nothing {
val exception = Exception("causal exception")
thrownStatusCause.complete(exception)
throw exception
}
}

val receivedStatusCause = CompletableDeferred<Throwable?>()

val interceptor = object : ServerInterceptor {
override fun <ReqT, RespT> interceptCall(
call: ServerCall<ReqT, RespT>,
requestHeaders: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> =
next.startCall(
object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
override fun close(status: Status, trailers: Metadata) {
receivedStatusCause.complete(status.cause)
super.close(status, trailers)
}
},
requestHeaders
)
}

val channel = makeChannel(serverImpl, interceptor)

val stub = GreeterGrpc.newBlockingStub(channel)
val clientException = assertThrows<StatusRuntimeException> {
stub.sayHello(helloRequest(""))
}

// the exception should not propagate to the client
assertThat(clientException.cause).isNull()

assertThat(clientException.status.code).isEqualTo(Status.Code.UNKNOWN)
val statusCause = receivedStatusCause.await()
// but the exception should propagate to server interceptors, with stack trace intact
assertThat(statusCause).isEqualTo(thrownStatusCause.await())
assertThat(statusCause!!.stackTraceToString()).contains("internalServerCall")
}

Expand Down

0 comments on commit 11c517d

Please sign in to comment.