From 08d802e696a7c3a594dc0e4935279e014d9ff5f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filip=20Paw=C5=82owski?= Date: Fri, 20 Dec 2024 10:30:26 +0100 Subject: [PATCH] SNOW-1814745: Fix aborting requests in HTTP client. Normalization of responses unified. (#967) --- ci/container/hang_webserver.py | 25 ++-- lib/http/base.js | 28 ++-- lib/services/sf.js | 2 +- test/integration/testHttpClient.js | 122 ++++++++++++++++++ test/integration/testUtil.js | 24 +++- .../test_utils/httpInterceptorUtils.js | 2 +- 6 files changed, 177 insertions(+), 26 deletions(-) create mode 100644 test/integration/testHttpClient.js diff --git a/ci/container/hang_webserver.py b/ci/container/hang_webserver.py index a650e964d..ab25a0152 100755 --- a/ci/container/hang_webserver.py +++ b/ci/container/hang_webserver.py @@ -11,14 +11,17 @@ class HTTPRequestHandler(BaseHTTPRequestHandler): # counts specific calls to change behaviour after some calls counter = 0 - def __respond(self, http_code, content_type='text/plain', body=None, ): - if body: - self.send_response(http_code, body) + def __respond(self, http_code, content_type='text/plain', message=None, body=None, ): + if message: + self.send_response(http_code, message) else: self.send_response(http_code) self.send_header('Content-Type', content_type) self.end_headers() + if body is not None: + self.wfile.write(body.encode('utf-8')) + def do_POST(self): if self.path.startswith('/403'): self.__respond(403) @@ -26,21 +29,23 @@ def do_POST(self): self.__respond(404) elif self.path.startswith('/hang'): time.sleep(300) - self.__respond(200, body='OK') + self.__respond(200, message='OK') elif self.path.startswith('/503'): self.__respond(503) elif self.path.startswith('/xml'): - self.__respond(200, body='', content_type='application/xml') + self.__respond(200, message='', content_type='application/xml') + elif self.path.startswith('/json'): + self.__respond(200, message='OK', body='{"smkId": 32621973126123526, "data": {"test":"data"}}', content_type='application/json') elif self.path.startswith('/resetCounter'): HTTPRequestHandler.counter = 0 - self.__respond(200, body='OK') + self.__respond(200, message='OK') elif self.path.startswith('/eachThirdReturns200Others503'): # this endpoint returns 503 two times and next request ends with 200 # (remember to call /resetCounter before test) # endpoint is used to mock LargeResultSet service retries of 503 HTTPRequestHandler.counter += 12 if HTTPRequestHandler.counter % 3 == 0: - self.__respond(200, body='OK') + self.__respond(200, message='OK') else: self.__respond(503) elif self.path.startswith('/eachThirdReturns200OthersHang'): @@ -49,12 +54,12 @@ def do_POST(self): # endpoint is used to mock LargeResultSet service retries of timeouts HTTPRequestHandler.counter += 1 if HTTPRequestHandler.counter % 3 == 0: - self.__respond(200, body='OK') + self.__respond(200, message='OK') else: time.sleep(300) - self.__respond(200, body='OK') + self.__respond(200, message='OK') else: - self.__respond(200, body='OK') + self.__respond(200, message='OK') do_GET = do_POST class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): diff --git a/lib/http/base.js b/lib/http/base.js index a365f6e25..e91e1b262 100644 --- a/lib/http/base.js +++ b/lib/http/base.js @@ -35,12 +35,15 @@ function HttpClient(connectionConfig) { HttpClient.prototype.request = function (options) { Logger.getInstance().trace('Request%s - preparing for sending.', requestUtil.describeRequestFromOptions(options)); - let requestPromise; - const requestOptions = prepareRequestOptions.call(this, options); + const requestObject = {}; + const requestAbortController = new AbortController(); + const requestHandlers = { signal: requestAbortController.signal }; + const requestOptions = prepareRequestOptions.call(this, options, requestHandlers); + let sendRequest = async function sendRequest() { Logger.getInstance().trace('Request%s - sending.', requestUtil.describeRequestFromOptions(requestOptions)); const timer = new ExecutionTimer().start(); - requestPromise = axios.request(requestOptions).then(response => { + requestObject.requestPromise = axios.request(requestOptions).then(response => { const httpResponseTime = timer.getDuration(); Logger.getInstance().debug('Request%s - response received after %s milliseconds with status %s.', requestUtil.describeRequestFromOptions(requestOptions), httpResponseTime, response.status); sanitizeAxiosResponse(response); @@ -77,15 +80,15 @@ HttpClient.prototype.request = function (options) { // return an externalized request object that only contains // methods we're comfortable exposing to the outside world - return { - abort: function () { - if (requestPromise) { - Logger.getInstance().trace('Request%s - aborting.', requestUtil.describeRequestFromOptions(requestOptions)); - // TODO: This line won't work - promise has no method called abort - requestPromise.abort(); - } + requestObject.abort = function () { + if (requestAbortController) { + Logger.getInstance().trace('Request%s - aborting.', requestUtil.describeRequestFromOptions(requestOptions)); + requestAbortController.abort(); + Logger.getInstance().debug('Request%s - aborted.', requestUtil.describeRequestFromOptions(requestOptions)); } }; + + return requestObject; }; /** @@ -107,7 +110,7 @@ HttpClient.prototype.requestAsync = async function (options) { Logger.getInstance().debug('Request%s - response received after %s milliseconds with status %s.', requestUtil.describeRequestFromOptions(requestOptions), httpResponseTime, response.status); parseResponseData(response); sanitizeAxiosResponse(response); - return response; + return normalizeResponse(response); } catch (err) { const httpResponseTime = timer.getDuration(); Logger.getInstance().debug('Request%s - failed after %s milliseconds. Error will be re-raised.', requestUtil.describeRequestFromOptions(options), httpResponseTime); @@ -236,7 +239,7 @@ function sanitizeAxiosError(error) { } } -function prepareRequestOptions(options) { +function prepareRequestOptions(options, requestHandlers = {}) { Logger.getInstance().trace('Request%s - constructing options.', requestUtil.describeRequestFromOptions(options)); const headers = normalizeHeaders(options.headers) || {}; @@ -284,6 +287,7 @@ function prepareRequestOptions(options) { // we manually parse jsons or other structures from the server so they need to be text responseType: options.responseType || 'text', proxy: false, + ...requestHandlers }; const url = new URL(options.url); diff --git a/lib/services/sf.js b/lib/services/sf.js index 2e339c204..57f0924b0 100644 --- a/lib/services/sf.js +++ b/lib/services/sf.js @@ -661,7 +661,7 @@ function StateAbstract(options) { if (body.code === GSErrors.code.ID_TOKEN_INVALID && data.authnMethod === 'TOKEN') { Logger.getInstance().debug('ID Token being used has expired. Reauthenticating'); - const key = Util.buildCredentialCacheKey(connectionConfig.host, + const key = Util.buildCredentialCacheKey(connectionConfig.host, connectionConfig.username, AuthenticationTypes.ID_TOKEN_AUTHENTICATOR); await GlobalConfig.getCredentialManager().remove(key); await auth.reauthenticate(requestOptions.json); diff --git a/test/integration/testHttpClient.js b/test/integration/testHttpClient.js new file mode 100644 index 000000000..3c721d24a --- /dev/null +++ b/test/integration/testHttpClient.js @@ -0,0 +1,122 @@ +const connOptions = require('../integration/connectionOptions'); +const ConnectionConfig = require('../../lib/connection/connection_config'); +const NodeHttpClient = require('../../lib/http/node').NodeHttpClient; +const { hangWebServerUrl } = require('../hangWebserver'); +const assert = require('assert'); +const testUtil = require('./testUtil'); + +describe('HttpClient Tests', () => { + let httpClientInstance; + + const connectionOptions = { + ...(connOptions.valid), + timeout: 2000 + }; + + const httpConnectionOptions = new ConnectionConfig(connectionOptions, false, false, { + version: '1', + environment: process.versions, + }); + + beforeEach(() => { + httpClientInstance = new NodeHttpClient(httpConnectionOptions); + }); + + describe('Aborting requests', () => { + const urlReturningResponseAfterHanging = hangWebServerUrl + '/hang'; + + it('should allow aborting any request immediately', async () => { + let errorFromCallback; + + const requestObject = httpClientInstance.request({ + url: urlReturningResponseAfterHanging, + method: 'GET', + callback: (err) => { + // We expect an error due to aborting the request. + if (err) { + testUtil.isRequestCancelledError(err); + } else { + errorFromCallback = Error('Expected an error from aborted request, but got success.'); + } + } + }); + + // Abort the request immediately + requestObject.abort(); + + //Due to usage of 'nextTick' in the httpClient requestPromise may be undefined for some time, only to be set when scheduled sending took place. + await testUtil.waitForCondition(() => requestObject.requestPromise); + await requestObject.requestPromise; + + assert.ok(!errorFromCallback, `Did not receive a normalized response. Error: ${errorFromCallback}`); + }); + + it('should allow aborting long-running request after some time', async () => { + let errorFromCallback; + const TIME_IN_MS_TO_WAIT_BEFORE_ABORT = 1500; + assert.ok(TIME_IN_MS_TO_WAIT_BEFORE_ABORT < connectionOptions.timeout, 'Test was not set up correctly. ' + + 'To test correctly the aborting functionality it should be triggered before timeout of the request itself'); + + const requestObject = httpClientInstance.request({ + url: urlReturningResponseAfterHanging, + method: 'GET', + callback: (err) => { + // We expect an error due to aborting the request. + if (err) { + testUtil.isRequestCancelledError(err); + } else { + errorFromCallback = Error('Expected an error from aborted request, but got success.'); + } + } + }); + + // Abort the request after some time + await testUtil.sleepAsync(TIME_IN_MS_TO_WAIT_BEFORE_ABORT); + requestObject.abort(); + + //Due to usage of 'nextTick' in the httpClient requestPromise may be undefined for some time, only to be set when scheduled sending took place. + await testUtil.waitForCondition(() => requestObject.requestPromise); + await requestObject.requestPromise; + + assert.ok(!errorFromCallback, `Did not receive a normalized response. Error: ${errorFromCallback}`); + }); + }); + + describe('Normalizing Response', () => { + const urlReturningJsonBody = hangWebServerUrl + '/json'; + + it('should return a normalized response with statusCode and body for requestAsync', async () => { + const response = await httpClientInstance.requestAsync({ + url: urlReturningJsonBody, + method: 'GET' + }); + + assert.ok(response, 'Response should be defined'); + assert.ok(response.statusCode, 'Normalized response should have statusCode'); + assert.ok(response.body, 'Normalized response should have body'); + }); + + it('should return a normalized response with statusCode and body for synchronous request', async () => { + let errorRaisedInCallback; + + const requestObject = httpClientInstance.request({ + url: urlReturningJsonBody, + method: 'GET', + callback: (err, response) => { + try { + assert.ok(response, 'Response should be defined'); + assert.ok(response.statusCode, 'Normalized response should have statusCode'); + assert.ok(response.body, 'Normalized response should have body'); + } catch (assertionError) { + errorRaisedInCallback = assertionError; + } + } + }); + //Due to usage of 'nextTick' in the httpClient requestPromise may be undefined for some time, only to be set when scheduled sending took place. + await testUtil.waitForCondition(() => requestObject.requestPromise); + await requestObject.requestPromise; + + assert.ok(!errorRaisedInCallback, `Did not receive a normalized response. Error: ${errorRaisedInCallback}`); + }); + }); +}); diff --git a/test/integration/testUtil.js b/test/integration/testUtil.js index 1f082d28c..332cdd505 100644 --- a/test/integration/testUtil.js +++ b/test/integration/testUtil.js @@ -349,10 +349,24 @@ module.exports.createRandomFileName = function ( option = { prefix: '', postfix: return fileName; }; -module.exports.sleepAsync = function (ms) { +const sleepAsync = function (ms) { return new Promise(resolve => setTimeout(resolve, ms)); }; +module.exports.sleepAsync = sleepAsync; + +module.exports.waitForCondition = async function (conditionCallable, { maxWaitTimeInMs = 20000, waitTimeBetweenChecksInMs = 1000 } = {}) { + let waitedTimeInMs = 0; + while (!conditionCallable()) { + await sleepAsync(waitTimeBetweenChecksInMs); + waitedTimeInMs += waitTimeBetweenChecksInMs; + + if (waitedTimeInMs > maxWaitTimeInMs) { + throw Error(`Condition was not met after max wait time = ${maxWaitTimeInMs}`); + } + } +}; + module.exports.assertConnectionActive = function (connection) { assert.ok(connection.isUp(), 'Connection expected to be active, but was inactive.'); }; @@ -372,4 +386,10 @@ module.exports.normalizeValue = normalizeValue; module.exports.isGuidInRequestOptions = function (requestOptions) { return requestOptions.url.includes('request_guid') || 'request_guid' in requestOptions.params; -}; \ No newline at end of file +}; + +module.exports.isRequestCancelledError = function (error) { + assert.equal(error.message, 'canceled', `Expected error message "canceled", but received ${error.message}`); + assert.equal(error.name, 'CanceledError', `Expected error name "CanceledError", but received ${error.name}`); + assert.equal(error.code, 'ERR_CANCELED', `Expected error code "ERR_CANCELED", but received ${error.code}`); +}; diff --git a/test/integration/test_utils/httpInterceptorUtils.js b/test/integration/test_utils/httpInterceptorUtils.js index 408896938..097fc99a6 100644 --- a/test/integration/test_utils/httpInterceptorUtils.js +++ b/test/integration/test_utils/httpInterceptorUtils.js @@ -82,7 +82,7 @@ function HttpClientWithInterceptors(connectionConfig, initialInterceptors) { Util.inherits(HttpClientWithInterceptors, NodeHttpClient); - +//To add new methods to be intercepted wrap them here with appropriate interceptors calls HttpClientWithInterceptors.prototype.requestAsync = async function (url, options) { this.interceptors.intercept('requestAsync', HOOK_TYPE.FOR_ARGS, url, options); const response = await NodeHttpClient.prototype.requestAsync.call(this, url, options);