diff --git a/library/swift/src/GRPCResponseHandler.swift b/library/swift/src/GRPCResponseHandler.swift index 70dedf7d78..5e33a20437 100644 --- a/library/swift/src/GRPCResponseHandler.swift +++ b/library/swift/src/GRPCResponseHandler.swift @@ -3,6 +3,8 @@ import Foundation /// Handler for responses sent over gRPC. @objcMembers public final class GRPCResponseHandler: NSObject { + private var internalErrorClosure: ((_ error: EnvoyError) -> Void)? + /// Represents the state of a response stream's body data. private enum State { /// Awaiting a gRPC compression flag. @@ -56,10 +58,12 @@ public final class GRPCResponseHandler: NSObject { var buffer = Data() var state = State.expectingCompressionFlag self.underlyingHandler.onData { chunk, _ in + // This closure deliberately retains `self` while the underlying handler's + // `onData` closure is kept in memory so that messages/errors can be processed. // Appending might result in extra copying that can be optimized in the future. buffer.append(chunk) // gRPC always sends trailers, so the stream will not complete here. - GRPCResponseHandler.processBuffer(&buffer, state: &state, onMessage: closure) + self.processBuffer(&buffer, state: &state, onMessage: closure) } return self @@ -91,6 +95,7 @@ public final class GRPCResponseHandler: NSObject { @escaping (_ error: EnvoyError) -> Void) -> GRPCResponseHandler { + self.internalErrorClosure = closure self.underlyingHandler.onError(closure) return self } @@ -114,8 +119,8 @@ public final class GRPCResponseHandler: NSObject { /// - parameter buffer: The buffer of data from which to determine state and messages. /// - parameter state: The current state of the buffering. /// - parameter onMessage: Closure to call when a new message is available. - private static func processBuffer(_ buffer: inout Data, state: inout State, - onMessage: (_ message: Data) -> Void) + private func processBuffer(_ buffer: inout Data, state: inout State, + onMessage: (_ message: Data) -> Void) { switch state { case .expectingCompressionFlag: @@ -125,7 +130,11 @@ public final class GRPCResponseHandler: NSObject { guard compressionFlag == 0 else { // TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501 - assertionFailure("gRPC decompression is not supported") + // Call the handler with an error and ignore all future updates. + let error = EnvoyError( + errorCode: 0, message: "Unable to read compressed gRPC response message", cause: nil) + self.internalErrorClosure?(error) + self.resetHandlers() buffer.removeAll() state = .expectingCompressionFlag return @@ -158,4 +167,13 @@ public final class GRPCResponseHandler: NSObject { self.processBuffer(&buffer, state: &state, onMessage: onMessage) } + + private func resetHandlers() { + self.internalErrorClosure = nil + self.underlyingHandler + .onHeaders { _, _, _ in } + .onData { _, _ in } + .onTrailers { _ in } + .onError { _ in } + } } diff --git a/library/swift/test/GRPCResponseHandlerTests.swift b/library/swift/test/GRPCResponseHandlerTests.swift index 6bbc8f6dd8..61211a9ca0 100644 --- a/library/swift/test/GRPCResponseHandlerTests.swift +++ b/library/swift/test/GRPCResponseHandlerTests.swift @@ -129,6 +129,47 @@ final class GRPCResponseHandlerTests: XCTestCase { XCTAssertTrue(expectedMessages.isEmpty) } + // TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501 + func testCalledWithErrorWhenCompressedMessageReceived() { + let expectation = self.expectation(description: "Error closure is called") + let firstMessage = Data([ + 0x1, // Compression flag + 0x0, 0x0, 0x0, 0x5, // Length bytes + ] + kMessage1) + + let handler = GRPCResponseHandler() + .onMessage { _ in } // Compiler segfaults without this + .onError { error in + let message = "Unable to read compressed gRPC response message" + XCTAssertEqual(message, error.message) + expectation.fulfill() + } + + handler.underlyingHandler.underlyingCallbacks.onData(firstMessage, false) + self.waitForExpectations(timeout: 0.1) + } + + // TODO: Support gRPC compression https://github.com/lyft/envoy-mobile/issues/501 + func testDoesNotCallOtherCallbacksAfterReceivingCompressedMessageError() { + let errorExpectation = self.expectation(description: "Error closure is called") + let otherExpectation = self.expectation(description: "Other closures are not called") + otherExpectation.isInverted = true + let firstMessage = Data([ + 0x1, // Compression flag + 0x0, 0x0, 0x0, 0x5, // Length bytes + ] + kMessage1) + + let handler = GRPCResponseHandler() + .onMessage { _ in otherExpectation.fulfill() } + .onTrailers { _ in otherExpectation.fulfill() } + .onError { _ in errorExpectation.fulfill() } + + handler.underlyingHandler.underlyingCallbacks.onData(firstMessage, false) + handler.underlyingHandler.underlyingCallbacks.onData(firstMessage, false) + handler.underlyingHandler.underlyingCallbacks.onTrailers([:]) + self.waitForExpectations(timeout: 0.1) + } + // MARK: - gRPC status parsing func testParsingGRPCStatusFromHeadersReturnsFirstStatus() {