diff --git a/package-lock.json b/package-lock.json index a9bf55a..c887fd6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1413,12 +1413,6 @@ "resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-1.0.0.tgz", "integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=" }, - "assertion-error": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", - "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==", - "dev": true - }, "assign-symbols": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/assign-symbols/-/assign-symbols-1.0.0.tgz", @@ -1697,20 +1691,6 @@ "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz", "integrity": "sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw=" }, - "chai": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/chai/-/chai-4.1.2.tgz", - "integrity": "sha1-D2RYS6ZC8PKs4oBiefTwbKI61zw=", - "dev": true, - "requires": { - "assertion-error": "^1.0.1", - "check-error": "^1.0.1", - "deep-eql": "^3.0.0", - "get-func-name": "^2.0.0", - "pathval": "^1.0.0", - "type-detect": "^4.0.0" - } - }, "chalk": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-2.4.2.tgz", @@ -1728,12 +1708,6 @@ "integrity": "sha512-mT8iDcrh03qDGRRmoA2hmBJnxpllMR+0/0qlzjqZES6NdiWDcZkCNAk4rPFZ9Q85r27unkiNNg8ZOiwZXBHwcA==", "dev": true }, - "check-error": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", - "integrity": "sha1-V00xLt2Iu13YkS6Sht1sCu1KrII=", - "dev": true - }, "ci-info": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ci-info/-/ci-info-2.0.0.tgz", @@ -2057,21 +2031,6 @@ "integrity": "sha1-6zkTMzRYd1y4TNGh+uBiEGu4dUU=", "dev": true }, - "deep-eql": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-3.0.1.tgz", - "integrity": "sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==", - "dev": true, - "requires": { - "type-detect": "^4.0.0" - } - }, - "deep-equal": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/deep-equal/-/deep-equal-1.0.1.tgz", - "integrity": "sha1-9dJgKStmDghO/0zbyfCK0yR0SLU=", - "dev": true - }, "deep-is": { "version": "0.1.3", "resolved": "https://registry.npmjs.org/deep-is/-/deep-is-0.1.3.tgz", @@ -3299,12 +3258,6 @@ "integrity": "sha512-3t6rVToeoZfYSGd8YoLFR2DJkiQrIiUrGcjvFX2mDw3bn6k2OtwHN0TNCLbBO+w8qTvimhDkv+LSscbJY1vE6w==", "dev": true }, - "get-func-name": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/get-func-name/-/get-func-name-2.0.0.tgz", - "integrity": "sha1-6td0q+5y4gQJQzoGY2YCPdaIekE=", - "dev": true - }, "get-stream": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-4.1.0.tgz", @@ -3370,9 +3323,9 @@ "dev": true }, "handlebars": { - "version": "4.4.5", - "resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.4.5.tgz", - "integrity": "sha512-0Ce31oWVB7YidkaTq33ZxEbN+UDxMMgThvCe8ptgQViymL5DPis9uLdTA13MiRPhgvqyxIegugrP97iK3JeBHg==", + "version": "4.5.3", + "resolved": "https://registry.npmjs.org/handlebars/-/handlebars-4.5.3.tgz", + "integrity": "sha512-3yPecJoJHK/4c6aZhSvxOyG4vJKDshV36VHp0iVCDVh7o9w2vwi3NSnL2MMPj3YdduqaBcu7cGbggJQM0br9xA==", "dev": true, "requires": { "neo-async": "^2.6.0", @@ -4903,46 +4856,6 @@ "integrity": "sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ==", "dev": true }, - "nock": { - "version": "9.2.3", - "resolved": "https://registry.npmjs.org/nock/-/nock-9.2.3.tgz", - "integrity": "sha512-4XYNSJDJ/PvNoH+cCRWcGOOFsq3jtZdNTRIlPIBA7CopGWJO56m5OaPEjjJ3WddxNYfe5HL9sQQAtMt8oyR9AA==", - "dev": true, - "requires": { - "chai": "^4.1.2", - "debug": "^3.1.0", - "deep-equal": "^1.0.0", - "json-stringify-safe": "^5.0.1", - "lodash": "^4.17.5", - "mkdirp": "^0.5.0", - "propagate": "^1.0.0", - "qs": "^6.5.1", - "semver": "^5.5.0" - }, - "dependencies": { - "debug": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-3.1.0.tgz", - "integrity": "sha1-W7WgZyYotkFJVmuhaBnmFRjGcmE=", - "dev": true, - "requires": { - "ms": "2.0.0" - } - }, - "lodash": { - "version": "4.17.15", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz", - "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==", - "dev": true - }, - "semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", - "dev": true - } - } - }, "node-cache": { "version": "4.2.1", "resolved": "https://registry.npmjs.org/node-cache/-/node-cache-4.2.1.tgz", @@ -5336,12 +5249,6 @@ "pify": "^3.0.0" } }, - "pathval": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.0.tgz", - "integrity": "sha1-uULm1L3mUwBe9rcTYd74cn0GReA=", - "dev": true - }, "performance-now": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/performance-now/-/performance-now-2.1.0.tgz", @@ -5438,12 +5345,6 @@ "sisteransi": "^1.0.0" } }, - "propagate": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/propagate/-/propagate-1.0.0.tgz", - "integrity": "sha1-AMLa7t2iDofjeCs0Stuhzd1q1wk=", - "dev": true - }, "pseudomap": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", @@ -5469,12 +5370,6 @@ "resolved": "https://registry.npmjs.org/punycode/-/punycode-1.4.1.tgz", "integrity": "sha1-wNWmOycYgArY4esPpSachN1BhF4=" }, - "qs": { - "version": "6.5.1", - "resolved": "https://registry.npmjs.org/qs/-/qs-6.5.1.tgz", - "integrity": "sha512-eRzhrN1WSINYCDCbrz796z37LOe3m5tmW7RQf6oBntukAG1nmovJvhnwHHRMAfeoItc1m2Hk02WER2aQ/iqs+A==", - "dev": true - }, "react-is": { "version": "16.8.6", "resolved": "https://registry.npmjs.org/react-is/-/react-is-16.8.6.tgz", @@ -6498,12 +6393,6 @@ "prelude-ls": "~1.1.2" } }, - "type-detect": { - "version": "4.0.8", - "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", - "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", - "dev": true - }, "typescript": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.0.1.tgz", @@ -6511,9 +6400,9 @@ "dev": true }, "uglify-js": { - "version": "3.6.3", - "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.6.3.tgz", - "integrity": "sha512-KfQUgOqTkLp2aZxrMbCuKCDGW9slFYu2A23A36Gs7sGzTLcRBDORdOi5E21KWHFIfkY8kzgi/Pr1cXCh0yIp5g==", + "version": "3.6.9", + "resolved": "https://registry.npmjs.org/uglify-js/-/uglify-js-3.6.9.tgz", + "integrity": "sha512-pcnnhaoG6RtrvHJ1dFncAe8Od6Nuy30oaJ82ts6//sGSXOP5UjBMEthiProjXmMNHOfd93sqlkztifFMcb+4yw==", "dev": true, "optional": true, "requires": { diff --git a/package.json b/package.json index 5d86bcc..f5270cc 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,6 @@ "eslint-formatter-pretty": "2.1.1", "jest": "24.7.1", "jest-junit": "6.3.0", - "nock": "9.2.3", "selfsigned": "1.10.4", "tmp": "0.0.33", "typescript": "3.0.1" diff --git a/test/LDClient-end-to-end-test.js b/test/LDClient-end-to-end-test.js new file mode 100644 index 0000000..155a86e --- /dev/null +++ b/test/LDClient-end-to-end-test.js @@ -0,0 +1,116 @@ +const LDClient = require('../index.js'); +import { AsyncQueue, sleepAsync, withCloseable } from './async_utils'; +import { createServer, respond, respondJson, respondSSE } from './http_server'; +import { stubLogger } from './stubs'; + +async function withAllServers(asyncCallback) { + return await withCloseable(createServer, async pollingServer => + withCloseable(createServer, async streamingServer => + withCloseable(createServer, async eventsServer => { + const servers = { polling: pollingServer, streaming: streamingServer, events: eventsServer }; + const baseConfig = { + baseUri: pollingServer.url, + streamUri: streamingServer.url, + eventsUri: eventsServer.url, + logger: stubLogger() + }; + return await asyncCallback(servers, baseConfig); + }) + ) + ); +} + +describe('LDClient end-to-end', () => { + const sdkKey = 'sdkKey'; + const flagKey = 'flagKey'; + const expectedFlagValue = 'yes'; + const flag = { + key: flagKey, + version: 1, + on: false, + offVariation: 0, + variations: [ expectedFlagValue, 'no' ] + }; + const allData = { flags: { flagKey: flag }, segments: {} }; + + const user = { key: 'userKey' }; + + it('starts in polling mode', async () => { + await withAllServers(async (servers, config) => { + servers.polling.forMethodAndPath('get', '/sdk/latest-all', respondJson(allData)); + servers.events.forMethodAndPath('post', '/bulk', respond(200)); + + config.stream = false; + await withCloseable(LDClient.init(sdkKey, config), async client => { + await client.waitForInitialization(); + expect(client.initialized()).toBe(true); + + const value = await client.variation(flag.key, user); + expect(value).toEqual(expectedFlagValue); + + await client.flush(); + }); + + expect(servers.polling.requestCount()).toEqual(1); + expect(servers.streaming.requestCount()).toEqual(0); + expect(servers.events.requestCount()).toEqual(1); + }); + }); + + it('fails in polling mode with 401 error', async () => { + await withAllServers(async (servers, config) => { + servers.polling.forMethodAndPath('get', '/sdk/latest-all', respond(401)); + servers.events.forMethodAndPath('post', '/bulk', respond(200)); + + config.stream = false; + + await withCloseable(LDClient.init(sdkKey, config), async client => { + await expect(client.waitForInitialization()).rejects.toThrow(); + expect(client.initialized()).toBe(false); + }); + + expect(servers.polling.requestCount()).toEqual(1); + expect(servers.streaming.requestCount()).toEqual(0); + }); + }); + + it('starts in streaming mode', async () => { + await withAllServers(async (servers, config) => { + const streamEvent = { type: 'put', data: { data: allData } }; + await withCloseable(AsyncQueue(), async events => { + events.add(streamEvent); + servers.streaming.forMethodAndPath('get', '/all', respondSSE(events)); + servers.events.forMethodAndPath('post', '/bulk', respond(200)); + + await withCloseable(LDClient.init(sdkKey, config), async client => { + await client.waitForInitialization(); + expect(client.initialized()).toBe(true); + + const value = await client.variation(flag.key, user); + expect(value).toEqual(expectedFlagValue); + + await client.flush(); + }); + + expect(servers.polling.requestCount()).toEqual(0); + expect(servers.streaming.requestCount()).toEqual(1); + expect(servers.events.requestCount()).toEqual(1); + }); + }); + }); + + it('fails in streaming mode with 401 error', async () => { + await withAllServers(async (servers, config) => { + servers.streaming.forMethodAndPath('get', '/all', respond(401)); + servers.events.forMethodAndPath('post', '/bulk', respond(200)); + + await withCloseable(LDClient.init(sdkKey, config), async client => { + await expect(client.waitForInitialization()).rejects.toThrow(); + expect(client.initialized()).toBe(false); + }); + + expect(servers.polling.requestCount()).toEqual(0); + expect(servers.streaming.requestCount()).toEqual(1); + }); + }); +}); diff --git a/test/LDClient-tls-test.js b/test/LDClient-tls-test.js index 5b24dc6..9df942b 100644 --- a/test/LDClient-tls-test.js +++ b/test/LDClient-tls-test.js @@ -1,25 +1,23 @@ import * as selfsigned from 'selfsigned'; import * as LDClient from '../index'; -import { sleepAsync } from './async_utils'; -import * as httpServer from './http_server'; +import { AsyncQueue, sleepAsync, withCloseable } from './async_utils'; +import { createServer, respond, respondJson, respondSSE } from './http_server'; import * as stubs from './stubs'; describe('LDClient TLS configuration', () => { const sdkKey = 'secret'; let logger = stubs.stubLogger(); - let server; let certData; - beforeEach(async () => { + beforeAll(async () => { certData = await makeSelfSignedPems(); - const serverOptions = { key: certData.private, cert: certData.cert, ca: certData.public }; - server = await httpServer.createServer(true, serverOptions); }); - afterEach(() => { - httpServer.closeServers(); - }); + async function createSecureServer() { + const serverOptions = { key: certData.private, cert: certData.cert, ca: certData.public }; + return await createServer(true, serverOptions); + } async function makeSelfSignedPems() { const certAttrs = [{ name: 'commonName', value: 'localhost' }]; @@ -36,90 +34,91 @@ describe('LDClient TLS configuration', () => { } it('can connect via HTTPS to a server with a self-signed certificate, if CA is specified', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, {})); - const config = { - baseUri: server.url, - sendEvents: false, - stream: false, - logger: stubs.stubLogger(), - tlsParams: { ca: certData.cert }, - }; - const client = LDClient.init(sdkKey, config); - await client.waitForInitialization(); - client.close(); + await withCloseable(createSecureServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-all', respondJson({})); + + const config = { + baseUri: server.url, + sendEvents: false, + stream: false, + logger: stubs.stubLogger(), + tlsParams: { ca: certData.cert }, + }; + + await withCloseable(LDClient.init(sdkKey, config), async client => { + await client.waitForInitialization(); + }); + }); }); it('cannot connect via HTTPS to a server with a self-signed certificate, using default config', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, {})); - const config = { - baseUri: server.url, - sendEvents: false, - stream: false, - logger: stubs.stubLogger(), - }; - const client = LDClient.init(sdkKey, config); - await sleepAsync(300); // the client won't signal an unrecoverable error, but it should log a message - expect(config.logger.warn.mock.calls.length).toEqual(2); - expect(config.logger.warn.mock.calls[1][0]).toMatch(/self signed/); + await withCloseable(createSecureServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-all', respondJson({})); + + const config = { + baseUri: server.url, + sendEvents: false, + stream: false, + logger: stubs.stubLogger(), + }; + + await withCloseable(LDClient.init(sdkKey, config), async client => { + await sleepAsync(300); // the client won't signal an unrecoverable error, but it should log a message + expect(config.logger.warn.mock.calls.length).toEqual(2); + expect(config.logger.warn.mock.calls[1][0]).toMatch(/self signed/); + }); + }); }); it('can use custom TLS options for streaming as well as polling', async () => { - const eventData = { data: { flags: { flag: { version: 1 } }, segments: {} } }; - server.on('request', (req, res) => { - if (req.url.match(/\/stream/)) { - httpServer.respondSSEEvent(res, 'put', eventData); - } else { - httpServer.respondJson(res, {}); - } - }); + await withCloseable(createSecureServer, async server => { + const eventData = { data: { flags: { flag: { version: 1 } }, segments: {} } }; + await withCloseable(AsyncQueue(), async events => { + events.add({ type: 'put', data: eventData }); + server.forMethodAndPath('get', '/stream/all', respondSSE(events)); - const config = { - baseUri: server.url, - streamUri: server.url + '/stream', - sendEvents: false, - logger: logger, - tlsParams: { ca: certData.cert }, - }; + const config = { + baseUri: server.url, + streamUri: server.url + '/stream', + sendEvents: false, + logger: logger, + tlsParams: { ca: certData.cert }, + }; - const client = LDClient.init(sdkKey, config); - await client.waitForInitialization(); // this won't return until the stream receives the "put" event - client.close(); + await withCloseable(LDClient.init(sdkKey, config), async client => { + await client.waitForInitialization(); // this won't return until the stream receives the "put" event + }); + }); + }); }); it('can use custom TLS options for posting events', async () => { - let receivedEventFn; - const receivedEvent = new Promise(resolve => { - receivedEventFn = resolve; - }); + await withCloseable(createSecureServer, async server => { + server.forMethodAndPath('post', '/events/bulk', respond(200)); + server.forMethodAndPath('get', '/sdk/latest-all', respondJson({})); - server.on('request', (req, res) => { - if (req.url.match(/\/events/)) { - httpServer.readAll(req).then(body => { - receivedEventFn(body); - httpServer.respond(res, 200); - }); - } else { - httpServer.respondJson(res, {}); - } - }); + const config = { + baseUri: server.url, + eventsUri: server.url + '/events', + stream: false, + logger: stubs.stubLogger(), + tlsParams: { ca: certData.cert }, + }; - const config = { - baseUri: server.url, - eventsUri: server.url + '/events', - stream: false, - logger: stubs.stubLogger(), - tlsParams: { ca: certData.cert }, - }; + await withCloseable(LDClient.init(sdkKey, config), async client => { + await client.waitForInitialization(); + client.identify({ key: 'user' }); + await client.flush(); - const client = LDClient.init(sdkKey, config); - await client.waitForInitialization(); - client.identify({ key: 'user' }); - await client.flush(); - - const receivedEventBody = await receivedEvent; - const eventData = JSON.parse(receivedEventBody); - expect(eventData.length).toEqual(1); - expect(eventData[0].kind).toEqual('identify'); - client.close(); + const flagsRequest = await server.nextRequest(); + expect(flagsRequest.path).toEqual('/sdk/latest-all'); + + const eventsRequest = await server.nextRequest(); + expect(eventsRequest.path).toEqual('/events/bulk'); + const eventData = JSON.parse(eventsRequest.body); + expect(eventData.length).toEqual(1); + expect(eventData[0].kind).toEqual('identify'); + }); + }); }); }); diff --git a/test/async_utils.js b/test/async_utils.js index f748172..25c1161 100644 --- a/test/async_utils.js +++ b/test/async_utils.js @@ -1,53 +1,100 @@ +// Converts a function that takes a Node-style callback (err, result) as its last argument into a +// function that returns a Promise. This is equivalent to util.promisify, but is reimplemented here +// because promisify isn't supported in Node 6. +// Usage: await promisify(doSomething)(allParamsExceptCallback) +function promisify(f) { + return (...args) => + new Promise((resolve, reject) => + f(...args, (err, result) => err ? reject(err) : resolve(result))); +} + // Converts a function that takes a single-parameter callback (like most SDK methods) into a Promise. -// Usage: asyncify(callback => doSomething(params, callback)) +// This is different from util.promisify, which uses Node-style callbacks with two parameters. +// Usage: await asyncify(callback => doSomething(params, callback)) function asyncify(f) { return new Promise(resolve => f(resolve)); } -// Converts a function that takes a Node-style callback (err, result) into a Promise. -// Usage: asyncifyNode(callback => doSomething(params, callback)) -function asyncifyNode(f) { - return new Promise((resolve, reject) => f((err, result) => err ? reject(err) : resolve(result))); -} - +// Usage: await sleepAsync(5000) function sleepAsync(millis) { return new Promise(resolve => { setTimeout(resolve, millis); }); } +// Calls the entity's close() method after passing the entity to the callback. Usages: +// await withCloseable(myExistingObject, async o => doSomething(o)); +// await withCloseable(() => makeNewObject(), async o => doSomething(o)); +// await withCloseable(async () => await makeObjectAsync(), async o => doSomething(o)); +function withCloseable(entityOrCreateFn, asyncCallback) { + // Using Promise.resolve allows promises and simple values to be treated as promises + return Promise.resolve(typeof entityOrCreateFn === 'function' ? entityOrCreateFn() : entityOrCreateFn) + .then(entity => + asyncCallback(entity) + .then(result => { + entity.close(); + return result; + }) + .catch(err => { + entity.close(); + return Promise.reject(err); + }) + // Note that we can't use Promise.finally() because it's not supported in Node 6. + ); +} + +// Promise-based blocking queue. function AsyncQueue() { const items = []; const awaiters = []; + let closed = false; + const closedError = () => new Error("queue was closed"); return { + // Adds an item. add: item => { if (awaiters.length) { - awaiters.shift()(item); + awaiters.shift().resolve(item); } else { items.push(item); } }, + // Blocks for the next item (async). Throws an exception if there are no more items and the queue has + // been closed. take: () => { if (items.length) { return Promise.resolve(items.shift()); } - return new Promise(resolve => { - awaiters.push(resolve); + if (closed) { + return Promise.reject(closedError()); + } + return new Promise((resolve, reject) => { + awaiters.push({ resolve, reject }); }); }, isEmpty: () => { return items.length === 0; + }, + + length: () => items.length, + + // Allows any remaining items to be consumed, but causes take() to throw an exception after that. + close: () => { + while (awaiters.length > 0) { + awaiters.shift().reject(closedError()); + } + closed = true; } }; } module.exports = { - asyncify: asyncify, - asyncifyNode: asyncifyNode, - sleepAsync: sleepAsync, - AsyncQueue: AsyncQueue + asyncify, + promisify, + sleepAsync, + withCloseable, + AsyncQueue, }; diff --git a/test/event_processor-test.js b/test/event_processor-test.js index 4978ccc..fee6a90 100644 --- a/test/event_processor-test.js +++ b/test/event_processor-test.js @@ -1,9 +1,9 @@ -const nock = require('nock'); const EventProcessor = require('../event_processor'); +const { createServer, respond } = require('./http_server'); +const { sleepAsync, withCloseable } = require('./async_utils'); describe('EventProcessor', () => { - let ep; const eventsUri = 'http://example.com'; const sdkKey = 'SDK_KEY'; const defaultConfig = { @@ -24,31 +24,20 @@ describe('EventProcessor', () => { const stringifiedNumericUser = { key: '1', secondary: '2', ip: '3', country: '4', email: '5', firstName: '6', lastName: '7', avatar: '8', name: '9', anonymous: false, custom: { age: 99 } }; - afterEach(() => { - if (ep) { + function eventsServerTest(asyncCallback) { + return async () => withCloseable(createServer, async server => { + server.forMethodAndPath('post', '/bulk', respond(200)); + return await asyncCallback(server); + }); + } + + async function withEventProcessor(config, server, asyncCallback) { + const ep = EventProcessor(sdkKey, Object.assign({}, config, { eventsUri: server.url })); + try { + return await asyncCallback(ep); + } finally { ep.close(); } - nock.cleanAll(); - }); - - function flushAndGetRequest(options, cb) { - const callback = cb || options; - options = cb ? options : {}; - let requestBody; - let requestHeaders; - nock(eventsUri).post('/bulk') - .reply(function(uri, body) { - requestBody = body; - requestHeaders = this.req.headers; - return [ options.status || 200, '', options.headers || {} ]; - }); - ep.flush().then( - () => { - callback(requestBody, requestHeaders); - }, - error => { - callback(requestBody, requestHeaders, error); - }); } function headersWithDate(timestamp) { @@ -94,295 +83,302 @@ describe('EventProcessor', () => { expect(e.kind).toEqual('summary'); } - it('queues identify event', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'identify', creationDate: 1000, user: user }; - ep.sendEvent(e); + async function getJsonRequest(server) { + return JSON.parse((await server.nextRequest()).body); + } + + it('queues identify event', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'identify', creationDate: 1000, user: user }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output).toEqual([{ kind: 'identify', creationDate: 1000, key: user.key, user: user }]); - done(); }); - }); + })); - it('filters user in identify event', done => { + it('filters user in identify event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { allAttributesPrivate: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'identify', creationDate: 1000, user: user }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'identify', creationDate: 1000, user: user }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output).toEqual([{ kind: 'identify', creationDate: 1000, key: user.key, user: filteredUser }]); - done(); }); - }); + })); - it('stringifies user attributes in identify event', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'identify', creationDate: 1000, user: numericUser }; - ep.sendEvent(e); + it('stringifies user attributes in identify event', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'identify', creationDate: 1000, user: numericUser }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output).toEqual([{ kind: 'identify', creationDate: 1000, key: stringifiedNumericUser.key, user: stringifiedNumericUser }]); - done(); }); - }); + })); - it('queues individual feature event with index event', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e); + it('queues individual feature event with index event', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(3); checkIndexEvent(output[0], e, user); checkFeatureEvent(output[1], e, false); checkSummaryEvent(output[2]); - done(); }); - }); + })); - it('filters user in index event', done => { + it('filters user in index event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { allAttributesPrivate: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(3); checkIndexEvent(output[0], e, filteredUser); checkFeatureEvent(output[1], e, false); checkSummaryEvent(output[2]); - done(); }); - }); + })); - it('stringifies user attributes in index event', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'feature', creationDate: 1000, user: numericUser, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e); + it('stringifies user attributes in index event', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: numericUser, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(3); checkIndexEvent(output[0], e, stringifiedNumericUser); checkFeatureEvent(output[1], e, false); checkSummaryEvent(output[2]); - done(); }); - }); + })); - it('can include inline user in feature event', done => { + it('can include inline user in feature event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkFeatureEvent(output[0], e, false, user); checkSummaryEvent(output[1]); - done(); }); - }); + })); - it('filters user in feature event', done => { + it('filters user in feature event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { allAttributesPrivate: true, inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkFeatureEvent(output[0], e, false, filteredUser); checkSummaryEvent(output[1]); - done(); }); - }); + })); - it('stringifies user attributes in feature event', done => { + it('stringifies user attributes in feature event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'feature', creationDate: 1000, user: numericUser, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: numericUser, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkFeatureEvent(output[0], e, false, stringifiedNumericUser); checkSummaryEvent(output[1]); - done(); }); - }); + })); - it('can include reason in feature event', done => { + it('can include reason in feature event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true, - reason: { kind: 'FALLTHROUGH' } }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true, + reason: { kind: 'FALLTHROUGH' } }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkFeatureEvent(output[0], e, false, user); checkSummaryEvent(output[1]); - done(); }); - }); + })); - it('still generates index event if inlineUsers is true but feature event is not tracked', done => { + it('still generates index event if inlineUsers is true but feature event is not tracked', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: false }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: false }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkIndexEvent(output[0], e, user); checkSummaryEvent(output[1]); - done(); }); - }); + })); - it('sets event kind to debug if event is temporarily in debug mode', done => { - ep = EventProcessor(sdkKey, defaultConfig); - var futureTime = new Date().getTime() + 1000000; - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: false, debugEventsUntilDate: futureTime }; - ep.sendEvent(e); + it('sets event kind to debug if event is temporarily in debug mode', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + var futureTime = new Date().getTime() + 1000000; + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: false, debugEventsUntilDate: futureTime }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(3); checkIndexEvent(output[0], e, user); checkFeatureEvent(output[1], e, true, user); checkSummaryEvent(output[2]); - done(); }); - }); + })); - it('can both track and debug an event', done => { - ep = EventProcessor(sdkKey, defaultConfig); - var futureTime = new Date().getTime() + 1000000; - const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', - version: 11, variation: 1, value: 'value', trackEvents: true, debugEventsUntilDate: futureTime }; - ep.sendEvent(e); + it('can both track and debug an event', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const futureTime = new Date().getTime() + 1000000; + const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', + version: 11, variation: 1, value: 'value', trackEvents: true, debugEventsUntilDate: futureTime }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(4); checkIndexEvent(output[0], e, user); checkFeatureEvent(output[1], e, false); checkFeatureEvent(output[2], e, true, user); checkSummaryEvent(output[3]); - done(); }); - }); + })); - it('expires debug mode based on client time if client time is later than server time', done => { - ep = EventProcessor(sdkKey, defaultConfig); + it('expires debug mode based on client time if client time is later than server time', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + // Pick a server time that is somewhat behind the client time + const serverTime = new Date().getTime() - 20000; + s.forMethodAndPath('post', '/bulk', respond(200, headersWithDate(serverTime))); - // Pick a server time that is somewhat behind the client time - var serverTime = new Date().getTime() - 20000; + // Send and flush an event we don't care about, just to set the last server time + ep.sendEvent({ kind: 'identify', user: { key: 'otherUser' } }); + await ep.flush(); + await s.nextRequest(); - // Send and flush an event we don't care about, just to set the last server time - ep.sendEvent({ kind: 'identify', user: { key: 'otherUser' } }); - flushAndGetRequest({ status: 200, headers: headersWithDate(serverTime) }, function() { // Now send an event with debug mode on, with a "debug until" time that is further in // the future than the server time, but in the past compared to the client. - var debugUntil = serverTime + 1000; + const debugUntil = serverTime + 1000; const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', version: 11, variation: 1, value: 'value', trackEvents: false, debugEventsUntilDate: debugUntil }; ep.sendEvent(e); + await ep.flush(); // Should get a summary event only, not a full feature event - flushAndGetRequest(output => { - expect(output.length).toEqual(2); - checkIndexEvent(output[0], e, user); - checkSummaryEvent(output[1]); - done(); - }); + const output = await getJsonRequest(s); + expect(output.length).toEqual(2); + checkIndexEvent(output[0], e, user); + checkSummaryEvent(output[1]); }); - }); + })); - it('expires debug mode based on server time if server time is later than client time', done => { - ep = EventProcessor(sdkKey, defaultConfig); + it('expires debug mode based on server time if server time is later than client time', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + // Pick a server time that is somewhat ahead of the client time + const serverTime = new Date().getTime() + 20000; + s.forMethodAndPath('post', '/bulk', respond(200, headersWithDate(serverTime))); - // Pick a server time that is somewhat ahead of the client time - var serverTime = new Date().getTime() + 20000; + // Send and flush an event we don't care about, just to set the last server time + ep.sendEvent({ kind: 'identify', user: { key: 'otherUser' } }); + await ep.flush(); + await s.nextRequest(); - // Send and flush an event we don't care about, just to set the last server time - ep.sendEvent({ kind: 'identify', user: { key: 'otherUser' } }); - flushAndGetRequest({ status: 200, headers: headersWithDate(serverTime) }, function() { // Now send an event with debug mode on, with a "debug until" time that is further in // the future than the client time, but in the past compared to the server. - var debugUntil = serverTime - 1000; + const debugUntil = serverTime - 1000; const e = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey', version: 11, variation: 1, value: 'value', trackEvents: false, debugEventsUntilDate: debugUntil }; ep.sendEvent(e); + await ep.flush(); // Should get a summary event only, not a full feature event - flushAndGetRequest(output => { - expect(output.length).toEqual(2); - checkIndexEvent(output[0], e, user); - checkSummaryEvent(output[1]); - done(); - }); + const output = await getJsonRequest(s); + expect(output.length).toEqual(2); + checkIndexEvent(output[0], e, user); + checkSummaryEvent(output[1]); }); - }); - - it('generates only one index event from two feature events for same user', done => { - ep = EventProcessor(sdkKey, defaultConfig); - var e1 = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey1', - version: 11, variation: 1, value: 'value', trackEvents: true }; - var e2 = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey2', - version: 11, variation: 1, value: 'value', trackEvents: true }; - ep.sendEvent(e1); - ep.sendEvent(e2); - - flushAndGetRequest(output => { + })); + + it('generates only one index event from two feature events for same user', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e1 = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey1', + version: 11, variation: 1, value: 'value', trackEvents: true }; + const e2 = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey2', + version: 11, variation: 1, value: 'value', trackEvents: true }; + ep.sendEvent(e1); + ep.sendEvent(e2); + await ep.flush(); + + const output = await getJsonRequest(s); expect(output.length).toEqual(4); checkIndexEvent(output[0], e1, user); checkFeatureEvent(output[1], e1, false); checkFeatureEvent(output[2], e2, false); checkSummaryEvent(output[3]); - done(); }); - }); - - it('summarizes nontracked events', done => { - ep = EventProcessor(sdkKey, defaultConfig); - var e1 = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey1', - version: 11, variation: 1, value: 'value1', default: 'default1', trackEvents: false }; - var e2 = { kind: 'feature', creationDate: 2000, user: user, key: 'flagkey2', - version: 22, variation: 1, value: 'value2', default: 'default2', trackEvents: false }; - ep.sendEvent(e1); - ep.sendEvent(e2); - - flushAndGetRequest(output => { + })); + + it('summarizes nontracked events', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e1 = { kind: 'feature', creationDate: 1000, user: user, key: 'flagkey1', + version: 11, variation: 1, value: 'value1', default: 'default1', trackEvents: false }; + const e2 = { kind: 'feature', creationDate: 2000, user: user, key: 'flagkey2', + version: 22, variation: 1, value: 'value2', default: 'default2', trackEvents: false }; + ep.sendEvent(e1); + ep.sendEvent(e2); + await ep.flush(); + + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkIndexEvent(output[0], e1, user); - var se = output[1]; + const se = output[1]; checkSummaryEvent(se); expect(se.startDate).toEqual(1000); expect(se.endDate).toEqual(2000); @@ -396,187 +392,161 @@ describe('EventProcessor', () => { counters: [ { version: 22, variation: 1, value: 'value2', count: 1 } ] } }); - done(); }); - }); + })); - it('queues custom event with user', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', - data: { thing: 'stuff' } }; - ep.sendEvent(e); + it('queues custom event with user', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', + data: { thing: 'stuff' } }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkIndexEvent(output[0], e, user); checkCustomEvent(output[1], e); - done(); }); - }); + })); - it('can include metric value in custom event', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', - data: { thing: 'stuff' }, metricValue: 1.5 }; - ep.sendEvent(e); + it('can include metric value in custom event', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', + data: { thing: 'stuff' }, metricValue: 1.5 }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(2); checkIndexEvent(output[0], e, user); checkCustomEvent(output[1], e); - done(); }); - }); + })); - it('can include inline user in custom event', done => { + it('can include inline user in custom event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', - data: { thing: 'stuff' } }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', + data: { thing: 'stuff' } }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(1); checkCustomEvent(output[0], e, user); - done(); }); - }); + })); - it('stringifies user attributes in custom event', done => { + it('stringifies user attributes in custom event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'custom', creationDate: 1000, user: numericUser, key: 'eventkey', - data: { thing: 'stuff' } }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'custom', creationDate: 1000, user: numericUser, key: 'eventkey', + data: { thing: 'stuff' } }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(1); checkCustomEvent(output[0], e, stringifiedNumericUser); - done(); }); - }); + })); - it('filters user in custom event', done => { + it('filters user in custom event', eventsServerTest(async s => { const config = Object.assign({}, defaultConfig, { allAttributesPrivate: true, inlineUsersInEvents: true }); - ep = EventProcessor(sdkKey, config); - const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', - data: { thing: 'stuff' } }; - ep.sendEvent(e); + await withEventProcessor(config, s, async ep => { + const e = { kind: 'custom', creationDate: 1000, user: user, key: 'eventkey', + data: { thing: 'stuff' } }; + ep.sendEvent(e); + await ep.flush(); - flushAndGetRequest(output => { + const output = await getJsonRequest(s); expect(output.length).toEqual(1); checkCustomEvent(output[0], e, filteredUser); - done(); - }); - }); - - it('sends nothing if there are no events', done => { - ep = EventProcessor(sdkKey, defaultConfig); - ep.flush(function() { - // Nock will generate an error if we sent a request we didn't explicitly listen for. - done(); }); - }); - - it('sends SDK key', done => { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'identify', creationDate: 1000, user: user }; - ep.sendEvent(e); + })); - flushAndGetRequest(function(requestBody, requestHeaders) { - expect(requestHeaders['authorization']).toEqual(sdkKey); - done(); + it('sends nothing if there are no events', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + await ep.flush(); + expect(s.requestCount()).toEqual(0); }); - }); - - function verifyUnrecoverableHttpError(done, status) { - ep = EventProcessor(sdkKey, defaultConfig); - const e = { kind: 'identify', creationDate: 1000, user: user }; - ep.sendEvent(e); - - flushAndGetRequest({ status: status }, (body, headers, error) => { - expect(error.message).toContain('error ' + status); + })); + it('sends SDK key', eventsServerTest(async s => { + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'identify', creationDate: 1000, user: user }; ep.sendEvent(e); + await ep.flush(); - ep.flush().then( - // no HTTP request should have been done here - Nock will error out if there was one - function() { }, - function(err) { - expect(err.message).toContain('SDK key is invalid'); - done(); - }); + const request = await s.nextRequest(); + expect(request.headers['authorization']).toEqual(sdkKey); + }); + })); + + function verifyUnrecoverableHttpError(status) { + return eventsServerTest(async s => { + s.forMethodAndPath('post', '/bulk', respond(status)); + await withEventProcessor(defaultConfig, s, async ep => { + const e = { kind: 'identify', creationDate: 1000, user: user }; + ep.sendEvent(e); + await expect(ep.flush()).rejects.toThrow('error ' + status); + + expect(s.requestCount()).toEqual(1); + await s.nextRequest(); + + ep.sendEvent(e); + await expect(ep.flush()).rejects.toThrow(/SDK key is invalid/); + expect(s.requestCount()).toEqual(0); + }); }); } - function verifyRecoverableHttpError(done, status) { - ep = EventProcessor(sdkKey, defaultConfig); - var e0 = { kind: 'identify', creationDate: 1000, user: user }; - ep.sendEvent(e0); - - nock(eventsUri).post('/bulk').reply(status); - nock(eventsUri).post('/bulk').reply(status); - // since we only queued two responses, Nock will throw an error if it gets a third. - ep.flush().then( - function() {}, - function(err) { - expect(err.message).toContain('error ' + status); - - var e1 = { kind: 'identify', creationDate: 1001, user: user }; - ep.sendEvent(e1); - - // this second event should go through - flushAndGetRequest(output => { - expect(output.length).toEqual(1); - expect(output[0].creationDate).toEqual(1001); - - done(); - }); + function verifyRecoverableHttpError(status) { + return eventsServerTest(async s => { + s.forMethodAndPath('post', '/bulk', respond(status)); + await withEventProcessor(defaultConfig, s, async ep => { + var e = { kind: 'identify', creationDate: 1000, user: user }; + ep.sendEvent(e); + await expect(ep.flush()).rejects.toThrow('error ' + status); + + expect(s.requestCount()).toEqual(2); + await s.nextRequest(); + await s.nextRequest(); + + s.forMethodAndPath('post', '/bulk', respond(200)); + ep.sendEvent(e); + await ep.flush(); + expect(s.requestCount()).toEqual(1); }); + }); } - it('retries after a 400 error', done => { - verifyRecoverableHttpError(done, 400); - }); + it('retries after a 400 error', verifyRecoverableHttpError(400)); - it('stops sending events after a 401 error', done => { - verifyUnrecoverableHttpError(done, 401); - }); + it('stops sending events after a 401 error', verifyUnrecoverableHttpError(401)); - it('stops sending events after a 403 error', done => { - verifyUnrecoverableHttpError(done, 403); - }); + it('stops sending events after a 403 error', verifyUnrecoverableHttpError(403)); - it('retries after a 408 error', done => { - verifyRecoverableHttpError(done, 408); - }); + it('retries after a 408 error', verifyRecoverableHttpError(408)); - it('retries after a 429 error', done => { - verifyRecoverableHttpError(done, 429); - }); + it('retries after a 429 error', verifyRecoverableHttpError(429)); - it('retries after a 503 error', done => { - verifyRecoverableHttpError(done, 503); - }); + it('retries after a 503 error', verifyRecoverableHttpError(503)); - it('swallows errors from failed background flush', done => { + it('swallows errors from failed background flush', eventsServerTest(async s => { // This test verifies that when a background flush fails, we don't emit an unhandled // promise rejection. Jest will fail the test if we do that. - + const config = Object.assign({}, defaultConfig, { flushInterval: 0.25 }); - ep = EventProcessor(sdkKey, config); - ep.sendEvent({ kind: 'identify', creationDate: 1000, user: user }); - - var req1 = nock(eventsUri).post('/bulk').reply(500); - var req2 = nock(eventsUri).post('/bulk').reply(500); - - // unfortunately we must wait for both the flush interval and the 1-second retry interval - var delay = 1500; - setTimeout(() => { - expect(req1.isDone()).toEqual(true); - expect(req2.isDone()).toEqual(true); - done(); - }, delay); - }); + await withEventProcessor(config, s, async ep => { + s.forMethodAndPath('post', '/bulk', respond(500)); + + ep.sendEvent({ kind: 'identify', creationDate: 1000, user: user }); + + // unfortunately we must wait for both the flush interval and the 1-second retry interval + await sleepAsync(1500); + expect(s.requestCount()).toEqual(2); + }); + })); }); diff --git a/test/file_data_source-test.js b/test/file_data_source-test.js index a343f60..2ef4c82 100644 --- a/test/file_data_source-test.js +++ b/test/file_data_source-test.js @@ -1,18 +1,19 @@ -var fs = require('fs'); -var tmp = require('tmp'); -var dataKind = require('../versioned_data_kind'); -const { asyncify, asyncifyNode, sleepAsync } = require('./async_utils'); - -var LaunchDarkly = require('../index'); -var FileDataSource = require('../file_data_source'); -var InMemoryFeatureStore = require('../feature_store'); - -var flag1Key = 'flag1'; -var flag2Key = 'flag2'; -var flag2Value = 'value2'; -var segment1Key = 'seg1'; - -var flag1 = { +const fs = require('fs'); +const tmp = require('tmp'); +const dataKind = require('../versioned_data_kind'); +const { asyncify, promisify, sleepAsync } = require('./async_utils'); +const { stubLogger } = require('./stubs'); + +const LaunchDarkly = require('../index'); +const FileDataSource = require('../file_data_source'); +const InMemoryFeatureStore = require('../feature_store'); + +const flag1Key = 'flag1'; +const flag2Key = 'flag2'; +const flag2Value = 'value2'; +const segment1Key = 'seg1'; + +const flag1 = { "key": flag1Key, "on": true, "fallthrough": { @@ -21,26 +22,26 @@ var flag1 = { "variations": [ "fall", "off", "on" ] }; -var segment1 = { +const segment1 = { "key": segment1Key, "include": ["user1"] }; -var flagOnlyJson = ` +const flagOnlyJson = ` { "flags": { "${flag1Key}": ${ JSON.stringify(flag1) } } }`; -var segmentOnlyJson = ` +const segmentOnlyJson = ` { "segments": { "${segment1Key}": ${ JSON.stringify(segment1) } } }`; -var allPropertiesJson = ` +const allPropertiesJson = ` { "flags": { "${flag1Key}": ${ JSON.stringify(flag1) } @@ -53,7 +54,7 @@ var allPropertiesJson = ` } }`; -var allPropertiesYaml = ` +const allPropertiesYaml = ` flags: ${flag1Key}: key: ${flag1Key} @@ -81,10 +82,7 @@ describe('FileDataSource', function() { beforeEach(() => { store = InMemoryFeatureStore(); dataSources = []; - logger = { - info: jest.fn(), - warn: jest.fn() - }; + logger = stubLogger(); }); afterEach(() => { @@ -92,14 +90,14 @@ describe('FileDataSource', function() { }); function makeTempFile(content) { - return asyncifyNode(tmp.file) + return promisify(tmp.file)() .then(path => { return replaceFileContent(path, content).then(() => path); }); } function replaceFileContent(path, content) { - return asyncifyNode(cb => fs.writeFile(path, content, cb)); + return promisify(fs.writeFile)(path, content); } function setupDataSource(options) { @@ -226,7 +224,7 @@ describe('FileDataSource', function() { it('evaluates simplified flag with client as expected', async () => { var path = await makeTempFile(allPropertiesJson); var factory = FileDataSource({ paths: [ path ]}); - var config = { updateProcessor: factory, sendEvents: false }; + var config = { updateProcessor: factory, sendEvents: false, logger: logger }; var client = LaunchDarkly.init('dummy-key', config); var user = { key: 'userkey' }; @@ -242,7 +240,7 @@ describe('FileDataSource', function() { it('evaluates full flag with client as expected', async () => { var path = await makeTempFile(allPropertiesJson); var factory = FileDataSource({ paths: [ path ]}); - var config = { updateProcessor: factory, sendEvents: false }; + var config = { updateProcessor: factory, sendEvents: false, logger: logger }; var client = LaunchDarkly.init('dummy-key', config); var user = { key: 'userkey' }; diff --git a/test/http_server.js b/test/http_server.js index c97fb4b..7afe124 100644 --- a/test/http_server.js +++ b/test/http_server.js @@ -1,82 +1,195 @@ const http = require('http'); const https = require('https'); +const { AsyncQueue } = require('./async_utils'); -// This is adapted from some helper code in https://github.com/EventSource/eventsource/blob/master/test/eventsource_test.js +// This file provides a simple interface for using an embedded HTTP or HTTPS server to handle +// requests in an end-to-end integration test. The implementation is based on Node's built-in +// server functionality, but the Node APIs are not exposed directly so test code can just use +// our abstraction. + +// The original design was based on helper code in https://github.com/EventSource/eventsource/blob/master/test/eventsource_test.js let nextPort = 8000; -let servers = []; +let allServerWrappers = []; + +function preprocessRequest(req) { + const method = req.method.toLowerCase(); + const bodyPromise = new Promise(resolve => { + if (method === 'post' || method === 'put') { + let body = ''; + req.on('data', data => { + body += data; + }); + req.on('end', () => resolve(body)); + } else { + resolve(); + } + }); + + return bodyPromise.then(body => ({ + method, + path: req.url, + headers: req.headers, + body + })); +} export async function createServer(secure, options) { - const server = secure ? https.createServer(options) : http.createServer(options); - let port = nextPort++; + const realServer = secure ? https.createServer(options) : http.createServer(options); - server.requests = []; + const requests = AsyncQueue(); const responses = []; + const handlers = []; - server.on('request', (req, res) => { - server.requests.push(req); - responses.push(res); + realServer.on('request', (req, res) => { + preprocessRequest(req).then(reqWrapper => { + requests.add(reqWrapper); + responses.push(res); + for (let i in handlers) { + if (handlers[i](reqWrapper, res)) { + break; + } + } + }); }); - const realClose = server.close; - server.close = callback => { - responses.forEach(res => res.end()); - realClose.call(server, callback); + const serverWrapper = { + // An AsyncQueue of all requests handled so far. Call "await server.requests.take()" to block + // until the server has handled a request. Each request is a simple object with the properties + // "method", "path", "headers", and "body". + requests, + + // Blocks to retrieve the next handled request. + nextRequest: () => requests.take(), + + // Returns the number of handled requests not yet retrieved. + requestCount: () => requests.length(), + + // Specify a function to be called by default. It takes a single parameter that is the Node + // ClientResponse object. You'll normally use the "respond" functions in this module: + // server.always(respondJson({ message: 'hi' })); + default: responderFn => { + handlers.push((req, res) => { + responderFn(res); + return true; + }); + }, + + // Specifies a function to be called only for the given method and path. Same responder semantics + // as default(). Overrides any previous handler for the same method and path. + forMethodAndPath: (method, path, responderFn) => { + handlers.unshift((req, res) => { + if (req.method === method.toLowerCase() && req.path === path) { + responderFn(res); + return true; + } + return false; + }); + }, + + close: async () => { + responses.forEach(res => res.end()); + requests.close(); // causes anyone waiting on the queue to get an exception + return new Promise(resolve => { + realServer.close(resolve); + }); + } }; - servers.push(server); + allServerWrappers.push(serverWrapper); while (true) { + const port = nextPort++; try { await new Promise((resolve, reject) => { - server.listen(port); - server.on('error', reject); - server.on('listening', resolve); + realServer.listen(port); + realServer.on('error', reject); + realServer.on('listening', resolve); }); - server.url = (secure ? 'https' : 'http') + '://localhost:' + port; - return server; + serverWrapper.url = (secure ? 'https' : 'http') + '://localhost:' + port; + return serverWrapper; } catch (err) { - if (err.message.match(/EADDRINUSE/)) { - port = nextPort++; - } else { + if (!err.message.match(/EADDRINUSE/)) { throw err; } } } } -export function closeServers() { - servers.forEach(server => server.close()); - servers = []; -} - -export function readAll(req) { - return new Promise(resolve => { - let body = ''; - req.on('data', data => { - body += data; - }); - req.on('end', () => resolve(body)); - }); +export async function closeServers() { + const all = [...allServerWrappers]; + allServerWrappers = []; + for (let i in all) { + await all[i].close(); + } } -export function respond(res, status, headers, body) { - res.writeHead(status, headers); - body && res.write(body); - res.end(); +// Usage: +// server.forMethodAndPath('get', '/path', respond(200, { 'content-type': 'text/plain' }, 'hello')); +export function respond(status, headers, body) { + return res => { + res.writeHead(status, headers); + body && res.write(body); + res.end(); + }; } -export function respondJson(res, data) { - respond(res, 200, { 'Content-Type': 'application/json' }, JSON.stringify(data)); +// Usage: +// server.forMethodAndPath('get', '/path', respondJson({ message: 'I am a JSON object' })); +export function respondJson(data) { + return respond(200, { 'Content-Type': 'application/json' }, JSON.stringify(data)); } -export function respondSSEEvent(res, eventType, eventData) { - res.writeHead(200, { 'Content-Type': 'text/event-stream' }) - res.write('event: ' + eventType + '\ndata: ' + JSON.stringify(eventData) + '\n\n'); - res.write(':\n'); - // purposely do not close the stream +// Usage: +// const chunkQueue = AsyncQueue(); +// server.forMethodAndPath('get', '/path', respondChunked(200, {}, chunkQueue)); +// chunkQueue.add('a chunk of data'); +// chunkQueue.add('another one'); +// chunkQueue.close(); // closing the queue ends the response +export function respondChunked(status, headers, chunkQueue) { + return async res => { + res.writeHead(status, headers); + res.write(''); // this just avoids response buffering, and causes all subsequent writes to be chunked + while (true) { + try { + const chunk = await chunkQueue.take(); + res.write(chunk); + } catch (e) { + // queue was probably closed + res.end(); + break; + } + } + } } -export function autoRespond(server, respondFn) { - server.on('request', (req, res) => respondFn(res)); +// Usage: +// const eventQueue = AsyncQueue(); +// server.forMethodAndPath('get', '/path', respondSSE(200, {}, eventQueue)); +// eventQueue.add({ type: 'patch', data: { path: '/flags', key: 'x' } }); +// eventQueue.add({ comment: '' }); +// eventQueue.close(); // closing the queue ends the response +export function respondSSE(eventQueue) { + const chunkQueue = AsyncQueue(); + (async () => { // we're not awaiting this task - it keeps running after we return + while (true) { + let event, chunk; + try { + event = await eventQueue.take(); + } catch (e) { + chunkQueue.close(); + break; + } + if (event.comment !== undefined) { + chunk = ':' + event.comment + '\n'; + } else { + chunk = 'event: ' + event.type + '\n'; + chunk += 'data: '; + chunk += (typeof event.data === 'string') ? event.data : JSON.stringify(event.data); + chunk += '\n\n'; + } + chunkQueue.add(chunk); + } + })(); + return respondChunked(200, { 'Content-Type': 'text/event-stream' }, chunkQueue); } diff --git a/test/polling-test.js b/test/polling-test.js index 56f4c7d..e296f86 100644 --- a/test/polling-test.js +++ b/test/polling-test.js @@ -1,7 +1,7 @@ const InMemoryFeatureStore = require('../feature_store'); const PollingProcessor = require('../polling'); const dataKind = require('../versioned_data_kind'); -const { asyncify, asyncifyNode, sleepAsync } = require('./async_utils'); +const { asyncify, promisify, sleepAsync } = require('./async_utils'); const stubs = require('./stubs'); describe('PollingProcessor', () => { @@ -48,7 +48,7 @@ describe('PollingProcessor', () => { }; processor = PollingProcessor(config, requestor); - await asyncifyNode(cb => processor.start(cb)); // didn't throw -> success + await promisify(processor.start)(); // didn't throw -> success }); it('initializes feature store', async () => { @@ -57,7 +57,7 @@ describe('PollingProcessor', () => { }; processor = PollingProcessor(config, requestor); - await asyncifyNode(cb => processor.start(cb)); + await promisify(processor.start)(); const flags = await asyncify(cb => store.all(dataKind.features, cb)); expect(flags).toEqual(allData.flags); diff --git a/test/redis_feature_store-test.js b/test/redis_feature_store-test.js index 37b18f0..4415f77 100644 --- a/test/redis_feature_store-test.js +++ b/test/redis_feature_store-test.js @@ -1,5 +1,6 @@ const RedisFeatureStore = require('../redis_feature_store'); const testBase = require('./feature_store_test_base'); +const { stubLogger } = require('./stubs'); const redis = require('redis'); @@ -11,15 +12,15 @@ const shouldSkip = (process.env.LD_SKIP_DATABASE_TESTS === '1'); const extraRedisClient = redis.createClient(redisOpts); function makeCachedStore() { - return new RedisFeatureStore(redisOpts, 30); + return new RedisFeatureStore(redisOpts, 30, null, stubLogger()); } function makeUncachedStore() { - return new RedisFeatureStore(redisOpts, 0); + return new RedisFeatureStore(redisOpts, 0, null, stubLogger()); } function makeStoreWithPrefix(prefix) { - return new RedisFeatureStore(redisOpts, 0, prefix); + return new RedisFeatureStore(redisOpts, 0, prefix, stubLogger()); } function clearExistingData(callback) { diff --git a/test/requestor-test.js b/test/requestor-test.js index f913a2c..7d6c551 100644 --- a/test/requestor-test.js +++ b/test/requestor-test.js @@ -1,7 +1,7 @@ import Requestor from '../requestor'; import * as dataKind from '../versioned_data_kind'; -import { asyncifyNode } from './async_utils'; -import * as httpServer from './http_server'; +import { promisify, withCloseable } from './async_utils'; +import { createServer, respond, respondJson } from './http_server'; describe('Requestor', () => { const sdkKey = 'x'; @@ -9,84 +9,63 @@ describe('Requestor', () => { const someData = { key: { version: 1 } }; const allData = { flags: someData, segments: someData }; - let server; - let config; - - beforeEach(async () => { - server = await httpServer.createServer(); - config = { baseUri: server.url }; - }); - - afterEach(() => { - httpServer.closeServers(); - }); - describe('requestObject', () => { - it('uses correct flag URL', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, {})); - const r = Requestor(sdkKey, config); - await asyncifyNode(cb => r.requestObject(dataKind.features, 'key', cb)); - expect(server.requests.length).toEqual(1); - expect(server.requests[0].url).toEqual('/sdk/latest-flags/key'); - }); + it('gets flag data', async () => + await withCloseable(createServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-flags/key', respondJson(someData)); + const r = Requestor(sdkKey, { baseUri: server.url }); + const result = await promisify(r.requestObject)(dataKind.features, 'key'); + expect(JSON.parse(result)).toEqual(someData); + }) + ); - it('uses correct segment URL', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, {})); - const r = Requestor(sdkKey, config); - await asyncifyNode(cb => r.requestObject(dataKind.segments, 'key', cb)); - expect(server.requests.length).toEqual(1); - expect(server.requests[0].url).toEqual('/sdk/latest-segments/key'); - }); + it('gets segment data', async () => + await withCloseable(createServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-segments/key', respondJson(someData)); + const r = Requestor(sdkKey, { baseUri: server.url }); + const result = await promisify(r.requestObject)(dataKind.segments, 'key'); + expect(JSON.parse(result)).toEqual(someData); + }) + ); - it('returns successful result', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, someData)); - const r = Requestor(sdkKey, config); - const result = await asyncifyNode(cb => r.requestObject(dataKind.features, 'key', cb)); - expect(JSON.parse(result)).toEqual(someData); - }); - - it('returns error result for HTTP error', async () => { - httpServer.autoRespond(server, res => httpServer.respond(res, 404)); - const r = Requestor(sdkKey, config); - const req = asyncifyNode(cb => r.requestObject(dataKind.features, 'key', cb)); - await expect(req).rejects.toThrow(/404/); - }); + it('returns error result for HTTP error', async () => + await withCloseable(createServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-flags/key', respond(404)); + const r = Requestor(sdkKey, { baseUri: server.url }); + const req = promisify(r.requestObject)(dataKind.features, 'key'); + await expect(req).rejects.toThrow(/404/); + }) + ); it('returns error result for network error', async () => { - config.baseUri = badUri; - const r = Requestor(sdkKey, config); - const req = asyncifyNode(cb => r.requestObject(dataKind.features, 'key', cb)); + const r = Requestor(sdkKey, { baseUri: badUri }); + const req = promisify(r.requestObject)(dataKind.features, 'key'); await expect(req).rejects.toThrow(/bad-uri/); }); }); describe('requestAllData', () => { - it('uses correct URL', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, {})); - const r = Requestor(sdkKey, config); - await asyncifyNode(cb => r.requestAllData(cb)); - expect(server.requests.length).toEqual(1); - expect(server.requests[0].url).toEqual('/sdk/latest-all'); - }); + it('gets data', async () => + await withCloseable(createServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-all', respondJson(allData)); + const r = Requestor(sdkKey, { baseUri: server.url }); + const result = await promisify(r.requestAllData)(); + expect(JSON.parse(result)).toEqual(allData); + }) + ); - it('returns successful result', async () => { - httpServer.autoRespond(server, res => httpServer.respondJson(res, allData)); - const r = Requestor(sdkKey, config); - const result = await asyncifyNode(cb => r.requestAllData(cb)); - expect(JSON.parse(result)).toEqual(allData); - }); - - it('returns error result for HTTP error', async () => { - httpServer.autoRespond(server, res => httpServer.respond(res, 404)); - const r = Requestor(sdkKey, config); - const req = asyncifyNode(cb => r.requestAllData(cb)); - await expect(req).rejects.toThrow(/404/); - }); + it('returns error result for HTTP error', async () => + await withCloseable(createServer, async server => { + server.forMethodAndPath('get', '/sdk/latest-all', respond(401)); + const r = Requestor(sdkKey, { baseUri: server.url }); + const req = promisify(r.requestAllData)(); + await expect(req).rejects.toThrow(/401/); + }) + ); it('returns error result for network error', async () => { - config.baseUri = badUri; - const r = Requestor(sdkKey, config); - const req = asyncifyNode(cb => r.requestAllData(cb)); + const r = Requestor(sdkKey, { baseUri: badUri }); + const req = promisify(r.requestAllData)(); await expect(req).rejects.toThrow(/bad-uri/); }); });