Skip to content

Commit

Permalink
7.x.x: Response streaming (#558)
Browse files Browse the repository at this point in the history
* Add support for returning streamed responses

* Remove old response streaming fiunctions

* Remove unused function

* Comment update
  • Loading branch information
adam-fowler committed Apr 5, 2024
1 parent 8b2e0a6 commit 1262117
Show file tree
Hide file tree
Showing 14 changed files with 297 additions and 406 deletions.
43 changes: 0 additions & 43 deletions Sources/SotoCore/AWSClient+EndpointDiscovery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -160,49 +160,6 @@ extension AWSClient {
)
}

/// Execute a request with an input object and return the output object generated from the response
/// - parameters:
/// - operationName: Name of the AWS operation
/// - path: path to append to endpoint URL
/// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc)
/// - serviceConfig: AWS service configuration used in request creation and signing
/// - input: Input object
/// - hostPrefix: Prefix to append to host name
/// - endpointDiscovery: Endpoint discovery helper
/// - logger: Logger
/// - stream: Closure to stream payload response into
/// - returns:
/// Output object that completes when response is received
public func execute<Output: AWSDecodableShape, Input: AWSEncodableShape>(
operation operationName: String,
path: String,
httpMethod: HTTPMethod,
serviceConfig: AWSServiceConfig,
input: Input,
hostPrefix: String? = nil,
endpointDiscovery: AWSEndpointDiscovery,
logger: Logger = AWSClient.loggingDisabled,
stream: @escaping AWSResponseStream
) async throws -> Output {
return try await self.execute(
execute: { endpoint in
return try await self.execute(
operation: operationName,
path: path,
httpMethod: httpMethod,
serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig,
input: input,
hostPrefix: hostPrefix,
logger: logger,
stream: stream
)
},
isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery),
endpointDiscovery: endpointDiscovery,
logger: logger
)
}

private func execute<Output>(
execute: @escaping (String?) async throws -> Output,
isEnabled: Bool,
Expand Down
56 changes: 11 additions & 45 deletions Sources/SotoCore/AWSClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,47 +374,6 @@ extension AWSClient {
)
}

/// Execute a request with an input object and return the output object generated from the response
/// - parameters:
/// - operationName: Name of the AWS operation
/// - path: path to append to endpoint URL
/// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc)
/// - serviceConfig: AWS Service configuration
/// - input: Input object
/// - hostPrefix: String to prefix host name with
/// - logger: Logger to log request details to
/// - returns:
/// Output object that completes when response is received
public func execute<Output: AWSDecodableShape, Input: AWSEncodableShape>(
operation operationName: String,
path: String,
httpMethod: HTTPMethod,
serviceConfig: AWSServiceConfig,
input: Input,
hostPrefix: String? = nil,
logger: Logger = AWSClient.loggingDisabled,
stream: @escaping AWSResponseStream
) async throws -> Output {
return try await self.execute(
operation: operationName,
createRequest: {
try AWSRequest(
operation: operationName,
path: path,
httpMethod: httpMethod,
input: input,
hostPrefix: hostPrefix,
configuration: serviceConfig
)
},
processResponse: { response in
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
)
}

/// internal version of execute
internal func execute<Output>(
operation operationName: String,
Expand Down Expand Up @@ -604,18 +563,18 @@ extension AWSClient {
assert((200..<300).contains(response.status.code), "Shouldn't get here if error was returned")

let raw = Output._options.contains(.rawPayload) == true
let awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw)
let awsResponse = try await AWSResponse(from: response, streaming: raw)
.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig)

return try awsResponse.generateOutputShape(operation: operationName)
return try awsResponse.generateOutputShape(operation: operationName, serviceProtocol: serviceConfig.serviceProtocol)
}

/// Create error from HTTPResponse. This is only called if we received an unsuccessful http status code.
internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) async throws -> Error {
// if we can create an AWSResponse and create an error from it return that
let awsResponse: AWSResponse
do {
awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol)
awsResponse = try await AWSResponse(from: response, streaming: false)
} catch {
// else return "Unhandled error message" with rawBody attached
let context = AWSErrorContext(
Expand All @@ -640,7 +599,14 @@ extension AWSClient {
responseCode: response.status,
headers: response.headers
)
return AWSRawError(rawBody: awsResponseWithMiddleware.body.asString(), context: context)
let responseBody: String?
switch awsResponseWithMiddleware.body.storage {
case .byteBuffer(let buffer):
responseBody = String(buffer: buffer)
default:
responseBody = nil
}
return AWSRawError(rawBody: responseBody, context: context)
}
} catch {
return error
Expand Down
11 changes: 5 additions & 6 deletions Sources/SotoCore/Concurrency/AnyAsyncSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,17 @@
//
//===----------------------------------------------------------------------===//

@usableFromInline
struct AnyAsyncSequence<Element>: Sendable, AsyncSequence {
@usableFromInline typealias AsyncIteratorNextCallback = () async throws -> Element?
public struct AnyAsyncSequence<Element>: Sendable, AsyncSequence {
public typealias AsyncIteratorNextCallback = () async throws -> Element?

@usableFromInline struct AsyncIterator: AsyncIteratorProtocol {
public struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline let nextCallback: AsyncIteratorNextCallback

@inlinable init(nextCallback: @escaping AsyncIteratorNextCallback) {
self.nextCallback = nextCallback
}

@inlinable mutating func next() async throws -> Element? {
@inlinable public mutating func next() async throws -> Element? {
try await self.nextCallback()
}
}
Expand All @@ -54,7 +53,7 @@ struct AnyAsyncSequence<Element>: Sendable, AsyncSequence {
}
}

@inlinable func makeAsyncIterator() -> AsyncIterator {
@inlinable public func makeAsyncIterator() -> AsyncIterator {
.init(nextCallback: self.makeAsyncIteratorCallback())
}
}
2 changes: 1 addition & 1 deletion Sources/SotoCore/Encoder/DictionaryDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ extension __DictionaryDecoder {
return try self.unbox(value, as: Data.self)
} else if type == Date.self {
return try self.unbox(value, as: Date.self)
} else if type == AWSPayload.self {
} else if type == HTTPBody.self {
return value
} else {
self.storage.push(container: value)
Expand Down
35 changes: 21 additions & 14 deletions Sources/SotoCore/HTTP/AWSHTTPTypes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,29 @@ import Logging
import NIOCore
import NIOHTTP1

/// Function that streamed response chunks are sent ot
public typealias AWSResponseStream = (ByteBuffer, EventLoop) -> EventLoopFuture<Void>

/// Storage for HTTP body which can be either a ByteBuffer or an AsyncSequence of
/// ByteBuffers
struct HTTPBody {
public struct HTTPBody: Sendable {
enum Storage {
case byteBuffer(ByteBuffer)
case asyncSequence(sequence: AnyAsyncSequence<ByteBuffer>, length: Int?)
}

let storage: Storage

init() {
public init() {
self.storage = .byteBuffer(ByteBuffer())
}

init(_ byteBuffer: ByteBuffer) {
public init(_ byteBuffer: ByteBuffer) {
self.storage = .byteBuffer(byteBuffer)
}

init<BufferSequence: AsyncSequence>(_ sequence: BufferSequence, length: Int?) where BufferSequence.Element == ByteBuffer {
self.storage = .asyncSequence(sequence: .init(sequence), length: length)
}

func collect(upTo length: Int) async throws -> ByteBuffer {
public func collect(upTo length: Int) async throws -> ByteBuffer {
switch self.storage {
case .byteBuffer(let buffer):
return buffer
Expand All @@ -51,7 +48,7 @@ struct HTTPBody {
}
}

var length: Int? {
public var length: Int? {
switch self.storage {
case .byteBuffer(let buffer):
return buffer.readableBytes
Expand All @@ -60,19 +57,21 @@ struct HTTPBody {
}
}

var isStreaming: Bool {
if case .asyncSequence = self.storage {
public var isStreaming: Bool {
switch self.storage {
case .byteBuffer:
return false
case .asyncSequence:
return true
}
return false
}
}

extension HTTPBody: AsyncSequence {
typealias Element = ByteBuffer
typealias AsyncIterator = AnyAsyncSequence<ByteBuffer>.AsyncIterator
public typealias Element = ByteBuffer
public typealias AsyncIterator = AnyAsyncSequence<ByteBuffer>.AsyncIterator

func makeAsyncIterator() -> AsyncIterator {
public func makeAsyncIterator() -> AsyncIterator {
switch self.storage {
case .byteBuffer(let buffer):
return AnyAsyncSequence(buffer.asyncSequence(chunkSize: buffer.readableBytes)).makeAsyncIterator()
Expand All @@ -82,6 +81,14 @@ extension HTTPBody: AsyncSequence {
}
}

extension HTTPBody: Decodable {
// HTTPBody has to conform to Decodable so I can add it to AWSShape objects (which conform to Decodable). But we don't want the
// Encoder/Decoder ever to process a AWSPayload
public init(from decoder: Decoder) throws {
preconditionFailure("Cannot decode an HTTPBody")
}
}

/// HTTP Request
struct AWSHTTPRequest {
let url: URL
Expand Down
10 changes: 0 additions & 10 deletions Sources/SotoCore/HTTP/AsyncHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,4 @@ extension AsyncHTTPClient.HTTPClient {
body: .init(response.body, length: nil)
)
}

func execute(
request: AWSHTTPRequest,
timeout: TimeAmount,
on eventLoop: EventLoop,
logger: Logger,
stream: @escaping AWSResponseStream
) async throws -> HTTPClientResponse {
preconditionFailure("Not supported")
}
}
5 changes: 2 additions & 3 deletions Sources/SotoCore/Message/AWSResponse+HAL.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import NIOCore
extension AWSResponse {
/// return if body is hypertext application language
var isHypertextApplicationLanguage: Bool {
guard case .json = self.body,
let contentType = self.headers["content-type"].first,
guard let contentType = self.headers["content-type"].first,
contentType.contains("hal+json")
else {
return false
Expand All @@ -30,7 +29,7 @@ extension AWSResponse {

/// process hal+json data. Extract properties from HAL
func getHypertextApplicationLanguageDictionary() throws -> [String: Any] {
guard case .json(let buffer) = self.body else { return [:] }
guard case .byteBuffer(let buffer) = self.body.storage else { return [:] }
// extract embedded resources from HAL
guard let data = buffer.getData(at: buffer.readerIndex, length: buffer.readableBytes) else { return [:] }
let jsonObject = try JSONSerialization.jsonObject(with: data, options: [])
Expand Down
Loading

0 comments on commit 1262117

Please sign in to comment.