From 88c019f041afd84941745c4f97f016892e72f2db Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 19 Jun 2023 15:44:24 +0100 Subject: [PATCH] Remove AWSClient EventLoop APIs (#553) * Remove AWSClient.execute calls with EventLoop * Fix STSAssumeRole after removing EventLoop execute * Fix AWSClient waiter code * Remove commented out code * Remove EL credential functions * Merge async files with parent implementations * Delete AWSClient+Waiter+async.swift * Fix up paginate code * Remove EventLoop from execute closure * Changes requested in PR * Update Sources/SotoCore/Waiters/AWSClient+Waiter.swift Co-authored-by: Tim Condon <0xTim@users.noreply.github.com> --------- Co-authored-by: Tim Condon <0xTim@users.noreply.github.com> --- Sources/SotoCore/AWSClient+Paginate.swift | 11 +- Sources/SotoCore/AWSClient+async.swift | 438 ------------------ Sources/SotoCore/AWSClient.swift | 383 +++++++-------- Sources/SotoCore/AWSService+async.swift | 57 --- Sources/SotoCore/AWSService.swift | 56 +-- .../SotoCore/Credential/STSAssumeRole.swift | 31 +- .../Waiters/AWSClient+Waiter+async.swift | 86 ---- .../SotoCore/Waiters/AWSClient+Waiter.swift | 66 ++- Tests/SotoCoreTests/AWSClientTests.swift | 4 +- .../EndpointDiscoveryTests.swift | 6 +- Tests/SotoCoreTests/PaginateTests.swift | 4 +- Tests/SotoCoreTests/WaiterTests.swift | 45 +- 12 files changed, 334 insertions(+), 853 deletions(-) delete mode 100644 Sources/SotoCore/AWSClient+async.swift delete mode 100644 Sources/SotoCore/AWSService+async.swift delete mode 100644 Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift diff --git a/Sources/SotoCore/AWSClient+Paginate.swift b/Sources/SotoCore/AWSClient+Paginate.swift index 0c00c5b99..5c9006ddb 100644 --- a/Sources/SotoCore/AWSClient+Paginate.swift +++ b/Sources/SotoCore/AWSClient+Paginate.swift @@ -27,12 +27,11 @@ extension AWSClient { public struct PaginatorSequence: AsyncSequence where Input.Token: Equatable { public typealias Element = Output let input: Input - let command: (Input, Logger, EventLoop?) async throws -> Output + let command: (Input, Logger) async throws -> Output let inputKey: KeyPath? let outputKey: KeyPath let moreResultsKey: KeyPath? let logger: Logger - let eventLoop: EventLoop? /// Initialize PaginatorSequence /// - Parameters: @@ -43,12 +42,11 @@ extension AWSClient { /// - eventLoop: EventLoop to run everything on public init( input: Input, - command: @escaping ((Input, Logger, EventLoop?) async throws -> Output), + command: @escaping ((Input, Logger) async throws -> Output), inputKey: KeyPath? = nil, outputKey: KeyPath, moreResultsKey: KeyPath? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) { self.input = input self.command = command @@ -56,7 +54,6 @@ extension AWSClient { self.inputKey = inputKey self.moreResultsKey = moreResultsKey self.logger = AWSClient.loggingDisabled - self.eventLoop = eventLoop } /// Iterator for iterating over `PaginatorSequence` @@ -71,7 +68,7 @@ extension AWSClient { public mutating func next() async throws -> Output? { if let input = input { - let output = try await self.sequence.command(input, self.sequence.logger, self.sequence.eventLoop) + let output = try await self.sequence.command(input, self.sequence.logger) if let token = output[keyPath: sequence.outputKey], sequence.inputKey == nil || token != input[keyPath: sequence.inputKey!], sequence.moreResultsKey == nil || output[keyPath: sequence.moreResultsKey!] == true diff --git a/Sources/SotoCore/AWSClient+async.swift b/Sources/SotoCore/AWSClient+async.swift deleted file mode 100644 index 2ba28f993..000000000 --- a/Sources/SotoCore/AWSClient+async.swift +++ /dev/null @@ -1,438 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Soto for AWS open source project -// -// Copyright (c) 2017-2023 the Soto project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of Soto project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Dispatch -import Foundation -import Logging -import Metrics -import NIOCore -import SotoSignerV4 - -extension AWSClient { - /// Shutdown AWSClient asynchronously. - /// - /// Before an `AWSClient` is deleted you need to call this function or the synchronous - /// version `syncShutdown` to do a clean shutdown of the client. It cleans up `CredentialProvider` tasks and shuts down - /// the HTTP client if it was created by the `AWSClient`. - public func shutdown() async throws { - guard self.isShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged else { - throw ClientError.alreadyShutdown - } - // shutdown credential provider ignoring any errors as credential provider that doesn't initialize - // can cause the shutdown process to fail - try? await self.credentialProvider.shutdown(on: self.eventLoopGroup.any()).get() - // if httpClient was created by AWSClient then it is required to shutdown the httpClient. - switch self.httpClientProvider { - case .createNew, .createNewWithEventLoopGroup: - do { - try await self.httpClient.shutdown() - } catch { - self.clientLogger.log(level: self.options.errorLogLevel, "Error shutting down HTTP client", metadata: [ - "aws-error": "\(error)", - ]) - throw error - } - - case .shared: - return - } - } - - /// execute a request with an input object and an empty response - /// - parameters: - /// - operationName: Name of the AWS operation - /// - path: path to append to endpoint URL - /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) - /// - serviceConfig: AWS Service configuration - /// - input: Input object - /// - hostPrefix: String to prefix host name with - /// - logger: Logger to log request details to - public func execute( - operation operationName: String, - path: String, - httpMethod: HTTPMethod, - serviceConfig: AWSServiceConfig, - input: Input, - hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled - ) async throws { - return try await self.execute( - operation: operationName, - createRequest: { - try AWSRequest( - operation: operationName, - path: path, - httpMethod: httpMethod, - input: input, - hostPrefix: hostPrefix, - configuration: serviceConfig - ) - }, - execute: { request, eventLoop, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) - }, - processResponse: { _ in - return - }, - config: serviceConfig, - logger: logger - ) - } - - /// Execute an empty request and an empty response - /// - parameters: - /// - operationName: Name of the AWS operation - /// - path: path to append to endpoint URL - /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) - /// - serviceConfig: AWS Service configuration - /// - logger: Logger to log request details to - public func execute( - operation operationName: String, - path: String, - httpMethod: HTTPMethod, - serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled - ) async throws { - return try await self.execute( - operation: operationName, - createRequest: { - try AWSRequest( - operation: operationName, - path: path, - httpMethod: httpMethod, - configuration: serviceConfig - ) - }, - execute: { request, eventLoop, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) - }, - processResponse: { _ in - return - }, - config: serviceConfig, - logger: logger - ) - } - - /// Execute an empty request and return the output object generated from the response - /// - parameters: - /// - operationName: Name of the AWS operation - /// - path: path to append to endpoint URL - /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) - /// - serviceConfig: AWS Service configuration - /// - logger: Logger to log request details to - /// - returns: - /// Output object that completes when response is received - public func execute( - operation operationName: String, - path: String, - httpMethod: HTTPMethod, - serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> Output { - return try await self.execute( - operation: operationName, - createRequest: { - try AWSRequest( - operation: operationName, - path: path, - httpMethod: httpMethod, - configuration: serviceConfig - ) - }, - execute: { request, eventLoop, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) - }, - processResponse: { response in - return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) - }, - config: serviceConfig, - logger: logger - ) - } - - /// Execute a request with an input object and return the output object generated from the response - /// - parameters: - /// - operationName: Name of the AWS operation - /// - path: path to append to endpoint URL - /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) - /// - serviceConfig: AWS Service configuration - /// - input: Input object - /// - hostPrefix: String to prefix host name with - /// - logger: Logger to log request details to - /// - returns: - /// Output object that completes when response is received - public func execute( - operation operationName: String, - path: String, - httpMethod: HTTPMethod, - serviceConfig: AWSServiceConfig, - input: Input, - hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> Output { - return try await self.execute( - operation: operationName, - createRequest: { - try AWSRequest( - operation: operationName, - path: path, - httpMethod: httpMethod, - input: input, - hostPrefix: hostPrefix, - configuration: serviceConfig - ) - }, - execute: { request, eventLoop, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) - }, - processResponse: { response in - return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) - }, - config: serviceConfig, - logger: logger - ) - } - - /// Execute a request with an input object and return the output object generated from the response - /// - parameters: - /// - operationName: Name of the AWS operation - /// - path: path to append to endpoint URL - /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) - /// - serviceConfig: AWS Service configuration - /// - input: Input object - /// - hostPrefix: String to prefix host name with - /// - logger: Logger to log request details to - /// - returns: - /// Output object that completes when response is received - public func execute( - operation operationName: String, - path: String, - httpMethod: HTTPMethod, - serviceConfig: AWSServiceConfig, - input: Input, - hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled, - stream: @escaping AWSResponseStream - ) async throws -> Output { - return try await self.execute( - operation: operationName, - createRequest: { - try AWSRequest( - operation: operationName, - path: path, - httpMethod: httpMethod, - input: input, - hostPrefix: hostPrefix, - configuration: serviceConfig - ) - }, - execute: { request, eventLoop, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger, stream: stream) - }, - processResponse: { response in - return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) - }, - config: serviceConfig, - logger: logger - ) - } - - /// internal version of execute - internal func execute( - operation operationName: String, - createRequest: @escaping () throws -> AWSRequest, - execute: @escaping (AWSHTTPRequest, EventLoop, Logger) async throws -> AWSHTTPResponse, - processResponse: @escaping (AWSHTTPResponse) throws -> Output, - config: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> Output { - let logger = logger.attachingRequestId( - Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), - operation: operationName, - service: config.service - ) - let eventLoop = self.eventLoopGroup.any() - let dimensions: [(String, String)] = [("aws-service", config.service), ("aws-operation", operationName)] - let startTime = DispatchTime.now().uptimeNanoseconds - - Counter(label: "aws_requests_total", dimensions: dimensions).increment() - logger.log(level: self.options.requestLogLevel, "AWS Request") - do { - // get credentials - let credential = try await credentialProvider.getCredential(on: eventLoop, logger: logger).get() - // construct signer - let signer = AWSSigner(credentials: credential, name: config.signingName, region: config.region.rawValue) - // create request and sign with signer - let awsRequest = try createRequest() - .applyMiddlewares(config.middlewares + self.middlewares, config: config) - .createHTTPRequest(signer: signer, serviceConfig: config) - // send request to AWS and process result - let streaming: Bool - switch awsRequest.body.payload { - case .stream: - streaming = true - default: - streaming = false - } - try Task.checkCancellation() - let response = try await self.invoke( - with: config, - eventLoop: eventLoop, - logger: logger, - request: { eventLoop in try await execute(awsRequest, eventLoop, logger) }, - processResponse: processResponse, - streaming: streaming - ) - logger.trace("AWS Response") - Metrics.Timer( - label: "aws_request_duration", - dimensions: dimensions, - preferredDisplayUnit: .seconds - ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) - return response - } catch { - Counter(label: "aws_request_errors", dimensions: dimensions).increment() - // AWSErrorTypes have already been logged - if error as? AWSErrorType == nil { - // log error message - logger.error("AWSClient error", metadata: [ - "aws-error-message": "\(error)", - ]) - } - throw error - } - } - - func invoke( - with serviceConfig: AWSServiceConfig, - eventLoop: EventLoop, - logger: Logger, - request: @escaping (EventLoop) async throws -> AWSHTTPResponse, - processResponse: @escaping (AWSHTTPResponse) throws -> Output, - streaming: Bool - ) async throws -> Output { - var attempt = 0 - while true { - do { - let response = try await request(eventLoop) - // if it returns an HTTP status code outside 2xx then throw an error - guard (200..<300).contains(response.status.code) else { - throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger) - } - let output = try processResponse(response) - return output - } catch { - // if streaming and the error returned is an AWS error fail immediately. Do not attempt - // to retry as the streaming function will not know you are retrying - if streaming, - error is AWSErrorType || error is AWSRawError - { - throw error - } - // If I get a retry wait time for this error then attempt to retry request - if case .retry(let retryTime) = self.retryPolicy.getRetryWaitTime(error: error, attempt: attempt) { - logger.trace("Retrying request", metadata: [ - "aws-retry-time": "\(Double(retryTime.nanoseconds) / 1_000_000_000)", - ]) - try await Task.sleep(nanoseconds: UInt64(retryTime.nanoseconds)) - } else { - throw error - } - } - attempt += 1 - } - } - - /// Get credential used by client - /// - Parameters: - /// - eventLoop: optional eventLoop to run operation on - /// - logger: optional logger to use - /// - Returns: Credential - public func getCredential(on eventLoop: EventLoop? = nil, logger: Logger = AWSClient.loggingDisabled) async throws -> Credential { - let eventLoop = eventLoop ?? self.eventLoopGroup.next() - if let asyncCredentialProvider = self.credentialProvider as? AsyncCredentialProvider { - return try await asyncCredentialProvider.getCredential(on: eventLoop, logger: logger) - } else { - return try await self.credentialProvider.getCredential(on: eventLoop, logger: logger).get() - } - } - - /// Generate a signed URL - /// - parameters: - /// - url : URL to sign - /// - httpMethod: HTTP method to use (.GET, .PUT, .PUSH etc) - /// - httpHeaders: Headers that are to be used with this URL. Be sure to include these headers when you used the returned URL - /// - expires: How long before the signed URL expires - /// - serviceConfig: additional AWS service configuration used to sign the url - /// - logger: Logger to output to - /// - returns: - /// A signed URL - public func signURL( - url: URL, - httpMethod: HTTPMethod, - headers: HTTPHeaders = HTTPHeaders(), - expires: TimeAmount, - serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> URL { - let logger = logger.attachingRequestId( - Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), - operation: "signHeaders", - service: serviceConfig.service - ) - let signer = try await self.createSigner(serviceConfig: serviceConfig, logger: logger) - guard let cleanURL = signer.processURL(url: url) else { - throw AWSClient.ClientError.invalidURL - } - return signer.signURL(url: cleanURL, method: httpMethod, headers: headers, expires: expires) - } - - /// Generate signed headers - /// - parameters: - /// - url : URL to sign - /// - httpMethod: HTTP method to use (.GET, .PUT, .PUSH etc) - /// - httpHeaders: Headers that are to be used with this URL. - /// - body: Payload to sign as well. While it is unnecessary to provide the body for S3 other services may require it - /// - serviceConfig: additional AWS service configuration used to sign the url - /// - logger: Logger to output to - /// - returns: - /// A set of signed headers that include the original headers supplied - public func signHeaders( - url: URL, - httpMethod: HTTPMethod, - headers: HTTPHeaders = HTTPHeaders(), - body: AWSPayload, - serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> HTTPHeaders { - let logger = logger.attachingRequestId( - Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), - operation: "signHeaders", - service: serviceConfig.service - ) - let signer = try await self.createSigner(serviceConfig: serviceConfig, logger: logger) - guard let cleanURL = signer.processURL(url: url) else { - throw AWSClient.ClientError.invalidURL - } - let body: AWSSigner.BodyData? = body.asByteBuffer().map { .byteBuffer($0) } - return signer.signHeaders(url: cleanURL, method: httpMethod, headers: headers, body: body) - } - - func createSigner(serviceConfig: AWSServiceConfig, logger: Logger) async throws -> AWSSigner { - let credential = try await credentialProvider.getCredential(on: eventLoopGroup.next(), logger: logger).get() - return AWSSigner(credentials: credential, name: serviceConfig.signingName, region: serviceConfig.region.rawValue) - } -} diff --git a/Sources/SotoCore/AWSClient.swift b/Sources/SotoCore/AWSClient.swift index 9d7e2f853..b6672205a 100644 --- a/Sources/SotoCore/AWSClient.swift +++ b/Sources/SotoCore/AWSClient.swift @@ -31,7 +31,7 @@ import SotoXML /// This is the workhorse of SotoCore. You provide it with a ``AWSShape`` Input object, it converts it to ``AWSRequest`` which is then converted /// to a raw `HTTPClient` Request. This is then sent to AWS. When the response from AWS is received if it is successful it is converted to a ``AWSResponse`` /// which is then decoded to generate a ``AWSShape`` Output object. If it is not successful then `AWSClient` will throw an ``AWSErrorType``. -public final class AWSClient { +public final class AWSClient: Sendable { // MARK: Member variables /// Default logger that logs nothing @@ -203,7 +203,7 @@ public final class AWSClient { } /// Specifies how `HTTPClient` will be created and establishes lifecycle ownership. - public enum HTTPClientProvider { + public enum HTTPClientProvider: Sendable { /// Use HTTPClient provided by the user. User is responsible for the lifecycle of the HTTPClient. case shared(HTTPClient) /// HTTPClient will be created by AWSClient using provided EventLoopGroup. When `shutdown` is called, created `HTTPClient` @@ -214,7 +214,7 @@ public final class AWSClient { } /// Additional options - public struct Options { + public struct Options: Sendable { /// log level used for request logging let requestLogLevel: Logger.Level /// log level used for error logging @@ -234,9 +234,37 @@ public final class AWSClient { // MARK: API Calls -// public facing apis extension AWSClient { - /// execute a request with an input object and return a future with an empty response + /// Shutdown AWSClient asynchronously. + /// + /// Before an `AWSClient` is deleted you need to call this function or the synchronous + /// version `syncShutdown` to do a clean shutdown of the client. It cleans up `CredentialProvider` tasks and shuts down + /// the HTTP client if it was created by the `AWSClient`. + public func shutdown() async throws { + guard self.isShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged else { + throw ClientError.alreadyShutdown + } + // shutdown credential provider ignoring any errors as credential provider that doesn't initialize + // can cause the shutdown process to fail + try? await self.credentialProvider.shutdown(on: self.eventLoopGroup.any()).get() + // if httpClient was created by AWSClient then it is required to shutdown the httpClient. + switch self.httpClientProvider { + case .createNew, .createNewWithEventLoopGroup: + do { + try await self.httpClient.shutdown() + } catch { + self.clientLogger.log(level: self.options.errorLogLevel, "Error shutting down HTTP client", metadata: [ + "aws-error": "\(error)", + ]) + throw error + } + + case .shared: + return + } + } + + /// execute a request with an input object and an empty response /// - parameters: /// - operationName: Name of the AWS operation /// - path: path to append to endpoint URL @@ -245,9 +273,6 @@ extension AWSClient { /// - input: Input object /// - hostPrefix: String to prefix host name with /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on - /// - returns: - /// Empty Future that completes when response is received public func execute( operation operationName: String, path: String, @@ -255,10 +280,9 @@ extension AWSClient { serviceConfig: AWSServiceConfig, input: Input, hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) -> EventLoopFuture { - return execute( + logger: Logger = AWSClient.loggingDisabled + ) async throws { + return try await self.execute( operation: operationName, createRequest: { try AWSRequest( @@ -270,37 +294,32 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + execute: { request, logger in + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) }, processResponse: { _ in return }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } - /// Execute an empty request and return a future with an empty response + /// Execute an empty request and an empty response /// - parameters: /// - operationName: Name of the AWS operation /// - path: path to append to endpoint URL /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) /// - serviceConfig: AWS Service configuration /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on - /// - returns: - /// Empty Future that completes when response is received public func execute( operation operationName: String, path: String, httpMethod: HTTPMethod, serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) -> EventLoopFuture { - return execute( + logger: Logger = AWSClient.loggingDisabled + ) async throws { + return try await self.execute( operation: operationName, createRequest: { try AWSRequest( @@ -310,37 +329,34 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + execute: { request, logger in + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) }, processResponse: { _ in return }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } - /// Execute an empty request and return a future with the output object generated from the response + /// Execute an empty request and return the output object generated from the response /// - parameters: /// - operationName: Name of the AWS operation /// - path: path to append to endpoint URL /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) /// - serviceConfig: AWS Service configuration /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on /// - returns: - /// Future containing output object that completes when response is received + /// Output object that completes when response is received public func execute( operation operationName: String, path: String, httpMethod: HTTPMethod, serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) -> EventLoopFuture { - return execute( + logger: Logger = AWSClient.loggingDisabled + ) async throws -> Output { + return try await self.execute( operation: operationName, createRequest: { try AWSRequest( @@ -350,19 +366,18 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + execute: { request, logger in + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) }, processResponse: { response in return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } - /// Execute a request with an input object and return a future with the output object generated from the response + /// Execute a request with an input object and return the output object generated from the response /// - parameters: /// - operationName: Name of the AWS operation /// - path: path to append to endpoint URL @@ -371,9 +386,8 @@ extension AWSClient { /// - input: Input object /// - hostPrefix: String to prefix host name with /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on /// - returns: - /// Future containing output object that completes when response is received + /// Output object that completes when response is received public func execute( operation operationName: String, path: String, @@ -381,10 +395,9 @@ extension AWSClient { serviceConfig: AWSServiceConfig, input: Input, hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) -> EventLoopFuture { - return execute( + logger: Logger = AWSClient.loggingDisabled + ) async throws -> Output { + return try await self.execute( operation: operationName, createRequest: { try AWSRequest( @@ -396,21 +409,18 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + execute: { request, logger in + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) }, processResponse: { response in return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } - /// Execute a request with an input object and return output object generated from the response - /// - /// This version of execute also streams the payload of the response into the provided closure + /// Execute a request with an input object and return the output object generated from the response /// - parameters: /// - operationName: Name of the AWS operation /// - path: path to append to endpoint URL @@ -419,10 +429,8 @@ extension AWSClient { /// - input: Input object /// - hostPrefix: String to prefix host name with /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on - /// - stream: Closure to stream payload response into /// - returns: - /// Future containing output object that completes when response is received + /// Output object that completes when response is received public func execute( operation operationName: String, path: String, @@ -431,10 +439,9 @@ extension AWSClient { input: Input, hostPrefix: String? = nil, logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil, stream: @escaping AWSResponseStream - ) -> EventLoopFuture { - return execute( + ) async throws -> Output { + return try await self.execute( operation: operationName, createRequest: { try AWSRequest( @@ -446,15 +453,14 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger, stream: stream) + execute: { request, logger in + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger, stream: stream) }, processResponse: { response in return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } @@ -462,47 +468,103 @@ extension AWSClient { internal func execute( operation operationName: String, createRequest: @escaping () throws -> AWSRequest, - execute: @escaping (AWSHTTPRequest, EventLoop, Logger) -> EventLoopFuture, + execute: @escaping (AWSHTTPRequest, Logger) async throws -> AWSHTTPResponse, processResponse: @escaping (AWSHTTPResponse) throws -> Output, config: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) -> EventLoopFuture { - let eventLoop = eventLoop ?? eventLoopGroup.next() + logger: Logger = AWSClient.loggingDisabled + ) async throws -> Output { let logger = logger.attachingRequestId( Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), operation: operationName, service: config.service ) - // get credentials - let future: EventLoopFuture = credentialProvider.getCredential(on: eventLoop, logger: logger) - .flatMapThrowing { credential -> AWSHTTPRequest in - // construct signer - let signer = AWSSigner(credentials: credential, name: config.signingName, region: config.region.rawValue) - // create request and sign with signer - let awsRequest = try createRequest() - return try awsRequest - .applyMiddlewares(config.middlewares + self.middlewares, config: config) - .createHTTPRequest(signer: signer, serviceConfig: config) - }.flatMap { request -> EventLoopFuture in - // send request to AWS and process result - let streaming: Bool - switch request.body.payload { - case .stream: - streaming = true - default: - streaming = false + let dimensions: [(String, String)] = [("aws-service", config.service), ("aws-operation", operationName)] + let startTime = DispatchTime.now().uptimeNanoseconds + + Counter(label: "aws_requests_total", dimensions: dimensions).increment() + logger.log(level: self.options.requestLogLevel, "AWS Request") + do { + // get credentials + let credential = try await credentialProvider.getCredential(on: self.eventLoopGroup.any(), logger: logger).get() + // construct signer + let signer = AWSSigner(credentials: credential, name: config.signingName, region: config.region.rawValue) + // create request and sign with signer + let awsRequest = try createRequest() + .applyMiddlewares(config.middlewares + self.middlewares, config: config) + .createHTTPRequest(signer: signer, serviceConfig: config) + // send request to AWS and process result + let streaming: Bool + switch awsRequest.body.payload { + case .stream: + streaming = true + default: + streaming = false + } + try Task.checkCancellation() + let response = try await self.invoke( + with: config, + logger: logger, + request: { try await execute(awsRequest, logger) }, + processResponse: processResponse, + streaming: streaming + ) + logger.trace("AWS Response") + Metrics.Timer( + label: "aws_request_duration", + dimensions: dimensions, + preferredDisplayUnit: .seconds + ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) + return response + } catch { + Counter(label: "aws_request_errors", dimensions: dimensions).increment() + // AWSErrorTypes have already been logged + if error as? AWSErrorType == nil { + // log error message + logger.error("AWSClient error", metadata: [ + "aws-error-message": "\(error)", + ]) + } + throw error + } + } + + func invoke( + with serviceConfig: AWSServiceConfig, + logger: Logger, + request: @escaping () async throws -> AWSHTTPResponse, + processResponse: @escaping (AWSHTTPResponse) throws -> Output, + streaming: Bool + ) async throws -> Output { + var attempt = 0 + while true { + do { + let response = try await request() + // if it returns an HTTP status code outside 2xx then throw an error + guard (200..<300).contains(response.status.code) else { + throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger) + } + let output = try processResponse(response) + return output + } catch { + // if streaming and the error returned is an AWS error fail immediately. Do not attempt + // to retry as the streaming function will not know you are retrying + if streaming, + error is AWSErrorType || error is AWSRawError + { + throw error + } + // If I get a retry wait time for this error then attempt to retry request + if case .retry(let retryTime) = self.retryPolicy.getRetryWaitTime(error: error, attempt: attempt) { + logger.trace("Retrying request", metadata: [ + "aws-retry-time": "\(Double(retryTime.nanoseconds) / 1_000_000_000)", + ]) + try await Task.sleep(nanoseconds: UInt64(retryTime.nanoseconds)) + } else { + throw error } - return self.invoke( - with: config, - eventLoop: eventLoop, - logger: logger, - request: { eventLoop in execute(request, eventLoop, logger) }, - processResponse: processResponse, - streaming: streaming - ) } - return recordRequest(future, service: config.service, operation: operationName, logger: logger) + attempt += 1 + } } /// Get credential used by client @@ -510,9 +572,13 @@ extension AWSClient { /// - eventLoop: optional eventLoop to run operation on /// - logger: optional logger to use /// - Returns: Credential - public func getCredential(on eventLoop: EventLoop? = nil, logger: Logger = AWSClient.loggingDisabled) -> EventLoopFuture { - let eventLoop = eventLoop ?? self.eventLoopGroup.next() - return self.credentialProvider.getCredential(on: eventLoop, logger: logger) + public func getCredential(logger: Logger = AWSClient.loggingDisabled) async throws -> Credential { + let eventLoop = self.eventLoopGroup.any() + if let asyncCredentialProvider = self.credentialProvider as? AsyncCredentialProvider { + return try await asyncCredentialProvider.getCredential(on: eventLoop, logger: logger) + } else { + return try await self.credentialProvider.getCredential(on: eventLoop, logger: logger).get() + } } /// Generate a signed URL @@ -532,18 +598,17 @@ extension AWSClient { expires: TimeAmount, serviceConfig: AWSServiceConfig, logger: Logger = AWSClient.loggingDisabled - ) -> EventLoopFuture { + ) async throws -> URL { let logger = logger.attachingRequestId( Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), - operation: "signURL", + operation: "signHeaders", service: serviceConfig.service ) - return createSigner(serviceConfig: serviceConfig, logger: logger).flatMapThrowing { signer in - guard let cleanURL = signer.processURL(url: url) else { - throw AWSClient.ClientError.invalidURL - } - return signer.signURL(url: cleanURL, method: httpMethod, headers: headers, expires: expires) + let signer = try await self.createSigner(serviceConfig: serviceConfig, logger: logger) + guard let cleanURL = signer.processURL(url: url) else { + throw AWSClient.ClientError.invalidURL } + return signer.signURL(url: cleanURL, method: httpMethod, headers: headers, expires: expires) } /// Generate signed headers @@ -563,25 +628,23 @@ extension AWSClient { body: AWSPayload, serviceConfig: AWSServiceConfig, logger: Logger = AWSClient.loggingDisabled - ) -> EventLoopFuture { + ) async throws -> HTTPHeaders { let logger = logger.attachingRequestId( Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), operation: "signHeaders", service: serviceConfig.service ) - return createSigner(serviceConfig: serviceConfig, logger: logger).flatMapThrowing { signer in - guard let cleanURL = signer.processURL(url: url) else { - throw AWSClient.ClientError.invalidURL - } - let body: AWSSigner.BodyData? = body.asByteBuffer().map { .byteBuffer($0) } - return signer.signHeaders(url: cleanURL, method: httpMethod, headers: headers, body: body) + let signer = try await self.createSigner(serviceConfig: serviceConfig, logger: logger) + guard let cleanURL = signer.processURL(url: url) else { + throw AWSClient.ClientError.invalidURL } + let body: AWSSigner.BodyData? = body.asByteBuffer().map { .byteBuffer($0) } + return signer.signHeaders(url: cleanURL, method: httpMethod, headers: headers, body: body) } - func createSigner(serviceConfig: AWSServiceConfig, logger: Logger) -> EventLoopFuture { - return credentialProvider.getCredential(on: eventLoopGroup.next(), logger: logger).map { credential in - return AWSSigner(credentials: credential, name: serviceConfig.signingName, region: serviceConfig.region.rawValue) - } + func createSigner(serviceConfig: AWSServiceConfig, logger: Logger) async throws -> AWSSigner { + let credential = try await credentialProvider.getCredential(on: eventLoopGroup.next(), logger: logger).get() + return AWSSigner(credentials: credential, name: serviceConfig.signingName, region: serviceConfig.region.rawValue) } } @@ -622,59 +685,6 @@ extension AWSClient { } } -// invoker -extension AWSClient { - func invoke( - with serviceConfig: AWSServiceConfig, - eventLoop: EventLoop, - logger: Logger, - request: @escaping (EventLoop) -> EventLoopFuture, - processResponse: @escaping (AWSHTTPResponse) throws -> Output, - streaming: Bool - ) -> EventLoopFuture { - let promise = eventLoop.makePromise(of: Output.self) - - func execute(attempt: Int) { - // execute HTTP request - _ = request(eventLoop) - .flatMapThrowing { response throws in - // if it returns an HTTP status code outside 2xx then throw an error - guard (200..<300).contains(response.status.code) else { - throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger) - } - let output = try processResponse(response) - promise.succeed(output) - } - .flatMapErrorThrowing { error in - // if streaming and the error returned is an AWS error fail immediately. Do not attempt - // to retry as the streaming function will not know you are retrying - if streaming, - error is AWSErrorType || error is AWSRawError - { - promise.fail(error) - return - } - // If I get a retry wait time for this error then attempt to retry request - if case .retry(let retryTime) = self.retryPolicy.getRetryWaitTime(error: error, attempt: attempt) { - logger.trace("Retrying request", metadata: [ - "aws-retry-time": "\(Double(retryTime.nanoseconds) / 1_000_000_000)", - ]) - // schedule task for retrying AWS request - eventLoop.scheduleTask(in: retryTime) { - execute(attempt: attempt + 1) - } - } else { - promise.fail(error) - } - } - } - - execute(attempt: 0) - - return promise.futureResult - } -} - extension AWSClient.ClientError: CustomStringConvertible { /// return human readable description of error public var description: String { @@ -700,37 +710,6 @@ extension AWSClient.ClientError: CustomStringConvertible { } } -extension AWSClient { - /// Record request in swift-metrics, and swift-log - func recordRequest(_ future: EventLoopFuture, service: String, operation: String, logger: Logger) -> EventLoopFuture { - let dimensions: [(String, String)] = [("aws-service", service), ("aws-operation", operation)] - let startTime = DispatchTime.now().uptimeNanoseconds - - Counter(label: "aws_requests_total", dimensions: dimensions).increment() - logger.log(level: self.options.requestLogLevel, "AWS Request") - - return future.map { response in - logger.trace("AWS Response") - Metrics.Timer( - label: "aws_request_duration", - dimensions: dimensions, - preferredDisplayUnit: .seconds - ).recordNanoseconds(DispatchTime.now().uptimeNanoseconds - startTime) - return response - }.flatMapErrorThrowing { error in - Counter(label: "aws_request_errors", dimensions: dimensions).increment() - // AWSErrorTypes have already been logged - if error as? AWSErrorType == nil { - // log error message - logger.log(level: self.options.errorLogLevel, "AWSClient error", metadata: [ - "aws-error-message": "\(error)", - ]) - } - throw error - } - } -} - extension Logger { func attachingRequestId(_ id: Int, operation: String, service: String) -> Logger { var logger = self @@ -740,7 +719,3 @@ extension Logger { return logger } } - -extension AWSClient: Sendable {} -extension AWSClient.HTTPClientProvider: Sendable {} -extension AWSClient.Options: Sendable {} diff --git a/Sources/SotoCore/AWSService+async.swift b/Sources/SotoCore/AWSService+async.swift deleted file mode 100644 index dd03be027..000000000 --- a/Sources/SotoCore/AWSService+async.swift +++ /dev/null @@ -1,57 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Soto for AWS open source project -// -// Copyright (c) 2020-2021 the Soto project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of Soto project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import struct Foundation.URL -import NIOCore - -/// Protocol for services objects. Contains a client to communicate with AWS and config for defining how to communicate -extension AWSService { - /// Generate a signed URL - /// - parameters: - /// - url : URL to sign - /// - httpMethod: HTTP method to use (.GET, .PUT, .PUSH etc) - /// - headers: Headers that are to be used with this URL. Be sure to include these headers when you used the returned URL - /// - expires: How long before the signed URL expires - /// - logger: Logger to output to - /// - returns: - /// A signed URL - public func signURL( - url: URL, - httpMethod: HTTPMethod, - headers: HTTPHeaders = HTTPHeaders(), - expires: TimeAmount, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> URL { - return try await self.client.signURL(url: url, httpMethod: httpMethod, headers: headers, expires: expires, serviceConfig: self.config, logger: logger) - } - - /// Generate signed headers - /// - parameters: - /// - url : URL to sign - /// - httpMethod: HTTP method to use (.GET, .PUT, .PUSH etc) - /// - headers: Headers that are to be used with this URL. Be sure to include these headers when you used the returned URL - /// - body: body payload to sign as well. While it is unnecessary to provide the body for S3 other services require it - /// - logger: Logger to output to - /// - returns: - /// A series of signed headers including the original headers provided to the function - public func signHeaders( - url: URL, - httpMethod: HTTPMethod, - headers: HTTPHeaders = HTTPHeaders(), - body: AWSPayload = .empty, - logger: Logger = AWSClient.loggingDisabled - ) async throws -> HTTPHeaders { - return try await self.client.signHeaders(url: url, httpMethod: httpMethod, headers: headers, body: body, serviceConfig: self.config, logger: logger) - } -} diff --git a/Sources/SotoCore/AWSService.swift b/Sources/SotoCore/AWSService.swift index 34331829b..5b8533daa 100644 --- a/Sources/SotoCore/AWSService.swift +++ b/Sources/SotoCore/AWSService.swift @@ -42,6 +42,30 @@ extension AWSService { /// The EventLoopGroup service is using public var eventLoopGroup: EventLoopGroup { return client.eventLoopGroup } + /// Return new version of Service with edited parameters + /// - Parameters: + /// - region: Server region + /// - middlewares: Additional middleware to add + /// - timeout: Time out value for HTTP requests + /// - byteBufferAllocator: byte buffer allocator used throughout AWSClient + /// - options: options used by client when processing requests + /// - Returns: New version of the service + public func with( + region: Region? = nil, + middlewares: [AWSServiceMiddleware] = [], + timeout: TimeAmount? = nil, + byteBufferAllocator: ByteBufferAllocator? = nil, + options: AWSServiceConfig.Options? = nil + ) -> Self { + return Self(from: self, patch: .init( + region: region, + middlewares: middlewares, + timeout: timeout, + byteBufferAllocator: byteBufferAllocator, + options: options + )) + } + /// Generate a signed URL /// - parameters: /// - url : URL to sign @@ -57,8 +81,8 @@ extension AWSService { headers: HTTPHeaders = HTTPHeaders(), expires: TimeAmount, logger: Logger = AWSClient.loggingDisabled - ) -> EventLoopFuture { - return self.client.signURL(url: url, httpMethod: httpMethod, headers: headers, expires: expires, serviceConfig: self.config, logger: logger) + ) async throws -> URL { + return try await self.client.signURL(url: url, httpMethod: httpMethod, headers: headers, expires: expires, serviceConfig: self.config, logger: logger) } /// Generate signed headers @@ -76,31 +100,7 @@ extension AWSService { headers: HTTPHeaders = HTTPHeaders(), body: AWSPayload = .empty, logger: Logger = AWSClient.loggingDisabled - ) -> EventLoopFuture { - return self.client.signHeaders(url: url, httpMethod: httpMethod, headers: headers, body: body, serviceConfig: self.config, logger: logger) - } - - /// Return new version of Service with edited parameters - /// - Parameters: - /// - region: Server region - /// - middlewares: Additional middleware to add - /// - timeout: Time out value for HTTP requests - /// - byteBufferAllocator: byte buffer allocator used throughout AWSClient - /// - options: options used by client when processing requests - /// - Returns: New version of the service - public func with( - region: Region? = nil, - middlewares: [AWSServiceMiddleware] = [], - timeout: TimeAmount? = nil, - byteBufferAllocator: ByteBufferAllocator? = nil, - options: AWSServiceConfig.Options? = nil - ) -> Self { - return Self(from: self, patch: .init( - region: region, - middlewares: middlewares, - timeout: timeout, - byteBufferAllocator: byteBufferAllocator, - options: options - )) + ) async throws -> HTTPHeaders { + return try await self.client.signHeaders(url: url, httpMethod: httpMethod, headers: headers, body: body, serviceConfig: self.config, logger: logger) } } diff --git a/Sources/SotoCore/Credential/STSAssumeRole.swift b/Sources/SotoCore/Credential/STSAssumeRole.swift index b32211e11..ed44ac465 100644 --- a/Sources/SotoCore/Credential/STSAssumeRole.swift +++ b/Sources/SotoCore/Credential/STSAssumeRole.swift @@ -85,11 +85,11 @@ struct STSCredentials: AWSDecodableShape, ExpiringCredential { } /// Credential Provider that holds an AWSClient -protocol CredentialProviderWithClient: CredentialProvider { +protocol AsyncCredentialProviderWithClient: AsyncCredentialProvider { var client: AWSClient { get } } -extension CredentialProviderWithClient { +extension AsyncCredentialProviderWithClient { /// shutdown credential provider and client func shutdown(on eventLoop: EventLoop) -> EventLoopFuture { // shutdown AWSClient @@ -106,7 +106,7 @@ extension CredentialProviderWithClient { } /// Internal version of AssumeRole credential provider used by ConfigFileCredentialProvider -struct STSAssumeRoleCredentialProvider: CredentialProviderWithClient { +struct STSAssumeRoleCredentialProvider: AsyncCredentialProviderWithClient { let request: STSAssumeRoleRequest let client: AWSClient let config: AWSServiceConfig @@ -131,17 +131,22 @@ struct STSAssumeRoleCredentialProvider: CredentialProviderWithClient { ) } - func getCredential(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { - self.assumeRole(self.request, logger: logger, on: eventLoop) - .flatMapThrowing { response in - guard let credentials = response.credentials else { - throw CredentialProviderError.noProvider - } - return credentials - } + func getCredential(on eventLoop: EventLoop, logger: Logger) async throws -> Credential { + let response = try await self.assumeRole(self.request, logger: logger) + guard let credentials = response.credentials else { + throw CredentialProviderError.noProvider + } + return credentials } - func assumeRole(_ input: STSAssumeRoleRequest, logger: Logger, on eventLoop: EventLoop?) -> EventLoopFuture { - return self.client.execute(operation: "AssumeRole", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) + func assumeRole(_ input: STSAssumeRoleRequest, logger: Logger) async throws -> STSAssumeRoleResponse { + return try await self.client.execute( + operation: "AssumeRole", + path: "/", + httpMethod: .POST, + serviceConfig: self.config, + input: input, + logger: logger + ) } } diff --git a/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift b/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift deleted file mode 100644 index b2eb94a85..000000000 --- a/Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift +++ /dev/null @@ -1,86 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Soto for AWS open source project -// -// Copyright (c) 2017-2021 the Soto project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of Soto project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Logging -import NIOCore - -extension AWSClient { - /// Returns when waiter polling returns a success state - /// or returns an error if the polling returns an error or timesout - /// - /// - Parameters: - /// - input: Input parameters - /// - waiter: Waiter to wait on - /// - maxWaitTime: Maximum amount of time to wait - /// - logger: Logger used to provide output - /// - eventLoop: EventLoop to run API calls on - /// - Returns: EventLoopFuture that will be fulfilled once waiter has completed - public func waitUntil( - _ input: Input, - waiter: Waiter, - maxWaitTime: TimeAmount? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil - ) async throws { - let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime - let deadline: NIODeadline = .now() + maxWaitTime - let eventLoop = eventLoop ?? eventLoopGroup.next() - - var attempt = 0 - while true { - attempt += 1 - let result: Result - do { - result = try .success(await waiter.command(input, logger, eventLoop).get()) - } catch { - result = .failure(error) - } - var acceptorState: WaiterState? - for acceptor in waiter.acceptors { - if acceptor.matcher.match(result: result.map { $0 }) { - acceptorState = acceptor.state - break - } - } - // if state has not been set then set it based on return of API call - let waiterState: WaiterState - if let state = acceptorState { - waiterState = state - } else if case .failure = result { - waiterState = .failure - } else { - waiterState = .retry - } - // based on state succeed, fail promise or retry - switch waiterState { - case .success: - return - case .failure: - if case .failure(let error) = result { - throw error - } else { - throw ClientError.waiterFailed - } - case .retry: - let wait = waiter.calculateRetryWaitTime(attempt: attempt, remainingTime: deadline - .now()) - if wait < .seconds(0) { - throw ClientError.waiterTimeout - } else { - logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms") - try await Task.sleep(nanoseconds: UInt64(wait.nanoseconds)) - } - } - } - } -} diff --git a/Sources/SotoCore/Waiters/AWSClient+Waiter.swift b/Sources/SotoCore/Waiters/AWSClient+Waiter.swift index fe252dc61..1d80f52ce 100644 --- a/Sources/SotoCore/Waiters/AWSClient+Waiter.swift +++ b/Sources/SotoCore/Waiters/AWSClient+Waiter.swift @@ -39,7 +39,7 @@ extension AWSClient { let matcher: AWSWaiterMatcher } - public typealias WaiterCommand = @Sendable (Input, Logger, EventLoop?) -> EventLoopFuture + public typealias WaiterCommand = @Sendable (Input, Logger) async throws -> Output /// Initialize an waiter /// - Parameters: @@ -91,4 +91,68 @@ extension AWSClient { return timeDelay } } + + /// Returns when waiter polling returns a success state + /// or returns an error if the polling returns an error or timesout + /// + /// - Parameters: + /// - input: Input parameters + /// - waiter: Waiter to wait on + /// - maxWaitTime: Maximum amount of time to wait + /// - logger: Logger used to provide output + public func waitUntil( + _ input: Input, + waiter: Waiter, + maxWaitTime: TimeAmount? = nil, + logger: Logger = AWSClient.loggingDisabled + ) async throws { + let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime + let deadline: NIODeadline = .now() + maxWaitTime + + var attempt = 0 + while true { + attempt += 1 + let result: Result + do { + result = try .success(await waiter.command(input, logger)) + } catch { + result = .failure(error) + } + var acceptorState: WaiterState? + for acceptor in waiter.acceptors { + if acceptor.matcher.match(result: result.map { $0 }) { + acceptorState = acceptor.state + break + } + } + // if state has not been set then set it based on return of API call + let waiterState: WaiterState + if let state = acceptorState { + waiterState = state + } else if case .failure = result { + waiterState = .failure + } else { + waiterState = .retry + } + // based on state succeed, fail promise or retry + switch waiterState { + case .success: + return + case .failure: + if case .failure(let error) = result { + throw error + } else { + throw ClientError.waiterFailed + } + case .retry: + let wait = waiter.calculateRetryWaitTime(attempt: attempt, remainingTime: deadline - .now()) + if wait < .seconds(0) { + throw ClientError.waiterTimeout + } else { + logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms") + try await Task.sleep(nanoseconds: UInt64(wait.nanoseconds)) + } + } + } + } } diff --git a/Tests/SotoCoreTests/AWSClientTests.swift b/Tests/SotoCoreTests/AWSClientTests.swift index fd3cbc684..e088b58a3 100644 --- a/Tests/SotoCoreTests/AWSClientTests.swift +++ b/Tests/SotoCoreTests/AWSClientTests.swift @@ -33,7 +33,7 @@ class AWSClientTests: XCTestCase { XCTAssertNoThrow(try client.syncShutdown()) } - let credentialForSignature = try await client.getCredential(on: client.eventLoopGroup.next(), logger: TestEnvironment.logger).get() + let credentialForSignature = try await client.getCredential(logger: TestEnvironment.logger) XCTAssertEqual(credentialForSignature.accessKeyId, "key") XCTAssertEqual(credentialForSignature.secretAccessKey, "secret") } @@ -46,7 +46,7 @@ class AWSClientTests: XCTestCase { } do { - _ = try await client.getCredential(on: client.eventLoopGroup.next(), logger: TestEnvironment.logger).get() + _ = try await client.getCredential(logger: TestEnvironment.logger) XCTFail("Should not get here") } catch let error as CredentialProviderError where error == .noProvider { // credentials request should fail. One possible error is a connectTimerout diff --git a/Tests/SotoCoreTests/EndpointDiscoveryTests.swift b/Tests/SotoCoreTests/EndpointDiscoveryTests.swift index 5e03e045b..262392f38 100644 --- a/Tests/SotoCoreTests/EndpointDiscoveryTests.swift +++ b/Tests/SotoCoreTests/EndpointDiscoveryTests.swift @@ -60,7 +60,7 @@ final class EndpointDiscoveryTests: XCTestCase { return AWSEndpoints(endpoints: [.init(address: self.endpointToDiscover, cachePeriodInMinutes: 60)]) } - public func test(_ input: TestRequest, logger: Logger = AWSClient.loggingDisabled, on eventLoop: EventLoop? = nil) async throws { + public func test(_ input: TestRequest, logger: Logger = AWSClient.loggingDisabled) async throws { return try await self.client.execute( operation: "Test", path: "/test", @@ -78,7 +78,7 @@ final class EndpointDiscoveryTests: XCTestCase { return AWSEndpoints(endpoints: [.init(address: self.endpointToDiscover, cachePeriodInMinutes: 0)]) } - public func testDontCache(logger: Logger = AWSClient.loggingDisabled, on eventLoop: EventLoop? = nil) async throws { + public func testDontCache(logger: Logger = AWSClient.loggingDisabled) async throws { return try await self.client.execute( operation: "Test", path: "/test", @@ -89,7 +89,7 @@ final class EndpointDiscoveryTests: XCTestCase { ) } - public func testNotRequired(_ input: TestRequest, logger: Logger = AWSClient.loggingDisabled, on eventLoop: EventLoop? = nil) async throws { + public func testNotRequired(_ input: TestRequest, logger: Logger = AWSClient.loggingDisabled) async throws { return try await self.client.execute( operation: "Test", path: "/test", diff --git a/Tests/SotoCoreTests/PaginateTests.swift b/Tests/SotoCoreTests/PaginateTests.swift index b40ad6ebd..4f3ff05f0 100644 --- a/Tests/SotoCoreTests/PaginateTests.swift +++ b/Tests/SotoCoreTests/PaginateTests.swift @@ -68,7 +68,7 @@ final class PaginateTests: XCTestCase, @unchecked Sendable { let outputToken: Int? } - func counter(_ input: CounterInput, logger: Logger, on eventLoop: EventLoop?) async throws -> CounterOutput { + func counter(_ input: CounterInput, logger: Logger) async throws -> CounterOutput { return try await self.client.execute( operation: "TestOperation", path: "/", @@ -89,7 +89,7 @@ final class PaginateTests: XCTestCase, @unchecked Sendable { ) } - func stringList(_ input: StringListInput, logger: Logger, on eventLoop: EventLoop? = nil) async throws -> StringListOutput { + func stringList(_ input: StringListInput, logger: Logger) async throws -> StringListOutput { return try await self.client.execute( operation: "TestOperation", path: "/", diff --git a/Tests/SotoCoreTests/WaiterTests.swift b/Tests/SotoCoreTests/WaiterTests.swift index a8a240ed4..1d54f03bc 100644 --- a/Tests/SotoCoreTests/WaiterTests.swift +++ b/Tests/SotoCoreTests/WaiterTests.swift @@ -40,8 +40,15 @@ class WaiterTests: XCTestCase { let i: Int } - @Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { - self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) + @Sendable func operation(input: Input, logger: Logger) async throws -> Output { + try await self.client.execute( + operation: "Basic", + path: "/", + httpMethod: .POST, + serviceConfig: self.config, + input: input, + logger: logger + ) } struct ArrayOutput: AWSDecodableShape & Encodable { @@ -59,8 +66,15 @@ class WaiterTests: XCTestCase { let array: [Element] } - @Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { - self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) + @Sendable func arrayOperation(input: Input, logger: Logger) async throws -> ArrayOutput { + try await self.client.execute( + operation: "Basic", + path: "/", + httpMethod: .POST, + serviceConfig: self.config, + input: input, + logger: logger + ) } struct OptionalArrayOutput: AWSDecodableShape & Encodable { @@ -74,8 +88,15 @@ class WaiterTests: XCTestCase { let array: [Element]? } - @Sendable func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { - self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) + @Sendable func optionalArrayOperation(input: Input, logger: Logger) async throws -> OptionalArrayOutput { + try await self.client.execute( + operation: "Basic", + path: "/", + httpMethod: .POST, + serviceConfig: self.config, + input: input, + logger: logger + ) } func testJMESPathWaiter() async throws { @@ -102,8 +123,8 @@ class WaiterTests: XCTestCase { struct StringOutput: AWSDecodableShape & Encodable { let s: String } - @Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { - self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) + @Sendable func operation(input: Input, logger: Logger) async throws -> StringOutput { + try await self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger) } let waiter = AWSClient.Waiter( acceptors: [ @@ -137,8 +158,8 @@ class WaiterTests: XCTestCase { struct EnumOutput: AWSDecodableShape & Encodable { let e: YesNo } - @Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { - self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop) + @Sendable func operation(input: Input, logger: Logger) async throws -> EnumOutput { + try await self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger) } let waiter = AWSClient.Waiter( acceptors: [ @@ -222,8 +243,8 @@ class WaiterTests: XCTestCase { defer { XCTAssertNoThrow(try awsServer.stop()) } let config = createServiceConfig(serviceProtocol: .restxml, endpoint: awsServer.address) - @Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture { - self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: logger, on: eventLoop) + @Sendable func arrayOperation(input: Input, logger: Logger) async throws -> ArrayOutput { + try await self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: logger) } let waiter = AWSClient.Waiter(