Skip to content

Commit

Permalink
kotlin: grpc codec (#472)
Browse files Browse the repository at this point in the history
gRPC codec for kotlin

We are able to hit an Envoy backed gRPC server from `lyft.com`

We ran into some issues with testing (debug pull: envoyproxy/envoy-mobile#495). The first is outgoing ALPN is required for gRPC connections: envoyproxy/envoy-mobile#502. gRPC doesn't support disabling this option when starting up a service. We'll have to revisit this effort in the future.

For now, we are primarily missing envoyproxy/envoy-mobile#494 for local Envoy library e2e testing

Signed-off-by: Alan Chiu <[email protected]>

Description: gRPC codec for kotlin
Risk Level: low
Testing: unit/end-to-end
Docs Changes: n/a
Release Notes: n/a
[Optional Fixes #Issue]
[Optional Deprecated:]

Signed-off-by: JP Simard <[email protected]>
  • Loading branch information
Alan Chiu authored and jpsim committed Nov 28, 2022
1 parent 8bd2ad7 commit 5e96a17
Show file tree
Hide file tree
Showing 13 changed files with 812 additions and 5 deletions.
2 changes: 1 addition & 1 deletion mobile/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repos:
- id: detect-private-key
- id: mixed-line-ending
- repo: https://github.com/codespell-project/codespell
rev: v1.15.0
rev: v1.16.0
hooks:
- id: codespell
args: [-L uint]
4 changes: 4 additions & 0 deletions mobile/library/kotlin/src/io/envoyproxy/envoymobile/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ envoy_mobile_kt_library(
"EnvoyClientBuilder.kt",
"EnvoyException.kt",
"EnvoyStreamEmitter.kt",
"GRPCClient.kt",
"GRPCRequestBuilder.kt",
"GRPCResponseHandler.kt",
"GRPCStreamEmitter.kt",
"HTTPClient.kt",
"Request.kt",
"RequestBuilder.kt",
Expand Down
27 changes: 27 additions & 0 deletions mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.envoyproxy.envoymobile

// 1 byte for the compression flag, 4 bytes for the message length (int)
const val GRPC_PREFIX_LENGTH = 5

/**
* Client that supports sending and receiving gRPC traffic.
*
* @param httpClient The HTTP client to use for gRPC streams.
*/
class GRPCClient(
private val httpClient: HTTPClient
) {

/**
* Send a gRPC request with the provided handler.
*
* @param request The outbound gRPC request. See `GRPCRequestBuilder` for creation.
* @param handler Handler for receiving responses.
*
* @returns GRPCStreamEmitter, An emitter that can be used for sending more traffic over the stream.
*/
fun send(request: Request, grpcResponseHandler: GRPCResponseHandler): GRPCStreamEmitter {
val emitter = httpClient.send(request, grpcResponseHandler.underlyingHandler)
return GRPCStreamEmitter(emitter)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.envoyproxy.envoymobile

/**
* Builder used for creating new gRPC `Request` instances.
*/
class GRPCRequestBuilder(
path: String,
authority: String,
useHTTPS: Boolean
) {
private val underlyingBuilder: RequestBuilder = RequestBuilder(
method = RequestMethod.POST,
scheme = if (useHTTPS) "https" else "http",
authority = authority,
path = path)

init {
underlyingBuilder.addHeader("content-type", "application/grpc")
}

/**
* Append a value to the header key.
*
* @param name the header key.
* @param value the value associated to the header key.
* @return this builder.
*/
fun addHeader(name: String, value: String): GRPCRequestBuilder {
underlyingBuilder.addHeader(name, value)
return this
}

/**
* Remove the value in the specified header.
*
* @param name the header key to remove.
* @param value the value to be removed.
* @return this builder.
*/
fun removeHeader(name: String, value: String): GRPCRequestBuilder {
underlyingBuilder.removeHeader(name, value)
return this
}

/**
* Remove all headers with this name.
*
* @param name the header key to remove.
* @return this builder.
*/
fun removeHeaders(name: String): GRPCRequestBuilder {
underlyingBuilder.removeHeaders(name)
return this
}

/**
* Add a specific timeout for the gRPC request. This will be sent in the `grpc-timeout` header.
*
* @param timeoutMS Timeout, in milliseconds.
* @return this builder.
*/
fun addTimeoutMS(timeoutMS: Int?): GRPCRequestBuilder {
val headerName = "grpc-timeout"
if (timeoutMS == null) {
removeHeaders(headerName)
} else {
addHeader(headerName, "${timeoutMS}m")
}
return this
}

/**
* Creates the Request object using the data set in the builder.
*
* @return the Request object.
*/
fun build(): Request {
return underlyingBuilder.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package io.envoyproxy.envoymobile

import io.envoyproxy.envoymobile.engine.types.EnvoyError
import io.envoyproxy.envoymobile.engine.types.EnvoyErrorCode
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.Executor


class GRPCResponseHandler(
val executor: Executor
) {

/**
* Represents the process state of the response stream's body data.
*/
private sealed class ProcessState {
// Awaiting a gRPC compression flag.
object CompressionFlag : ProcessState()

// Awaiting the length specification of the next message.
object MessageLength : ProcessState()

// Awaiting a message with the specified length.
class Message(val messageLength: Int) : ProcessState()
}

internal val underlyingHandler: ResponseHandler = ResponseHandler(executor)

private var errorClosure: (error: EnvoyError) -> Unit = { }
/**
* Specify a callback for when response headers are received by the stream.
*
* @param closure: Closure which will receive the headers, status code,
* and flag indicating if the stream is complete.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onHeaders(closure: (headers: Map<String, List<String>>, statusCode: Int) -> Unit): GRPCResponseHandler {
underlyingHandler.onHeaders { headers, _, _ ->
val grpcStatus = headers["grpc-status"]?.first()?.toIntOrNull() ?: 0
closure(headers, grpcStatus)
}
return this
}

/**
* Specify a callback for when a data frame is received by the stream.
*
* @param closure: Closure which will receive the data,
* and flag indicating if the stream is complete.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onMessage(closure: (byteBuffer: ByteBuffer) -> Unit): GRPCResponseHandler {
val byteBufferedOutputStream = ByteArrayOutputStream()
var processState: ProcessState = ProcessState.CompressionFlag
underlyingHandler.onData { byteBuffer, _ ->

val byteBufferArray = if (byteBuffer.hasArray()) {
byteBuffer.array()
} else {
val array = ByteArray(byteBuffer.remaining())
byteBuffer.get(array)
array
}
byteBufferedOutputStream.write(byteBufferArray)

processState = processData(byteBufferedOutputStream, processState, closure)
}

return this
}

/**
* Specify a callback for when trailers are received by the stream.
* If the closure is called, the stream is complete.
*
* @param closure: Closure which will receive the trailers.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onTrailers(closure: (trailers: Map<String, List<String>>) -> Unit): GRPCResponseHandler {
underlyingHandler.onTrailers(closure)
return this
}

/**
* Specify a callback for when an internal Envoy exception occurs with the stream.
* If the closure is called, the stream is complete.
*
* @param closure: Closure which will be called when an error occurs.
* @return GRPCResponseHandler, this GRPCResponseHandler.
*/
fun onError(closure: (error: EnvoyError) -> Unit): GRPCResponseHandler {
this.errorClosure = closure
underlyingHandler.onError(closure)
return this
}

/**
* Recursively processes a buffer of data, buffering it into messages based on state.
* When a message has been fully buffered, `onMessage` will be called with the message.
*
* @param bufferedStream The buffer of data from which to determine state and messages.
* @param processState The current process state of the buffering.
* @param onMessage Closure to call when a new message is available.
*/
private fun processData(
bufferedStream: ByteArrayOutputStream,
processState: ProcessState,
onMessage: (byteBuffer: ByteBuffer) -> Unit): ProcessState {

var nextState = processState

when (processState) {
is ProcessState.CompressionFlag -> {
val byteArray = bufferedStream.toByteArray()
if (byteArray.isEmpty()) {
// We don't have enough information to extract the compression flag, so we'll just return
return ProcessState.CompressionFlag
}

val compressionFlag = byteArray[0]
// TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501
if (compressionFlag.compareTo(0) != 0) {
errorClosure(
EnvoyError(
EnvoyErrorCode.ENVOY_UNDEFINED_ERROR,
"Unable to read compressed gRPC response message"))

// no op the current onData and clean up
underlyingHandler.onHeaders { _, _, _ -> }
underlyingHandler.onData { _, _ -> }
underlyingHandler.onTrailers { }
underlyingHandler.onError { }
bufferedStream.reset()
}

nextState = ProcessState.MessageLength
}
is ProcessState.MessageLength -> {
if (bufferedStream.size() < GRPC_PREFIX_LENGTH) {
// We don't have enough information to extract the message length, so we'll just return
return ProcessState.MessageLength
}

val byteArray = bufferedStream.toByteArray()
val buffer = ByteBuffer.wrap(byteArray.sliceArray(1..4))
buffer.order(ByteOrder.BIG_ENDIAN)
val messageLength = buffer.int
nextState = ProcessState.Message(messageLength)
}
is ProcessState.Message -> {
if (bufferedStream.size() < processState.messageLength + GRPC_PREFIX_LENGTH) {
// We don't have enough bytes to construct the message, so we'll just return
return ProcessState.Message(processState.messageLength)
}

val byteArray = bufferedStream.toByteArray()
onMessage(ByteBuffer.wrap(
byteArray.sliceArray(GRPC_PREFIX_LENGTH until GRPC_PREFIX_LENGTH + processState.messageLength)))
bufferedStream.reset()
bufferedStream.write(
byteArray.sliceArray(GRPC_PREFIX_LENGTH + processState.messageLength until byteArray.size))

if (byteArray.sliceArray(GRPC_PREFIX_LENGTH + processState.messageLength until byteArray.size).isEmpty()) {
return ProcessState.CompressionFlag
} else {
nextState = ProcessState.CompressionFlag
}
}
}

return processData(bufferedStream, nextState, onMessage)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.envoyproxy.envoymobile

import java.nio.ByteBuffer
import java.nio.ByteOrder


class GRPCStreamEmitter(
private val emitter: StreamEmitter
) {
/**
* Send a protobuf messageData's binary data over the gRPC stream.
*
* @param messageData Binary data of a protobuf messageData to send.
*/
fun sendMessage(messageData: ByteBuffer) {
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// Length-Prefixed-Message = Compressed-Flag | Message-Length | Message
// Compressed-Flag = 0 / 1, encoded as 1 byte unsigned integer
// Message-Length = length of Message, encoded as 4 byte unsigned integer (big endian)
// Message = binary representation of protobuf messageData
val byteBuffer = ByteBuffer.allocate(GRPC_PREFIX_LENGTH)

// Compression flag (1 byte) - 0, not compressed
byteBuffer.put(0)

// Message length
val messageLength = messageData.remaining()
byteBuffer.order(ByteOrder.BIG_ENDIAN)
byteBuffer.putInt(messageLength)

emitter.sendData(byteBuffer)
emitter.sendData(messageData)
}

/**
* Close this connection.
*/
fun close() {
emitter.close(null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ interface CancelableStream {
* Interface allowing for sending/emitting data on an Envoy stream.
*/
interface StreamEmitter : CancelableStream {

/**
* For sending data to an associated stream.
*
Expand Down
30 changes: 30 additions & 0 deletions mobile/library/kotlin/test/io/envoyproxy/envoymobile/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,33 @@ envoy_mobile_kt_test(
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_kt_test(
name = "grpc_request_builder_test",
srcs = [
"GRPCRequestBuilderTest.kt",
],
deps = [
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_kt_test(
name = "grpc_response_handler_test",
srcs = [
"GRPCResponseHandlerTest.kt",
],
deps = [
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)

envoy_mobile_kt_test(
name = "grpc_stream_emitter_test",
srcs = [
"GRPCStreamEmitterTest.kt",
],
deps = [
"//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib",
],
)
Loading

0 comments on commit 5e96a17

Please sign in to comment.