Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kotlin: grpc codec #472

Merged
merged 23 commits into from
Oct 17, 2019
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repos:
- id: detect-private-key
- id: mixed-line-ending
- repo: https://github.com/codespell-project/codespell
rev: v1.15.0
rev: v1.16.0
hooks:
- id: codespell
args: [-L uint]
4 changes: 4 additions & 0 deletions library/kotlin/src/io/envoyproxy/envoymobile/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ envoy_mobile_kt_library(
"EnvoyClientBuilder.kt",
"EnvoyException.kt",
"EnvoyStreamEmitter.kt",
"GRPCClient.kt",
"GRPCRequestBuilder.kt",
"GRPCResponseHandler.kt",
"GRPCStreamEmitter.kt",
"HTTPClient.kt",
"Request.kt",
"RequestBuilder.kt",
Expand Down
27 changes: 27 additions & 0 deletions library/kotlin/src/io/envoyproxy/envoymobile/GRPCClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.envoyproxy.envoymobile

// 1 byte for the compression flag, 4 bytes for the message length (int)
const val GRPC_PREFIX_LENGTH = 5

/**
* Client that supports sending and receiving gRPC traffic.
*
* @param httpClient The HTTP client to use for gRPC streams.
*/
class GRPCClient(
private val httpClient: HTTPClient
) {

/**
* Send a gRPC request with the provided handler.
*
* @param request The outbound gRPC request. See `GRPCRequestBuilder` for creation.
* @param handler Handler for receiving responses.
*
* @returns GRPCStreamEmitter, An emitter that can be used for sending more traffic over the stream.
*/
fun send(request: Request, grpcResponseHandler: GRPCResponseHandler): GRPCStreamEmitter {
val emitter = httpClient.send(request, grpcResponseHandler.underlyingHandler)
return GRPCStreamEmitter(emitter)
}
}
80 changes: 80 additions & 0 deletions library/kotlin/src/io/envoyproxy/envoymobile/GRPCRequestBuilder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.envoyproxy.envoymobile

/**
* Builder used for creating new gRPC `Request` instances.
*/
class GRPCRequestBuilder(
path: String,
authority: String,
useHTTPS: Boolean
) {
private val underlyingBuilder: RequestBuilder = RequestBuilder(
method = RequestMethod.POST,
scheme = if (useHTTPS) "https" else "http",
authority = authority,
path = path)

init {
underlyingBuilder.addHeader("content-type", "application/grpc")
}

/**
* Append a value to the header key.
*
* @param name the header key.
* @param value the value associated to the header key.
* @return this builder.
*/
fun addHeader(name: String, value: String): GRPCRequestBuilder {
underlyingBuilder.addHeader(name, value)
return this
}

/**
* Remove the value in the specified header.
*
* @param name the header key to remove.
* @param value the value to be removed.
* @return this builder.
*/
fun removeHeader(name: String, value: String): GRPCRequestBuilder {
underlyingBuilder.removeHeader(name, value)
return this
}

/**
* Remove all headers with this name.
*
* @param name the header key to remove.
* @return this builder.
*/
fun removeHeaders(name: String): GRPCRequestBuilder {
underlyingBuilder.removeHeaders(name)
return this
}

/**
* Add a specific timeout for the gRPC request. This will be sent in the `grpc-timeout` header.
*
* @param timeoutMS Timeout, in milliseconds.
* @return this builder.
*/
fun addTimeoutMS(timeoutMS: Int?): GRPCRequestBuilder {
val headerName = "grpc-timeout"
if (timeoutMS == null) {
removeHeaders(headerName)
} else {
addHeader(headerName, "${timeoutMS}m")
}
return this
}

/**
* Creates the Request object using the data set in the builder.
*
* @return the Request object.
*/
fun build(): Request {
return underlyingBuilder.build()
}
}
175 changes: 175 additions & 0 deletions library/kotlin/src/io/envoyproxy/envoymobile/GRPCResponseHandler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package io.envoyproxy.envoymobile

import io.envoyproxy.envoymobile.engine.types.EnvoyError
import io.envoyproxy.envoymobile.engine.types.EnvoyErrorCode
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.Executor


class GRPCResponseHandler(
val executor: Executor
) {

/**
* Represents the process state of the response stream's body data.
*/
private sealed class ProcessState {
// Awaiting a gRPC compression flag.
object CompressionFlag : ProcessState()

// Awaiting the length specification of the next message.
object MessageLength : ProcessState()

// Awaiting a message with the specified length.
class Message(val messageLength: Int) : ProcessState()
}

internal val underlyingHandler: ResponseHandler = ResponseHandler(executor)

private var errorClosure: (error: EnvoyError) -> Unit = { }
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Specify a callback for when response headers are received by the stream.
*
* @param closure: Closure which will receive the headers, status code,
* and flag indicating if the stream is complete.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onHeaders(closure: (headers: Map<String, List<String>>, statusCode: Int) -> Unit): GRPCResponseHandler {
underlyingHandler.onHeaders { headers, _, _ ->
val grpcStatus = headers["grpc-status"]?.first()?.toIntOrNull() ?: 0
closure(headers, grpcStatus)
}
return this
}

/**
* Specify a callback for when a data frame is received by the stream.
*
* @param closure: Closure which will receive the data,
* and flag indicating if the stream is complete.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onMessage(closure: (byteBuffer: ByteBuffer) -> Unit): GRPCResponseHandler {
val byteBufferedOutputStream = ByteArrayOutputStream()
var processState: ProcessState = ProcessState.CompressionFlag
underlyingHandler.onData { byteBuffer, _ ->

val byteBufferArray = if (byteBuffer.hasArray()) {
byteBuffer.array()
} else {
val array = ByteArray(byteBuffer.remaining())
byteBuffer.get(array)
array
}
byteBufferedOutputStream.write(byteBufferArray)

processState = processData(byteBufferedOutputStream, processState, closure)
}

return this
}

/**
* Specify a callback for when trailers are received by the stream.
* If the closure is called, the stream is complete.
*
* @param closure: Closure which will receive the trailers.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onTrailers(closure: (trailers: Map<String, List<String>>) -> Unit): GRPCResponseHandler {
buildbreaker marked this conversation as resolved.
Show resolved Hide resolved
underlyingHandler.onTrailers(closure)
return this
}

/**
* Specify a callback for when an internal Envoy exception occurs with the stream.
* If the closure is called, the stream is complete.
*
* @param closure: Closure which will be called when an error occurs.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onError(closure: (error: EnvoyError) -> Unit): GRPCResponseHandler {
buildbreaker marked this conversation as resolved.
Show resolved Hide resolved
this.errorClosure = closure
underlyingHandler.onError(closure)
return this
}

/**
* Recursively processes a buffer of data, buffering it into messages based on state.
* When a message has been fully buffered, `onMessage` will be called with the message.
*
* @param bufferedStream The buffer of data from which to determine state and messages.
* @param processState The current process state of the buffering.
* @param onMessage Closure to call when a new message is available.
*/
private fun processData(
bufferedStream: ByteArrayOutputStream,
processState: ProcessState,
onMessage: (byteBuffer: ByteBuffer) -> Unit): ProcessState {

var nextState = processState

when (processState) {
is ProcessState.CompressionFlag -> {
val byteArray = bufferedStream.toByteArray()
if (byteArray.isEmpty()) {
// We don't have enough information to extract the compression flag, so we'll just return
return ProcessState.CompressionFlag
}

val compressionFlag = byteArray[0]
// TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501
if (compressionFlag.compareTo(0) != 0) {
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
errorClosure(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can update iOS with this functionality as well (bailing and calling onError if this happens)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The important detail here is that we still keep the stream alive 😬

If we can safely close the stream, that would be much much better.

I just left it like this for now since I do consider this a volatile feature and the behavior is a bit up in the air with respect to messages which are gRPC compressed. If we appropriately close the stream, it feels more of a intentional design to exclude compression

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just call close() here and call the consumer with onError? I think that'd be a good way to shut down safely

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't close() only propagate downward to the user rather than upward to Envoy?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also don't have a close() handler in the response case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aha right

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized why I didn't do this originally on iOS. Storing the errorClosure on the GRPCResponseHandler is problematic because nothing necessarily keeps the GRPCResponseHandler in memory since only the closures need to stick around. When I ended up using the GRPCResponseHandler in practice, this was totally fine because the closures passed to the underlying callbacks have their own lifecycle. Maybe we should just assert here instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EnvoyError(
EnvoyErrorCode.ENVOY_UNDEFINED_ERROR,
"Unable to read compressed gRPC response message"))

// no op the current onData and clean up
underlyingHandler.onHeaders { _, _, _ -> }
underlyingHandler.onData { _, _ -> }
buildbreaker marked this conversation as resolved.
Show resolved Hide resolved
underlyingHandler.onTrailers { }
underlyingHandler.onError { }
bufferedStream.reset()
}

nextState = ProcessState.MessageLength
}
is ProcessState.MessageLength -> {
if (bufferedStream.size() < GRPC_PREFIX_LENGTH) {
// We don't have enough information to extract the message length, so we'll just return
return ProcessState.MessageLength
}

val byteArray = bufferedStream.toByteArray()
val buffer = ByteBuffer.wrap(byteArray.sliceArray(1..4))
buffer.order(ByteOrder.BIG_ENDIAN)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the language does the right thing here and does the conversion to native format when you call .int below this should be fine

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it will write it in big endian here

val messageLength = buffer.int
nextState = ProcessState.Message(messageLength)
}
is ProcessState.Message -> {
if (bufferedStream.size() < processState.messageLength + GRPC_PREFIX_LENGTH) {
// We don't have enough bytes to construct the message, so we'll just return
return ProcessState.Message(processState.messageLength)
}

val byteArray = bufferedStream.toByteArray()
onMessage(ByteBuffer.wrap(
byteArray.sliceArray(GRPC_PREFIX_LENGTH until GRPC_PREFIX_LENGTH + processState.messageLength)))
buildbreaker marked this conversation as resolved.
Show resolved Hide resolved
bufferedStream.reset()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this end up doing additional copying when we reset then re-write bytes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it does an additional copy. If it helps any, we do use ByteArrayOutputStream as our unoptimized ByteArray I/O sink. We can revisit this strategy pretty easily further down the road

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth adding a todo with optimization notes 🤷‍♂

bufferedStream.write(
byteArray.sliceArray(GRPC_PREFIX_LENGTH + processState.messageLength until byteArray.size))

if (byteArray.sliceArray(GRPC_PREFIX_LENGTH + processState.messageLength until byteArray.size).isEmpty()) {
return ProcessState.CompressionFlag
} else {
nextState = ProcessState.CompressionFlag
}
}
}

return processData(bufferedStream, nextState, onMessage)
}
}
41 changes: 41 additions & 0 deletions library/kotlin/src/io/envoyproxy/envoymobile/GRPCStreamEmitter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.envoyproxy.envoymobile

import java.nio.ByteBuffer
import java.nio.ByteOrder


class GRPCStreamEmitter(
private val emitter: StreamEmitter
) {
/**
* Send a protobuf messageData's binary data over the gRPC stream.
*
* @param messageData Binary data of a protobuf messageData to send.
*/
fun sendMessage(messageData: ByteBuffer) {
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// Length-Prefixed-Message = Compressed-Flag | Message-Length | Message
// Compressed-Flag = 0 / 1, encoded as 1 byte unsigned integer
// Message-Length = length of Message, encoded as 4 byte unsigned integer (big endian)
// Message = binary representation of protobuf messageData
val byteBuffer = ByteBuffer.allocate(GRPC_PREFIX_LENGTH)

// Compression flag (1 byte) - 0, not compressed
byteBuffer.put(0)

// Message length
val messageLength = messageData.remaining()
byteBuffer.order(ByteOrder.BIG_ENDIAN)
buildbreaker marked this conversation as resolved.
Show resolved Hide resolved
byteBuffer.putInt(messageLength)

emitter.sendData(byteBuffer)
emitter.sendData(messageData)
}

/**
* Close this connection.
*/
fun close() {
emitter.close(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ interface CancelableStream {
* Interface allowing for sending/emitting data on an Envoy stream.
*/
interface StreamEmitter : CancelableStream {

/**
* For sending data to an associated stream.
*
Expand Down
30 changes: 30 additions & 0 deletions library/kotlin/test/io/envoyproxy/envoymobile/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,33 @@ envoy_mobile_kt_test(
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_kt_test(
name = "grpc_request_builder_test",
srcs = [
"GRPCRequestBuilderTest.kt",
],
deps = [
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_kt_test(
name = "grpc_response_handler_test",
srcs = [
"GRPCResponseHandlerTest.kt",
],
deps = [
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_kt_test(
name = "grpc_stream_emitter_test",
srcs = [
"GRPCStreamEmitterTest.kt",
],
deps = [
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)
Loading