From 5e96a17f8f8f27c049428a75e886e0563d61ac8d Mon Sep 17 00:00:00 2001 From: Alan Chiu Date: Wed, 16 Oct 2019 18:22:59 -0700 Subject: [PATCH] kotlin: grpc codec (#472) gRPC codec for kotlin We are able to hit an Envoy backed gRPC server from `lyft.com` We ran into some issues with testing (debug pull: https://github.com/lyft/envoy-mobile/pull/495). The first is outgoing ALPN is required for gRPC connections: https://github.com/lyft/envoy-mobile/issues/502. gRPC doesn't support disabling this option when starting up a service. We'll have to revisit this effort in the future. For now, we are primarily missing https://github.com/lyft/envoy-mobile/pull/494 for local Envoy library e2e testing Signed-off-by: Alan Chiu Description: gRPC codec for kotlin Risk Level: low Testing: unit/end-to-end Docs Changes: n/a Release Notes: n/a [Optional Fixes #Issue] [Optional Deprecated:] Signed-off-by: JP Simard --- mobile/.pre-commit-config.yaml | 2 +- .../src/io/envoyproxy/envoymobile/BUILD | 4 + .../io/envoyproxy/envoymobile/GRPCClient.kt | 27 ++ .../envoymobile/GRPCRequestBuilder.kt | 80 +++++ .../envoymobile/GRPCResponseHandler.kt | 175 +++++++++++ .../envoymobile/GRPCStreamEmitter.kt | 41 +++ .../envoyproxy/envoymobile/StreamEmitter.kt | 1 - .../test/io/envoyproxy/envoymobile/BUILD | 30 ++ .../envoymobile/GRPCRequestBuilderTest.kt | 54 ++++ .../envoymobile/GRPCResponseHandlerTest.kt | 294 ++++++++++++++++++ .../envoymobile/GRPCStreamEmitterTest.kt | 102 ++++++ .../envoymobile/ResponseHandlerTest.kt | 6 +- .../swift/src/GRPCResponseHandler.swift | 1 + 13 files changed, 812 insertions(+), 5 deletions(-) create mode 100644 mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCClient.kt create mode 100644 mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCRequestBuilder.kt create mode 100644 mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCResponseHandler.kt create mode 100644 mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCStreamEmitter.kt create mode 100644 mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCRequestBuilderTest.kt create mode 100644 mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCResponseHandlerTest.kt create mode 100644 mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCStreamEmitterTest.kt diff --git a/mobile/.pre-commit-config.yaml b/mobile/.pre-commit-config.yaml index ec6a90d013b7..461178c2e513 100644 --- a/mobile/.pre-commit-config.yaml +++ b/mobile/.pre-commit-config.yaml @@ -12,7 +12,7 @@ repos: - id: detect-private-key - id: mixed-line-ending - repo: https://github.com/codespell-project/codespell - rev: v1.15.0 + rev: v1.16.0 hooks: - id: codespell args: [-L uint] diff --git a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/BUILD b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/BUILD index 4b095d14bf92..cdbab126d7ac 100644 --- a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/BUILD +++ b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/BUILD @@ -34,6 +34,10 @@ envoy_mobile_kt_library( "EnvoyClientBuilder.kt", "EnvoyException.kt", "EnvoyStreamEmitter.kt", + "GRPCClient.kt", + "GRPCRequestBuilder.kt", + "GRPCResponseHandler.kt", + "GRPCStreamEmitter.kt", "HTTPClient.kt", "Request.kt", "RequestBuilder.kt", diff --git a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCClient.kt b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCClient.kt new file mode 100644 index 000000000000..26e62b523065 --- /dev/null +++ b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCClient.kt @@ -0,0 +1,27 @@ +package io.envoyproxy.envoymobile + +// 1 byte for the compression flag, 4 bytes for the message length (int) +const val GRPC_PREFIX_LENGTH = 5 + +/** + * Client that supports sending and receiving gRPC traffic. + * + * @param httpClient The HTTP client to use for gRPC streams. + */ +class GRPCClient( + private val httpClient: HTTPClient +) { + + /** + * Send a gRPC request with the provided handler. + * + * @param request The outbound gRPC request. See `GRPCRequestBuilder` for creation. + * @param handler Handler for receiving responses. + * + * @returns GRPCStreamEmitter, An emitter that can be used for sending more traffic over the stream. + */ + fun send(request: Request, grpcResponseHandler: GRPCResponseHandler): GRPCStreamEmitter { + val emitter = httpClient.send(request, grpcResponseHandler.underlyingHandler) + return GRPCStreamEmitter(emitter) + } +} diff --git a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCRequestBuilder.kt b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCRequestBuilder.kt new file mode 100644 index 000000000000..1c7ec24daa5e --- /dev/null +++ b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCRequestBuilder.kt @@ -0,0 +1,80 @@ +package io.envoyproxy.envoymobile + +/** + * Builder used for creating new gRPC `Request` instances. + */ +class GRPCRequestBuilder( + path: String, + authority: String, + useHTTPS: Boolean +) { + private val underlyingBuilder: RequestBuilder = RequestBuilder( + method = RequestMethod.POST, + scheme = if (useHTTPS) "https" else "http", + authority = authority, + path = path) + + init { + underlyingBuilder.addHeader("content-type", "application/grpc") + } + + /** + * Append a value to the header key. + * + * @param name the header key. + * @param value the value associated to the header key. + * @return this builder. + */ + fun addHeader(name: String, value: String): GRPCRequestBuilder { + underlyingBuilder.addHeader(name, value) + return this + } + + /** + * Remove the value in the specified header. + * + * @param name the header key to remove. + * @param value the value to be removed. + * @return this builder. + */ + fun removeHeader(name: String, value: String): GRPCRequestBuilder { + underlyingBuilder.removeHeader(name, value) + return this + } + + /** + * Remove all headers with this name. + * + * @param name the header key to remove. + * @return this builder. + */ + fun removeHeaders(name: String): GRPCRequestBuilder { + underlyingBuilder.removeHeaders(name) + return this + } + + /** + * Add a specific timeout for the gRPC request. This will be sent in the `grpc-timeout` header. + * + * @param timeoutMS Timeout, in milliseconds. + * @return this builder. + */ + fun addTimeoutMS(timeoutMS: Int?): GRPCRequestBuilder { + val headerName = "grpc-timeout" + if (timeoutMS == null) { + removeHeaders(headerName) + } else { + addHeader(headerName, "${timeoutMS}m") + } + return this + } + + /** + * Creates the Request object using the data set in the builder. + * + * @return the Request object. + */ + fun build(): Request { + return underlyingBuilder.build() + } +} diff --git a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCResponseHandler.kt b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCResponseHandler.kt new file mode 100644 index 000000000000..3956e6a1fd50 --- /dev/null +++ b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCResponseHandler.kt @@ -0,0 +1,175 @@ +package io.envoyproxy.envoymobile + +import io.envoyproxy.envoymobile.engine.types.EnvoyError +import io.envoyproxy.envoymobile.engine.types.EnvoyErrorCode +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.util.concurrent.Executor + + +class GRPCResponseHandler( + val executor: Executor +) { + + /** + * Represents the process state of the response stream's body data. + */ + private sealed class ProcessState { + // Awaiting a gRPC compression flag. + object CompressionFlag : ProcessState() + + // Awaiting the length specification of the next message. + object MessageLength : ProcessState() + + // Awaiting a message with the specified length. + class Message(val messageLength: Int) : ProcessState() + } + + internal val underlyingHandler: ResponseHandler = ResponseHandler(executor) + + private var errorClosure: (error: EnvoyError) -> Unit = { } + /** + * Specify a callback for when response headers are received by the stream. + * + * @param closure: Closure which will receive the headers, status code, + * and flag indicating if the stream is complete. + * @return GRPCResponseHandler, this GRPCResponseHandler. + */ + fun onHeaders(closure: (headers: Map>, statusCode: Int) -> Unit): GRPCResponseHandler { + underlyingHandler.onHeaders { headers, _, _ -> + val grpcStatus = headers["grpc-status"]?.first()?.toIntOrNull() ?: 0 + closure(headers, grpcStatus) + } + return this + } + + /** + * Specify a callback for when a data frame is received by the stream. + * + * @param closure: Closure which will receive the data, + * and flag indicating if the stream is complete. + * @return GRPCResponseHandler, this GRPCResponseHandler. + */ + fun onMessage(closure: (byteBuffer: ByteBuffer) -> Unit): GRPCResponseHandler { + val byteBufferedOutputStream = ByteArrayOutputStream() + var processState: ProcessState = ProcessState.CompressionFlag + underlyingHandler.onData { byteBuffer, _ -> + + val byteBufferArray = if (byteBuffer.hasArray()) { + byteBuffer.array() + } else { + val array = ByteArray(byteBuffer.remaining()) + byteBuffer.get(array) + array + } + byteBufferedOutputStream.write(byteBufferArray) + + processState = processData(byteBufferedOutputStream, processState, closure) + } + + return this + } + + /** + * Specify a callback for when trailers are received by the stream. + * If the closure is called, the stream is complete. + * + * @param closure: Closure which will receive the trailers. + * @return GRPCResponseHandler, this GRPCResponseHandler. + */ + fun onTrailers(closure: (trailers: Map>) -> Unit): GRPCResponseHandler { + underlyingHandler.onTrailers(closure) + return this + } + + /** + * Specify a callback for when an internal Envoy exception occurs with the stream. + * If the closure is called, the stream is complete. + * + * @param closure: Closure which will be called when an error occurs. + * @return GRPCResponseHandler, this GRPCResponseHandler. + */ + fun onError(closure: (error: EnvoyError) -> Unit): GRPCResponseHandler { + this.errorClosure = closure + underlyingHandler.onError(closure) + return this + } + + /** + * Recursively processes a buffer of data, buffering it into messages based on state. + * When a message has been fully buffered, `onMessage` will be called with the message. + * + * @param bufferedStream The buffer of data from which to determine state and messages. + * @param processState The current process state of the buffering. + * @param onMessage Closure to call when a new message is available. + */ + private fun processData( + bufferedStream: ByteArrayOutputStream, + processState: ProcessState, + onMessage: (byteBuffer: ByteBuffer) -> Unit): ProcessState { + + var nextState = processState + + when (processState) { + is ProcessState.CompressionFlag -> { + val byteArray = bufferedStream.toByteArray() + if (byteArray.isEmpty()) { + // We don't have enough information to extract the compression flag, so we'll just return + return ProcessState.CompressionFlag + } + + val compressionFlag = byteArray[0] + // TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501 + if (compressionFlag.compareTo(0) != 0) { + errorClosure( + EnvoyError( + EnvoyErrorCode.ENVOY_UNDEFINED_ERROR, + "Unable to read compressed gRPC response message")) + + // no op the current onData and clean up + underlyingHandler.onHeaders { _, _, _ -> } + underlyingHandler.onData { _, _ -> } + underlyingHandler.onTrailers { } + underlyingHandler.onError { } + bufferedStream.reset() + } + + nextState = ProcessState.MessageLength + } + is ProcessState.MessageLength -> { + if (bufferedStream.size() < GRPC_PREFIX_LENGTH) { + // We don't have enough information to extract the message length, so we'll just return + return ProcessState.MessageLength + } + + val byteArray = bufferedStream.toByteArray() + val buffer = ByteBuffer.wrap(byteArray.sliceArray(1..4)) + buffer.order(ByteOrder.BIG_ENDIAN) + val messageLength = buffer.int + nextState = ProcessState.Message(messageLength) + } + is ProcessState.Message -> { + if (bufferedStream.size() < processState.messageLength + GRPC_PREFIX_LENGTH) { + // We don't have enough bytes to construct the message, so we'll just return + return ProcessState.Message(processState.messageLength) + } + + val byteArray = bufferedStream.toByteArray() + onMessage(ByteBuffer.wrap( + byteArray.sliceArray(GRPC_PREFIX_LENGTH until GRPC_PREFIX_LENGTH + processState.messageLength))) + bufferedStream.reset() + bufferedStream.write( + byteArray.sliceArray(GRPC_PREFIX_LENGTH + processState.messageLength until byteArray.size)) + + if (byteArray.sliceArray(GRPC_PREFIX_LENGTH + processState.messageLength until byteArray.size).isEmpty()) { + return ProcessState.CompressionFlag + } else { + nextState = ProcessState.CompressionFlag + } + } + } + + return processData(bufferedStream, nextState, onMessage) + } +} diff --git a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCStreamEmitter.kt b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCStreamEmitter.kt new file mode 100644 index 000000000000..7af2f377bff6 --- /dev/null +++ b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/GRPCStreamEmitter.kt @@ -0,0 +1,41 @@ +package io.envoyproxy.envoymobile + +import java.nio.ByteBuffer +import java.nio.ByteOrder + + +class GRPCStreamEmitter( + private val emitter: StreamEmitter +) { + /** + * Send a protobuf messageData's binary data over the gRPC stream. + * + * @param messageData Binary data of a protobuf messageData to send. + */ + fun sendMessage(messageData: ByteBuffer) { + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests + // Length-Prefixed-Message = Compressed-Flag | Message-Length | Message + // Compressed-Flag = 0 / 1, encoded as 1 byte unsigned integer + // Message-Length = length of Message, encoded as 4 byte unsigned integer (big endian) + // Message = binary representation of protobuf messageData + val byteBuffer = ByteBuffer.allocate(GRPC_PREFIX_LENGTH) + + // Compression flag (1 byte) - 0, not compressed + byteBuffer.put(0) + + // Message length + val messageLength = messageData.remaining() + byteBuffer.order(ByteOrder.BIG_ENDIAN) + byteBuffer.putInt(messageLength) + + emitter.sendData(byteBuffer) + emitter.sendData(messageData) + } + + /** + * Close this connection. + */ + fun close() { + emitter.close(null) + } +} diff --git a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/StreamEmitter.kt b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/StreamEmitter.kt index fb7408cb5a0e..fe5c23ee4fd0 100644 --- a/mobile/library/kotlin/src/io/envoyproxy/envoymobile/StreamEmitter.kt +++ b/mobile/library/kotlin/src/io/envoyproxy/envoymobile/StreamEmitter.kt @@ -18,7 +18,6 @@ interface CancelableStream { * Interface allowing for sending/emitting data on an Envoy stream. */ interface StreamEmitter : CancelableStream { - /** * For sending data to an associated stream. * diff --git a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/BUILD b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/BUILD index e168dbf0e81f..a12ec6feee43 100644 --- a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/BUILD +++ b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/BUILD @@ -71,3 +71,33 @@ envoy_mobile_kt_test( "//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib", ], ) + +envoy_mobile_kt_test( + name = "grpc_request_builder_test", + srcs = [ + "GRPCRequestBuilderTest.kt", + ], + deps = [ + "//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib", + ], +) + +envoy_mobile_kt_test( + name = "grpc_response_handler_test", + srcs = [ + "GRPCResponseHandlerTest.kt", + ], + deps = [ + "//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib", + ], +) + +envoy_mobile_kt_test( + name = "grpc_stream_emitter_test", + srcs = [ + "GRPCStreamEmitterTest.kt", + ], + deps = [ + "//library/kotlin/src/io/envoyproxy/envoymobile:envoy_interfaces_lib", + ], +) diff --git a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCRequestBuilderTest.kt b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCRequestBuilderTest.kt new file mode 100644 index 000000000000..32fc905d8249 --- /dev/null +++ b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCRequestBuilderTest.kt @@ -0,0 +1,54 @@ +package io.envoyproxy.envoymobile + +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +class GRPCRequestBuilderTest { + + @Test + fun `using https will pass https as scheme`() { + val request = GRPCRequestBuilder("/pb.api.v1.Foo/GetBar", "foo.bar.com", true) + .build() + + assertThat(request.scheme).isEqualTo("https") + } + + @Test + fun `not using https will pass http as scheme`() { + val request = GRPCRequestBuilder("/pb.api.v1.Foo/GetBar", "foo.bar.com", false) + .build() + + assertThat(request.scheme).isEqualTo("http") + } + + @Test + fun `application gprc is set as content-type header`() { + val request = GRPCRequestBuilder("/pb.api.v1.Foo/GetBar", "foo.bar.com", false) + .build() + assertThat(request.headers["content-type"]).containsExactly("application/grpc") + } + + @Test + fun `POST is used as method`() { + val request = GRPCRequestBuilder("/pb.api.v1.Foo/GetBar", "foo.bar.com", false) + .build() + assertThat(request.method).isEqualTo(RequestMethod.POST) + } + + @Test + fun `timeout is set as grpc-timeout header`() { + val request = GRPCRequestBuilder("/pb.api.v1.Foo/GetBar", "foo.bar.com", false) + .addTimeoutMS(200) + .build() + + assertThat(request.headers["grpc-timeout"]).containsExactly("200m") + } + + @Test + fun `grpc-timeout header is not present when no timeout is set`() { + val request = GRPCRequestBuilder("/pb.api.v1.Foo/GetBar", "foo.bar.com", false) + .build() + + assertThat(request.headers.containsKey("grpc-timeout")).isFalse() + } +} diff --git a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCResponseHandlerTest.kt b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCResponseHandlerTest.kt new file mode 100644 index 000000000000..bcf3230a438b --- /dev/null +++ b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCResponseHandlerTest.kt @@ -0,0 +1,294 @@ +package io.envoyproxy.envoymobile + +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.TimeUnit + +class GRPCResponseHandlerTest { + + @Test(timeout = 200L) + fun `headers and grpc status is passed to on headers`() { + val countDownLatch = CountDownLatch(1) + + val handler = GRPCResponseHandler(Executor { }) + .onHeaders { headers, grpcStatus -> + assertThat(headers).isEqualTo(mapOf("grpc-status" to listOf("1"), "other" to listOf("foo", "bar"))) + assertThat(grpcStatus).isEqualTo(1) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onHeaders( + mapOf("grpc-status" to listOf("1"), "other" to listOf("foo", "bar")), + true) + + countDownLatch.await(1L, TimeUnit.SECONDS) + } + + @Test(timeout = 200L) + fun `trailers are passed on trailers`() { + val countDownLatch = CountDownLatch(1) + + val handler = GRPCResponseHandler(Executor { }) + .onTrailers { trailers -> + assertThat(trailers).isEqualTo(mapOf("foo" to listOf("bar"), "baz" to listOf("1", "2"))) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onTrailers( + mapOf("foo" to listOf("bar"), "baz" to listOf("1", "2"))) + + countDownLatch.await(1L, TimeUnit.SECONDS) + } + + @Test(timeout = 200L) + fun `messages are passed when it fits in the same chunk`() { + val countDownLatch = CountDownLatch(1) + + val data = "data".toByteArray(Charsets.UTF_8) + + val prefix = ByteBuffer.allocate(5) + prefix.put(0) + prefix.order(ByteOrder.BIG_ENDIAN) + prefix.putInt(data.size) + + val outputStream = ByteArrayOutputStream() + outputStream.write(prefix.array()) + outputStream.write(data) + val byteBuffer = ByteBuffer.wrap(outputStream.toByteArray()) + + val handler = GRPCResponseHandler(Executor { }) + .onMessage { message -> + assertThat(message.array().toString(Charsets.UTF_8)).isEqualTo("data") + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onData(byteBuffer, true) + + countDownLatch.await(1L, TimeUnit.SECONDS) + } + + @Test(timeout = 200L) + fun `messages are buffered and passed until all chunks are received`() { + val countDownLatch = CountDownLatch(1) + + val part1 = "data".toByteArray(Charsets.UTF_8) + val part2 = "_by_".toByteArray(Charsets.UTF_8) + val part3 = "parts".toByteArray(Charsets.UTF_8) + + val prefix = ByteBuffer.allocate(5) + prefix.put(0) + prefix.order(ByteOrder.BIG_ENDIAN) + prefix.putInt(part1.size + part2.size + part3.size) + + val outputStream = ByteArrayOutputStream() + outputStream.write(prefix.array()) + outputStream.write(part1) + + val initialBuffer = ByteBuffer.wrap(outputStream.toByteArray()) + + val handler = GRPCResponseHandler(Executor { }) + .onMessage { message -> + assertThat(message.array().toString(Charsets.UTF_8)).isEqualTo("data_by_parts") + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onData(initialBuffer, false) + handler.underlyingHandler.underlyingCallbacks.onData(ByteBuffer.wrap(part2), false) + handler.underlyingHandler.underlyingCallbacks.onData(ByteBuffer.wrap(part3), true) + + countDownLatch.await(1L, TimeUnit.SECONDS) + } + + @Test(timeout = 200L) + fun `multiple messages in the same chunk will be passed down`() { + val countDownLatch = CountDownLatch(2) + + val part1 = "part1".toByteArray(Charsets.UTF_8) + val part2 = "part2".toByteArray(Charsets.UTF_8) + + val prefix1 = ByteBuffer.allocate(5) + prefix1.put(0) + prefix1.order(ByteOrder.BIG_ENDIAN) + prefix1.putInt(part1.size) + + val prefix2 = ByteBuffer.allocate(5) + prefix2.put(0) + prefix2.order(ByteOrder.BIG_ENDIAN) + prefix2.putInt(part2.size) + + val outputStream = ByteArrayOutputStream() + outputStream.write(prefix1.array()) + outputStream.write(part1) + outputStream.write(prefix2.array()) + outputStream.write(part2) + + val part1AndPart2 = ByteBuffer.wrap(outputStream.toByteArray()) + + val messages = mutableListOf() + val handler = GRPCResponseHandler(Executor { }) + .onMessage { message -> + messages.add(message) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onData(part1AndPart2, false) + + countDownLatch.await(1L, TimeUnit.SECONDS) + assertThat(messages[0].array().toString(Charsets.UTF_8)).isEqualTo("part1") + assertThat(messages[1].array().toString(Charsets.UTF_8)).isEqualTo("part2") + } + + @Test(timeout = 200L) + fun `multiple messages in different chunks will be passed down`() { + val countDownLatch = CountDownLatch(2) + + val part1 = "part1".toByteArray(Charsets.UTF_8) + val part2a = "part2a".toByteArray(Charsets.UTF_8) + val part2b = "_part2b".toByteArray(Charsets.UTF_8) + + val prefix1 = ByteBuffer.allocate(5) + prefix1.put(0) + prefix1.order(ByteOrder.BIG_ENDIAN) + prefix1.putInt(part1.size) + + val prefix2 = ByteBuffer.allocate(5) + prefix2.put(0) + prefix2.order(ByteOrder.BIG_ENDIAN) + prefix2.putInt(part2a.size + part2b.size) + + val outputStream = ByteArrayOutputStream() + outputStream.write(prefix1.array()) + outputStream.write(part1) + outputStream.write(prefix2.array()) + outputStream.write(part2a) + val part1AndPart2a = ByteBuffer.wrap(outputStream.toByteArray()) + + val messages = mutableListOf() + val handler = GRPCResponseHandler(Executor { }) + .onMessage { message -> + messages.add(message) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onData(part1AndPart2a, false) + handler.underlyingHandler.underlyingCallbacks.onData(ByteBuffer.wrap(part2b), false) + + countDownLatch.await(1L, TimeUnit.SECONDS) + assertThat(messages[0].array().toString(Charsets.UTF_8)).isEqualTo("part1") + assertThat(messages[1].array().toString(Charsets.UTF_8)).isEqualTo("part2a_part2b") + } + + @Test(timeout = 200L) + fun `empty messages in same will send empty message down`() { + val countDownLatch = CountDownLatch(2) + + val part2 = "part2".toByteArray(Charsets.UTF_8) + + val prefix1 = ByteBuffer.allocate(5) + prefix1.put(0) + prefix1.order(ByteOrder.BIG_ENDIAN) + prefix1.putInt(0) + + val prefix2 = ByteBuffer.allocate(5) + prefix2.put(0) + prefix2.order(ByteOrder.BIG_ENDIAN) + prefix2.putInt(part2.size) + + val outputStream = ByteArrayOutputStream() + outputStream.write(prefix1.array()) + outputStream.write(prefix2.array()) + outputStream.write(part2) + val resultMessages = ByteBuffer.wrap(outputStream.toByteArray()) + + val messages = mutableListOf() + val handler = GRPCResponseHandler(Executor { }) + .onMessage { message -> + messages.add(message) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onData(resultMessages, false) + + countDownLatch.await(1L, TimeUnit.SECONDS) + assertThat(messages[0].array()).isEmpty() + assertThat(messages[1].array().toString(Charsets.UTF_8)).isEqualTo("part2") + } + + + @Test(timeout = 200L) + fun `zero length messages are passed as empty byte buffers`() { + val countDownLatch = CountDownLatch(1) + val prefix = ByteBuffer.allocate(5) + prefix.put(0) + prefix.order(ByteOrder.BIG_ENDIAN) + prefix.putInt(0) + + val outputStream = ByteArrayOutputStream() + outputStream.write(prefix.array()) + val data = ByteBuffer.wrap(outputStream.toByteArray()) + + val messages = mutableListOf() + val handler = GRPCResponseHandler(Executor { }) + .onMessage { message -> + messages.add(message) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onData(data, false) + + countDownLatch.await(1L, TimeUnit.SECONDS) + assertThat(messages[0].array()).isEmpty() + + } + + @Test(timeout = 200L) + fun `first grpc status is passed on headers`() { + val countDownLatch = CountDownLatch(1) + + val handler = GRPCResponseHandler(Executor { }) + .onHeaders { _, grpcStatus -> + assertThat(grpcStatus).isEqualTo(1) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onHeaders(mapOf("grpc-status" to listOf("1", "2")), true) + + countDownLatch.await(1L, TimeUnit.SECONDS) + + } + + @Test(timeout = 200L) + fun `missing grpc status will default status to 0`() { + val countDownLatch = CountDownLatch(1) + + val handler = GRPCResponseHandler(Executor { }) + .onHeaders { _, grpcStatus -> + assertThat(grpcStatus).isEqualTo(0) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onHeaders(mapOf(), true) + + countDownLatch.await(1L, TimeUnit.SECONDS) + } + + @Test(timeout = 200L) + fun `invalid grpc status will default status to 0`() { + val countDownLatch = CountDownLatch(1) + + val handler = GRPCResponseHandler(Executor { }) + .onHeaders { _, grpcStatus -> + assertThat(grpcStatus).isEqualTo(0) + countDownLatch.countDown() + } + + handler.underlyingHandler.underlyingCallbacks.onHeaders(mapOf("grpc-status" to listOf("invalid")), true) + + countDownLatch.await(1L, TimeUnit.SECONDS) + } +} diff --git a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCStreamEmitterTest.kt b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCStreamEmitterTest.kt new file mode 100644 index 000000000000..a4f4bc2a39f5 --- /dev/null +++ b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/GRPCStreamEmitterTest.kt @@ -0,0 +1,102 @@ +package io.envoyproxy.envoymobile + +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.nio.ByteOrder + +class GRPCStreamEmitterTest { + + // TODO: Problems with nhaarman/mockito-kotlin https://github.com/lyft/envoy-mobile/issues/504 + // This is a total hack to get something to work + private lateinit var emitter: StreamEmitter + + private val dataOutputStream = ByteArrayOutputStream() + private var isCloseCalledWithNull = false + + @Before + fun setup() { + emitter = object : StreamEmitter { + + override fun cancel() { + throw UnsupportedOperationException("unexpected usage of mock emitter") + } + + override fun sendData(byteBuffer: ByteBuffer): StreamEmitter { + dataOutputStream.write(byteBuffer.array()) + return this + } + + override fun sendMetadata(metadata: Map>): StreamEmitter { + throw UnsupportedOperationException("unexpected usage of mock emitter") + } + + override fun close(trailers: Map>?) { + isCloseCalledWithNull = trailers == null + } + } + } + + @After + fun teardown() { + dataOutputStream.reset() + isCloseCalledWithNull = false + } + + @Test + fun `prefix and data is sent on send data`() { + val grpcStreamEmitter = GRPCStreamEmitter(emitter) + + val payload = "data".toByteArray(Charsets.UTF_8) + val message = ByteBuffer.wrap(payload) + grpcStreamEmitter.sendMessage(message) + + assertThat(dataOutputStream.toByteArray()).hasSize(payload.size + GRPC_PREFIX_LENGTH) + } + + @Test + fun `compression flag is set on the first bit of the prefix`() { + val grpcStreamEmitter = GRPCStreamEmitter(emitter) + + val payload = "data".toByteArray(Charsets.UTF_8) + val message = ByteBuffer.wrap(payload) + grpcStreamEmitter.sendMessage(message) + + assertThat(dataOutputStream.toByteArray()[0]).isEqualTo(0) + } + + @Test + fun `message length is set on the 1-4 bytes of the prefix`() { + val grpcStreamEmitter = GRPCStreamEmitter(emitter) + + val payload = "data".toByteArray(Charsets.UTF_8) + val message = ByteBuffer.wrap(payload) + grpcStreamEmitter.sendMessage(message) + + assertThat(ByteBuffer.wrap(dataOutputStream.toByteArray().sliceArray(1..4)).order(ByteOrder.BIG_ENDIAN).int).isEqualTo(payload.size) + } + + @Test + fun `message is sent after the prefix`() { + val grpcStreamEmitter = GRPCStreamEmitter(emitter) + + val payload = "data".toByteArray(Charsets.UTF_8) + val message = ByteBuffer.wrap(payload) + grpcStreamEmitter.sendMessage(message) + + assertThat(dataOutputStream.toByteArray().sliceArray(GRPC_PREFIX_LENGTH until dataOutputStream.size()).toString(Charsets.UTF_8)).isEqualTo("data") + + } + + @Test + fun `close is called with empty data frame`() { + val grpcStreamEmitter = GRPCStreamEmitter(emitter) + + grpcStreamEmitter.close() + + assertThat(isCloseCalledWithNull).isTrue() + } +} diff --git a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/ResponseHandlerTest.kt b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/ResponseHandlerTest.kt index 9f21d7fc46af..3855b05e72e8 100644 --- a/mobile/library/kotlin/test/io/envoyproxy/envoymobile/ResponseHandlerTest.kt +++ b/mobile/library/kotlin/test/io/envoyproxy/envoymobile/ResponseHandlerTest.kt @@ -7,7 +7,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit class ResponseHandlerTest { - @Test(timeout = 100L) + @Test(timeout = 200L) fun `parsing status code from headers returns first status code`() { val countDownLatch = CountDownLatch(1) val headers = mapOf(":status" to listOf("204", "200"), "other" to listOf("1")) @@ -21,7 +21,7 @@ class ResponseHandlerTest { countDownLatch.await(1L, TimeUnit.SECONDS) } - @Test(timeout = 100L) + @Test(timeout = 200L) fun `parsing invalid status code from headers returns 0`() { val countDownLatch = CountDownLatch(1) val headers = mapOf(":status" to listOf("invalid"), "other" to listOf("1")) @@ -35,7 +35,7 @@ class ResponseHandlerTest { countDownLatch.await(1L, TimeUnit.SECONDS) } - @Test(timeout = 100L) + @Test(timeout = 200L) fun `parsing missing status code from headers returns 0`() { val countDownLatch = CountDownLatch(1) val headers = mapOf("other" to listOf("1")) diff --git a/mobile/library/swift/src/GRPCResponseHandler.swift b/mobile/library/swift/src/GRPCResponseHandler.swift index a37d6c7bc15f..70dedf7d788c 100644 --- a/mobile/library/swift/src/GRPCResponseHandler.swift +++ b/mobile/library/swift/src/GRPCResponseHandler.swift @@ -124,6 +124,7 @@ public final class GRPCResponseHandler: NSObject { } guard compressionFlag == 0 else { + // TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501 assertionFailure("gRPC decompression is not supported") buffer.removeAll() state = .expectingCompressionFlag