-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve API for bidi and server streaming calls #130
Conversation
Instead of requiring callers to handle oneOf(Headers,Message,Trailers) objects in each bidi or server streaming call, instead just change the response channel to return the response message type. If an error occurs at the end of the call (due to non-zero grpc-status), then cancel the channel with an exception.
* This allows us to easily verify headers, messages, trailers, and errors without having to use fold/maybeFold | ||
* manually in each location. | ||
*/ | ||
private suspend fun <Output> streamResults(channel: ReceiveChannel<StreamResult<Output>>): ServerStreamingResult<Output> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now users don't need to handle this complexity to use bidi or server streaming calls.
}, | ||
) | ||
for (response in stream.responseChannel()) { | ||
println(response.sentence) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's an example of the improved API experience for callers.
*/ | ||
suspend fun receiveAndClose(): ResponseMessage<Output> | ||
suspend fun receiveAndClose(): Output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now all of the streaming interfaces work the same.
if (e is SocketTimeoutException) { | ||
onResult(StreamResult.Complete(Code.DEADLINE_EXCEEDED, cause = e)) | ||
return@runBlocking | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discovered via additional testing - we weren't surfacing SocketTimeoutException as DEADLINE_EXCEEDED.
onCompletion = { result -> | ||
val streamTrailers = result.trailers | ||
val error = result.connectException() | ||
StreamResult.Complete(error?.code ?: Code.OK, cause = error, streamTrailers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mapping is unnecessary for connect calls and was dropping non-connect exceptions (like SocketTimeoutException) and turning them into Code.OK.
val message = resultChannel.receive() | ||
val additionalMessage = resultChannel.receiveCatching() | ||
if (additionalMessage.isSuccess) { | ||
throw ConnectException(code = Code.UNKNOWN, message = "unary stream has multiple messages") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems strange to that we'd suggest the reason is unknown (per the code) when we know what's going on here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This matches the behavior from connect-go: https://github.com/connectrpc/connect-go/blob/96effedc8ac84da9f49392e5f7bdfee2e648247b/connect.go#L367C1-L368
I think in this case, someone has broken the contract of what a client streaming call should adhere to, so we return UNKNOWN. I don't think any other codes make sense for us to return here, as they're typically used by server implementations.
}, | ||
) | ||
val message = resultChannel.receive() | ||
val additionalMessage = resultChannel.receiveCatching() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand why we have message
and additionalMessage
here - can you elaborate on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For client only streaming, the server should send only a single response. This just ensures that a server implementation doesn't send > 1 response (this validation was missing before).
*/ | ||
fun resultChannel(): ReceiveChannel<StreamResult<Output>> | ||
fun responseChannel(): ReceiveChannel<Output> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean consumers have no way of reading stream response headers? Where is StreamResult
used now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumers can continue to access stream response headers from interceptors - I've opened #131 with some ideas on making it easier for people to access these based on patterns in connect-go.
After the conversation on connectrpc/connect-kotlin#130, this PR makes a few changes to demonstrate ways in which consumers can more easily iterate over results from an async stream. It also introduces a convenience accessor for `StreamResult` for these use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think overall these changes are fine, but definitely think #131 should be prioritized after merging this.
Can we delete StreamResult
now?
I also want to call out that Swift provides a similar StreamResult
interface, but I think it's easier for clients to consume with Swift's APIs. To demonstrate: connectrpc/connect-swift#202
Would also like to see if @jzbrooks has any feedback on this interface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to @pkwarren for tagging me on Slack to look at this. I like the simplification here, and it will definitely help some of my coworkers who were having trouble understanding our app code for bidi streaming.
I will also take a look at #131, but in practice we don't currently have a need for it in our apps.
Not at the moment - it is still used by the interceptor API. |
After the conversation on connectrpc/connect-kotlin#130, this PR makes a few changes to demonstrate ways in which consumers can easily iterate over results from an async stream. It also introduces a convenience accessor for `StreamResult` for these use cases.
Instead of requiring callers to handle oneOf(Headers,Message,Trailers) objects in each bidi or server streaming call, instead just change the response channel to return the response message type. If an error occurs at the end of the call (due to non-zero grpc-status), then cancel the channel with an exception.