Skip to content

Commit

Permalink
SNOW-1814745: Fix aborting requests in HTTP client. Normalization of …
Browse files Browse the repository at this point in the history
…responses unified. (#967)
  • Loading branch information
sfc-gh-fpawlowski authored Dec 20, 2024
1 parent 0aaa740 commit 08d802e
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 26 deletions.
25 changes: 15 additions & 10 deletions ci/container/hang_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,41 @@ class HTTPRequestHandler(BaseHTTPRequestHandler):
# counts specific calls to change behaviour after some calls
counter = 0

def __respond(self, http_code, content_type='text/plain', body=None, ):
if body:
self.send_response(http_code, body)
def __respond(self, http_code, content_type='text/plain', message=None, body=None, ):
if message:
self.send_response(http_code, message)
else:
self.send_response(http_code)
self.send_header('Content-Type', content_type)
self.end_headers()

if body is not None:
self.wfile.write(body.encode('utf-8'))

def do_POST(self):
if self.path.startswith('/403'):
self.__respond(403)
elif self.path.startswith('/404'):
self.__respond(404)
elif self.path.startswith('/hang'):
time.sleep(300)
self.__respond(200, body='OK')
self.__respond(200, message='OK')
elif self.path.startswith('/503'):
self.__respond(503)
elif self.path.startswith('/xml'):
self.__respond(200, body='<error/>', content_type='application/xml')
self.__respond(200, message='<error/>', content_type='application/xml')
elif self.path.startswith('/json'):
self.__respond(200, message='OK', body='{"smkId": 32621973126123526, "data": {"test":"data"}}', content_type='application/json')
elif self.path.startswith('/resetCounter'):
HTTPRequestHandler.counter = 0
self.__respond(200, body='OK')
self.__respond(200, message='OK')
elif self.path.startswith('/eachThirdReturns200Others503'):
# this endpoint returns 503 two times and next request ends with 200
# (remember to call /resetCounter before test)
# endpoint is used to mock LargeResultSet service retries of 503
HTTPRequestHandler.counter += 12
if HTTPRequestHandler.counter % 3 == 0:
self.__respond(200, body='OK')
self.__respond(200, message='OK')
else:
self.__respond(503)
elif self.path.startswith('/eachThirdReturns200OthersHang'):
Expand All @@ -49,12 +54,12 @@ def do_POST(self):
# endpoint is used to mock LargeResultSet service retries of timeouts
HTTPRequestHandler.counter += 1
if HTTPRequestHandler.counter % 3 == 0:
self.__respond(200, body='OK')
self.__respond(200, message='OK')
else:
time.sleep(300)
self.__respond(200, body='OK')
self.__respond(200, message='OK')
else:
self.__respond(200, body='OK')
self.__respond(200, message='OK')
do_GET = do_POST

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
Expand Down
28 changes: 16 additions & 12 deletions lib/http/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ function HttpClient(connectionConfig) {
HttpClient.prototype.request = function (options) {
Logger.getInstance().trace('Request%s - preparing for sending.', requestUtil.describeRequestFromOptions(options));

let requestPromise;
const requestOptions = prepareRequestOptions.call(this, options);
const requestObject = {};
const requestAbortController = new AbortController();
const requestHandlers = { signal: requestAbortController.signal };
const requestOptions = prepareRequestOptions.call(this, options, requestHandlers);

let sendRequest = async function sendRequest() {
Logger.getInstance().trace('Request%s - sending.', requestUtil.describeRequestFromOptions(requestOptions));
const timer = new ExecutionTimer().start();
requestPromise = axios.request(requestOptions).then(response => {
requestObject.requestPromise = axios.request(requestOptions).then(response => {
const httpResponseTime = timer.getDuration();
Logger.getInstance().debug('Request%s - response received after %s milliseconds with status %s.', requestUtil.describeRequestFromOptions(requestOptions), httpResponseTime, response.status);
sanitizeAxiosResponse(response);
Expand Down Expand Up @@ -77,15 +80,15 @@ HttpClient.prototype.request = function (options) {

// return an externalized request object that only contains
// methods we're comfortable exposing to the outside world
return {
abort: function () {
if (requestPromise) {
Logger.getInstance().trace('Request%s - aborting.', requestUtil.describeRequestFromOptions(requestOptions));
// TODO: This line won't work - promise has no method called abort
requestPromise.abort();
}
requestObject.abort = function () {
if (requestAbortController) {
Logger.getInstance().trace('Request%s - aborting.', requestUtil.describeRequestFromOptions(requestOptions));
requestAbortController.abort();
Logger.getInstance().debug('Request%s - aborted.', requestUtil.describeRequestFromOptions(requestOptions));
}
};

return requestObject;
};

/**
Expand All @@ -107,7 +110,7 @@ HttpClient.prototype.requestAsync = async function (options) {
Logger.getInstance().debug('Request%s - response received after %s milliseconds with status %s.', requestUtil.describeRequestFromOptions(requestOptions), httpResponseTime, response.status);
parseResponseData(response);
sanitizeAxiosResponse(response);
return response;
return normalizeResponse(response);
} catch (err) {
const httpResponseTime = timer.getDuration();
Logger.getInstance().debug('Request%s - failed after %s milliseconds. Error will be re-raised.', requestUtil.describeRequestFromOptions(options), httpResponseTime);
Expand Down Expand Up @@ -236,7 +239,7 @@ function sanitizeAxiosError(error) {
}
}

function prepareRequestOptions(options) {
function prepareRequestOptions(options, requestHandlers = {}) {
Logger.getInstance().trace('Request%s - constructing options.', requestUtil.describeRequestFromOptions(options));
const headers = normalizeHeaders(options.headers) || {};

Expand Down Expand Up @@ -284,6 +287,7 @@ function prepareRequestOptions(options) {
// we manually parse jsons or other structures from the server so they need to be text
responseType: options.responseType || 'text',
proxy: false,
...requestHandlers
};

const url = new URL(options.url);
Expand Down
2 changes: 1 addition & 1 deletion lib/services/sf.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ function StateAbstract(options) {

if (body.code === GSErrors.code.ID_TOKEN_INVALID && data.authnMethod === 'TOKEN') {
Logger.getInstance().debug('ID Token being used has expired. Reauthenticating');
const key = Util.buildCredentialCacheKey(connectionConfig.host,
const key = Util.buildCredentialCacheKey(connectionConfig.host,
connectionConfig.username, AuthenticationTypes.ID_TOKEN_AUTHENTICATOR);
await GlobalConfig.getCredentialManager().remove(key);
await auth.reauthenticate(requestOptions.json);
Expand Down
122 changes: 122 additions & 0 deletions test/integration/testHttpClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
const connOptions = require('../integration/connectionOptions');
const ConnectionConfig = require('../../lib/connection/connection_config');
const NodeHttpClient = require('../../lib/http/node').NodeHttpClient;
const { hangWebServerUrl } = require('../hangWebserver');
const assert = require('assert');
const testUtil = require('./testUtil');

describe('HttpClient Tests', () => {
let httpClientInstance;

const connectionOptions = {
...(connOptions.valid),
timeout: 2000
};

const httpConnectionOptions = new ConnectionConfig(connectionOptions, false, false, {
version: '1',
environment: process.versions,
});

beforeEach(() => {
httpClientInstance = new NodeHttpClient(httpConnectionOptions);
});

describe('Aborting requests', () => {
const urlReturningResponseAfterHanging = hangWebServerUrl + '/hang';

it('should allow aborting any request immediately', async () => {
let errorFromCallback;

const requestObject = httpClientInstance.request({
url: urlReturningResponseAfterHanging,
method: 'GET',
callback: (err) => {
// We expect an error due to aborting the request.
if (err) {
testUtil.isRequestCancelledError(err);
} else {
errorFromCallback = Error('Expected an error from aborted request, but got success.');
}
}
});

// Abort the request immediately
requestObject.abort();

//Due to usage of 'nextTick' in the httpClient requestPromise may be undefined for some time, only to be set when scheduled sending took place.
await testUtil.waitForCondition(() => requestObject.requestPromise);
await requestObject.requestPromise;

assert.ok(!errorFromCallback, `Did not receive a normalized response. Error: ${errorFromCallback}`);
});

it('should allow aborting long-running request after some time', async () => {
let errorFromCallback;
const TIME_IN_MS_TO_WAIT_BEFORE_ABORT = 1500;
assert.ok(TIME_IN_MS_TO_WAIT_BEFORE_ABORT < connectionOptions.timeout, 'Test was not set up correctly. ' +
'To test correctly the aborting functionality it should be triggered before timeout of the request itself');

const requestObject = httpClientInstance.request({
url: urlReturningResponseAfterHanging,
method: 'GET',
callback: (err) => {
// We expect an error due to aborting the request.
if (err) {
testUtil.isRequestCancelledError(err);
} else {
errorFromCallback = Error('Expected an error from aborted request, but got success.');
}
}
});

// Abort the request after some time
await testUtil.sleepAsync(TIME_IN_MS_TO_WAIT_BEFORE_ABORT);
requestObject.abort();

//Due to usage of 'nextTick' in the httpClient requestPromise may be undefined for some time, only to be set when scheduled sending took place.
await testUtil.waitForCondition(() => requestObject.requestPromise);
await requestObject.requestPromise;

assert.ok(!errorFromCallback, `Did not receive a normalized response. Error: ${errorFromCallback}`);
});
});

describe('Normalizing Response', () => {
const urlReturningJsonBody = hangWebServerUrl + '/json';

it('should return a normalized response with statusCode and body for requestAsync', async () => {
const response = await httpClientInstance.requestAsync({
url: urlReturningJsonBody,
method: 'GET'
});

assert.ok(response, 'Response should be defined');
assert.ok(response.statusCode, 'Normalized response should have statusCode');
assert.ok(response.body, 'Normalized response should have body');
});

it('should return a normalized response with statusCode and body for synchronous request', async () => {
let errorRaisedInCallback;

const requestObject = httpClientInstance.request({
url: urlReturningJsonBody,
method: 'GET',
callback: (err, response) => {
try {
assert.ok(response, 'Response should be defined');
assert.ok(response.statusCode, 'Normalized response should have statusCode');
assert.ok(response.body, 'Normalized response should have body');
} catch (assertionError) {
errorRaisedInCallback = assertionError;
}
}
});
//Due to usage of 'nextTick' in the httpClient requestPromise may be undefined for some time, only to be set when scheduled sending took place.
await testUtil.waitForCondition(() => requestObject.requestPromise);
await requestObject.requestPromise;

assert.ok(!errorRaisedInCallback, `Did not receive a normalized response. Error: ${errorRaisedInCallback}`);
});
});
});
24 changes: 22 additions & 2 deletions test/integration/testUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,24 @@ module.exports.createRandomFileName = function ( option = { prefix: '', postfix:
return fileName;
};

module.exports.sleepAsync = function (ms) {
const sleepAsync = function (ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};

module.exports.sleepAsync = sleepAsync;

module.exports.waitForCondition = async function (conditionCallable, { maxWaitTimeInMs = 20000, waitTimeBetweenChecksInMs = 1000 } = {}) {
let waitedTimeInMs = 0;
while (!conditionCallable()) {
await sleepAsync(waitTimeBetweenChecksInMs);
waitedTimeInMs += waitTimeBetweenChecksInMs;

if (waitedTimeInMs > maxWaitTimeInMs) {
throw Error(`Condition was not met after max wait time = ${maxWaitTimeInMs}`);
}
}
};

module.exports.assertConnectionActive = function (connection) {
assert.ok(connection.isUp(), 'Connection expected to be active, but was inactive.');
};
Expand All @@ -372,4 +386,10 @@ module.exports.normalizeValue = normalizeValue;

module.exports.isGuidInRequestOptions = function (requestOptions) {
return requestOptions.url.includes('request_guid') || 'request_guid' in requestOptions.params;
};
};

module.exports.isRequestCancelledError = function (error) {
assert.equal(error.message, 'canceled', `Expected error message "canceled", but received ${error.message}`);
assert.equal(error.name, 'CanceledError', `Expected error name "CanceledError", but received ${error.name}`);
assert.equal(error.code, 'ERR_CANCELED', `Expected error code "ERR_CANCELED", but received ${error.code}`);
};
2 changes: 1 addition & 1 deletion test/integration/test_utils/httpInterceptorUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ function HttpClientWithInterceptors(connectionConfig, initialInterceptors) {

Util.inherits(HttpClientWithInterceptors, NodeHttpClient);


//To add new methods to be intercepted wrap them here with appropriate interceptors calls
HttpClientWithInterceptors.prototype.requestAsync = async function (url, options) {
this.interceptors.intercept('requestAsync', HOOK_TYPE.FOR_ARGS, url, options);
const response = await NodeHttpClient.prototype.requestAsync.call(this, url, options);
Expand Down

0 comments on commit 08d802e

Please sign in to comment.