diff --git a/packages/kbn-interpreter/src/public/batched_fetch.js b/packages/kbn-interpreter/src/public/batched_fetch.js index 20fd0d37b5ef9..414b1443f5b81 100644 --- a/packages/kbn-interpreter/src/public/batched_fetch.js +++ b/packages/kbn-interpreter/src/public/batched_fetch.js @@ -23,7 +23,7 @@ import { FUNCTIONS_URL } from './consts'; * Create a function which executes an Expression function on the * server as part of a larger batch of executions. */ -export function batchedFetch({ kfetch, serialize, ms = 10 }) { +export function batchedFetch({ ajaxStream, serialize, ms = 10 }) { // Uniquely identifies each function call in a batch operation // so that the appropriate promise can be resolved / rejected later. let id = 0; @@ -42,7 +42,7 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) { }; const runBatch = () => { - processBatch(kfetch, batch); + processBatch(ajaxStream, batch); reset(); }; @@ -70,14 +70,15 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) { function createFuture() { let resolve; let reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); return { - resolve(val) { return resolve(val); }, - reject(val) { return reject(val); }, - promise: new Promise((res, rej) => { - resolve = res; - reject = rej; - }), + resolve, + reject, + promise, }; } @@ -85,22 +86,21 @@ function createFuture() { * Runs the specified batch of functions on the server, then resolves * the related promises. */ -async function processBatch(kfetch, batch) { +async function processBatch(ajaxStream, batch) { try { - const { results } = await kfetch({ - pathname: FUNCTIONS_URL, - method: 'POST', + await ajaxStream({ + url: FUNCTIONS_URL, body: JSON.stringify({ functions: Object.values(batch).map(({ request }) => request), }), - }); + onResponse({ id, statusCode, result }) { + const { future } = batch[id]; - results.forEach(({ id, result }) => { - const { future } = batch[id]; - if (result.statusCode && result.err) { - future.reject(result); - } else { - future.resolve(result); + if (statusCode >= 400) { + future.reject(result); + } else { + future.resolve(result); + } } }); } catch (err) { diff --git a/packages/kbn-interpreter/src/public/batched_fetch.test.js b/packages/kbn-interpreter/src/public/batched_fetch.test.js new file mode 100644 index 0000000000000..f1c04e9de6d06 --- /dev/null +++ b/packages/kbn-interpreter/src/public/batched_fetch.test.js @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { batchedFetch } from './batched_fetch'; + +const serialize = (o) => JSON.stringify(o); + +describe('batchedFetch', () => { + it('resolves the correct promise', async () => { + const ajaxStream = jest.fn(async ({ body, onResponse }) => { + const { functions } = JSON.parse(body); + functions.map(({ id, functionName, context, args }) => onResponse({ + id, + statusCode: 200, + result: `${functionName}${context}${args}`, + })); + }); + + const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 }); + + const result = await Promise.all([ + ajax({ functionName: 'a', context: 1, args: 'aaa' }), + ajax({ functionName: 'b', context: 2, args: 'bbb' }), + ]); + + expect(result).toEqual([ + 'a1aaa', + 'b2bbb', + ]); + }); + + it('rejects responses whose statusCode is >= 300', async () => { + const ajaxStream = jest.fn(async ({ body, onResponse }) => { + const { functions } = JSON.parse(body); + functions.map(({ id, functionName, context, args }) => onResponse({ + id, + statusCode: context, + result: context >= 400 ? { err: {} } : `${functionName}${context}${args}`, + })); + }); + + const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 }); + + const result = await Promise.all([ + ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'), + ajax({ functionName: 'b', context: 400, args: 'bbb' }).catch(() => 'fail'), + ajax({ functionName: 'c', context: 200, args: 'ccc' }), + ]); + + expect(result).toEqual([ + 'fail', + 'fail', + 'c200ccc' + ]); + }); +}); \ No newline at end of file diff --git a/packages/kbn-interpreter/src/public/interpreter.js b/packages/kbn-interpreter/src/public/interpreter.js index 9e695864e1f45..02adc19f7810a 100644 --- a/packages/kbn-interpreter/src/public/interpreter.js +++ b/packages/kbn-interpreter/src/public/interpreter.js @@ -23,11 +23,11 @@ import { createHandlers } from './create_handlers'; import { batchedFetch } from './batched_fetch'; import { FUNCTIONS_URL } from './consts'; -export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) { +export async function initializeInterpreter({ kfetch, ajaxStream, typesRegistry, functionsRegistry }) { const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL }); const types = typesRegistry.toJS(); const { serialize } = serializeProvider(types); - const batch = batchedFetch({ kfetch, serialize }); + const batch = batchedFetch({ ajaxStream, serialize }); // For every sever-side function, register a client-side // function that matches its definition, but which simply diff --git a/packages/kbn-interpreter/src/public/interpreter.test.js b/packages/kbn-interpreter/src/public/interpreter.test.js index 34eb3578ec35c..8593d0793a1be 100644 --- a/packages/kbn-interpreter/src/public/interpreter.test.js +++ b/packages/kbn-interpreter/src/public/interpreter.test.js @@ -35,26 +35,21 @@ jest.mock('./create_handlers', () => ({ describe('kbn-interpreter/interpreter', () => { it('loads server-side functions', async () => { const kfetch = jest.fn(async () => ({})); + const ajaxStream = jest.fn(async () => ({})); - await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register: () => {} })); + await initializeInterpreter({ + kfetch, + ajaxStream, + typesRegistry: { toJS: () => ({}) }, + functionsRegistry: ({ register: () => {} }), + }); expect(kfetch).toHaveBeenCalledTimes(1); expect(kfetch).toHaveBeenCalledWith({ pathname: FUNCTIONS_URL }); }); it('registers client-side functions that pass through to the server', async () => { - const kfetch = jest.fn(async ({ method }) => { - if (method === 'POST') { - return { - results: [{ - id: 1, - result: { - hello: 'world', - }, - }], - }; - } - + const kfetch = jest.fn(async () => { return { hello: { name: 'hello' }, world: { name: 'world' }, @@ -62,8 +57,16 @@ describe('kbn-interpreter/interpreter', () => { }); const register = jest.fn(); + const ajaxStream = jest.fn(async ({ onResponse }) => { + onResponse({ id: 1, result: { hello: 'world' } }); + }); - await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register })); + await initializeInterpreter({ + kfetch, + ajaxStream, + typesRegistry: { toJS: () => ({}) }, + functionsRegistry: ({ register }), + }); expect(register).toHaveBeenCalledTimes(2); @@ -81,9 +84,9 @@ describe('kbn-interpreter/interpreter', () => { expect(result).toEqual({ hello: 'world' }); - expect(kfetch).toHaveBeenCalledWith({ - pathname: FUNCTIONS_URL, - method: 'POST', + expect(ajaxStream).toHaveBeenCalledWith({ + url: FUNCTIONS_URL, + onResponse: expect.any(Function), body: JSON.stringify({ functions: [{ id: 1, diff --git a/src/legacy/core_plugins/interpreter/public/interpreter.js b/src/legacy/core_plugins/interpreter/public/interpreter.js index 1ec5b00d39526..b26f46f7a27ee 100644 --- a/src/legacy/core_plugins/interpreter/public/interpreter.js +++ b/src/legacy/core_plugins/interpreter/public/interpreter.js @@ -20,6 +20,7 @@ import { register } from '@kbn/interpreter/common'; import { initializeInterpreter, registries } from '@kbn/interpreter/public'; import { kfetch } from 'ui/kfetch'; +import { ajaxStream } from 'ui/ajax_stream'; import { functions } from './functions'; import { visualization } from './renderers/visualization'; @@ -32,7 +33,12 @@ let _resolve; let _interpreterPromise; const initialize = async () => { - initializeInterpreter(kfetch, registries.types, registries.browserFunctions).then(interpreter => { + initializeInterpreter({ + kfetch, + ajaxStream, + typesRegistry: registries.types, + functionsRegistry: registries.browserFunctions, + }).then(interpreter => { _resolve({ interpreter }); }); }; diff --git a/src/legacy/core_plugins/interpreter/server/routes/server_functions.js b/src/legacy/core_plugins/interpreter/server/routes/server_functions.js index 07b3fabad6afd..cdd552e258ca9 100644 --- a/src/legacy/core_plugins/interpreter/server/routes/server_functions.js +++ b/src/legacy/core_plugins/interpreter/server/routes/server_functions.js @@ -65,37 +65,64 @@ function runServerFunctions(server) { const handlers = await createHandlers(req, server); const { functions } = req.payload; - // Process each function individually, and bundle up respones / errors into - // the format expected by the front-end batcher. - const results = await Promise.all(functions.map(async ({ id, ...fnCall }) => { - const result = await runFunction(server, handlers, fnCall) - .catch(err => { - if (Boom.isBoom(err)) { - return { err, statusCode: err.statusCode, message: err.output.payload }; - } - return { err: 'Internal Server Error', statusCode: 500, message: 'See server logs for details.' }; - }); - - if (result == null) { - const { functionName } = fnCall; - return { - id, - result: { - err: `No result from '${functionName}'`, - statusCode: 500, - message: `Function '${functionName}' did not return anything` - } - }; + // Grab the raw Node response object. + const res = req.raw.res; + + // Tell Hapi not to manage the response https://github.com/hapijs/hapi/issues/3884 + req._isReplied = true; + + // Send the initial headers. + res.writeHead(200, { + 'Content-Type': 'text/plain', + 'Connection': 'keep-alive', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + }); + + // Write a length-delimited response + const streamResult = (result) => { + const payload = JSON.stringify(result) + '\n'; + res.write(`${payload.length}:${payload}`); + }; + + // Tries to run an interpreter function, and ensures a consistent error payload on failure. + const tryFunction = async (id, fnCall) => { + try { + const result = await runFunction(server, handlers, fnCall); + + if (result != null) { + return { id, statusCode: 200, result }; + } + + return batchError(id, `Function ${fnCall.functionName} did not return anything.`); + } catch (err) { + if (Boom.isBoom(err)) { + return batchError(id, err.output.payload, err.statusCode); + } + return batchError(id, 'See server logs for details.'); } + }; - return { id, result }; - })); + // Process each function individually, and stream the responses back to the client + await Promise.all(functions.map(({ id, ...fnCall }) => tryFunction(id, fnCall).then(streamResult))); - return { results }; + // All of the responses have been written, so we can close the response. + res.end(); }, }); } +/** + * A helper function for bundling up errors. + */ +function batchError(id, message, statusCode = 500) { + return { + id, + statusCode, + result: { statusCode, message }, + }; +} + /** * Register the endpoint that returns the list of server-only functions. * @param {*} server - The Kibana server diff --git a/src/legacy/ui/public/ajax_stream/ajax_stream.test.ts b/src/legacy/ui/public/ajax_stream/ajax_stream.test.ts new file mode 100644 index 0000000000000..755473d1ee237 --- /dev/null +++ b/src/legacy/ui/public/ajax_stream/ajax_stream.test.ts @@ -0,0 +1,199 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { ajaxStream, XMLHttpRequestLike } from './ajax_stream'; + +// tslint:disable-next-line:no-empty +function noop() {} + +describe('ajaxStream', () => { + it('pulls items from the stream and calls the handler', async () => { + const handler = jest.fn(() => ({})); + const { req, sendText, done } = mockRequest(); + const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`); + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + }); + + sendText(messages[0]); + sendText(messages[1]); + done(); + + await promise; + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith({ hello: 'world' }); + expect(handler).toHaveBeenCalledWith({ tis: 'fate' }); + }); + + it('handles partial messages', async () => { + const handler = jest.fn(() => ({})); + const { req, sendText, done } = mockRequest(); + const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'] + .map(m => `${m.length}:${m}`) + .join(''); + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + }); + + for (const s of messages) { + sendText(s); + } + done(); + + await promise; + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith({ hello: 'world' }); + expect(handler).toHaveBeenCalledWith({ tis: 'fate' }); + }); + + it('sends the request', async () => { + const handler = jest.fn(() => ({})); + const { req, done } = mockRequest(); + + const promise = ajaxStream('mehBasePath', { a: 'b' }, req, { + url: '/test/endpoint', + onResponse: handler, + body: 'whatup', + headers: { foo: 'bar' }, + }); + + done(); + + await promise; + expect(req.open).toHaveBeenCalledWith('POST', 'mehBasePath/test/endpoint'); + expect(req.setRequestHeader).toHaveBeenCalledWith('foo', 'bar'); + expect(req.setRequestHeader).toHaveBeenCalledWith('a', 'b'); + expect(req.send).toHaveBeenCalledWith('whatup'); + }); + + it('rejects if network failure', async () => { + const handler = jest.fn(() => ({})); + const { req, done } = mockRequest(); + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + body: 'whatup', + }); + + done(0); + expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); + }); + + it('rejects if http status error', async () => { + const handler = jest.fn(() => ({})); + const { req, done } = mockRequest(); + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + body: 'whatup', + }); + + done(400); + expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); + }); + + it('rejects if the payload contains invalid JSON', async () => { + const handler = jest.fn(() => ({})); + const { req, sendText, done } = mockRequest(); + const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join(''); + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + }); + + sendText(messages); + done(); + + expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); + }); + + it('rejects if the delim is invalid', async () => { + const handler = jest.fn(() => ({})); + const { req, sendText, done } = mockRequest(); + const messages = '{ "hi": "there" }'; + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + }); + + sendText(messages); + done(); + + expect(await promise.then(() => true).catch(({ message }) => message)).toMatch( + /invalid stream response/i + ); + }); + + it('rejects if the handler throws', async () => { + const handler = jest.fn(() => { + throw new Error('DOH!'); + }); + const { req, sendText, done } = mockRequest(); + const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'] + .map(m => `${m.length}:${m}`) + .join(''); + + const promise = ajaxStream('', {}, req, { + url: '/test/endpoint', + onResponse: handler, + }); + + sendText(messages); + done(); + + expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(/doh!/i); + }); +}); + +function mockRequest() { + const req: XMLHttpRequestLike = { + onprogress: noop, + onreadystatechange: noop, + open: jest.fn(), + readyState: 0, + responseText: '', + send: jest.fn(), + setRequestHeader: jest.fn(), + abort: jest.fn(), + status: 0, + withCredentials: false, + }; + + return { + req, + sendText(text: string) { + req.responseText += text; + req.onreadystatechange(); + req.onprogress(); + }, + done(status = 200) { + req.status = status; + req.readyState = 4; + req.onreadystatechange(); + }, + }; +} diff --git a/src/legacy/ui/public/ajax_stream/ajax_stream.ts b/src/legacy/ui/public/ajax_stream/ajax_stream.ts new file mode 100644 index 0000000000000..74e10e2a271bc --- /dev/null +++ b/src/legacy/ui/public/ajax_stream/ajax_stream.ts @@ -0,0 +1,167 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { once } from 'lodash'; + +/** + * This file contains the client-side logic for processing a streaming AJAX response. + * This allows things like request batching to process individual batch item results + * as soon as the server sends them, instead of waiting for the entire response before + * client-side processing can begin. + * + * The server sends responses in this format: {length}:{json}, for example: + * + * 18:{"hello":"world"}\n16:{"hello":"you"}\n + */ + +// T is the response payload (the JSON), and we don't really +// care what it's type / shape is. +export type BatchResponseHandler = (result: T) => void; + +export interface BatchOpts { + url: string; + onResponse: BatchResponseHandler; + method?: string; + body?: string; + headers?: { [k: string]: string }; +} + +// The subset of XMLHttpRequest that we use +export interface XMLHttpRequestLike { + abort: () => void; + onreadystatechange: any; + onprogress: any; + open: (method: string, url: string) => void; + readyState: number; + responseText: string; + send: (body?: string) => void; + setRequestHeader: (header: string, value: string) => void; + status: number; + withCredentials: boolean; +} + +// Create a function which, when successively passed streaming response text, +// calls a handler callback with each response in the batch. +function processBatchResponseStream(handler: BatchResponseHandler) { + let index = 0; + + return (text: string) => { + // While there's text to process... + while (index < text.length) { + // Our messages are delimited by colon: len:json + const delim = ':'; + const delimIndex = text.indexOf(delim, index); + const payloadStart = delimIndex + delim.length; + + // We've got an incomplete batch length + if (delimIndex < 0) { + return; + } + + const rawLen = text.slice(index, delimIndex); + const payloadLen = parseInt(rawLen, 10); + const payloadEnd = payloadStart + payloadLen; + + // We've got an invalid batch message (e.g. one without a numeric length: prefix) + if (isNaN(payloadLen)) { + throw new Error(`Invalid stream response length: ${rawLen}`); + } + + // We've got an incomplete batch message + if (text.length < payloadEnd) { + return; + } + + const payload = JSON.parse(text.slice(payloadStart, payloadEnd)); + handler(payload); + + index = payloadEnd; + } + }; +} + +/** + * Sends an AJAX request to the server, and processes the result as a + * streaming HTTP/1 response. + * + * @param basePath - The Kibana basepath + * @param defaultHeaders - The default HTTP headers to be sent with each request + * @param req - The XMLHttpRequest + * @param opts - The request options + * @returns A promise which resolves when the entire batch response has been processed. + */ +export function ajaxStream( + basePath: string, + defaultHeaders: { [k: string]: string }, + req: XMLHttpRequestLike, + opts: BatchOpts +) { + return new Promise((resolve, reject) => { + const { url, method, headers } = opts; + + // There are several paths by which the promise may resolve or reject. We wrap this + // in "once" as a safeguard against cases where we attempt more than one call. (e.g. + // a batch handler fails, so we reject the promise, but then new data comes in for + // a subsequent batch item) + const complete = once((err: Error | undefined = undefined) => + err ? reject(err) : resolve(req) + ); + + // Begin the request + req.open(method || 'POST', `${basePath}/${url.replace(/^\//, '')}`); + req.withCredentials = true; + + // Set the HTTP headers + Object.entries(Object.assign({}, defaultHeaders, headers)).forEach(([k, v]) => + req.setRequestHeader(k, v) + ); + + const batchHandler = processBatchResponseStream(opts.onResponse); + const processBatch = () => { + try { + batchHandler(req.responseText); + } catch (err) { + req.abort(); + complete(err); + } + }; + + req.onprogress = processBatch; + + req.onreadystatechange = () => { + // Older browsers don't support onprogress, so we need + // to call this here, too. It's safe to call this multiple + // times even for the same progress event. + processBatch(); + + // 4 is the magic number that means the request is done + if (req.readyState === 4) { + // 0 indicates a network failure. 400+ messages are considered server errors + if (req.status === 0 || req.status >= 400) { + complete(new Error(`Batch request failed with status ${req.status}`)); + } else { + complete(); + } + } + }; + + // Send the payload to the server + req.send(opts.body); + }); +} diff --git a/src/legacy/ui/public/ajax_stream/index.ts b/src/legacy/ui/public/ajax_stream/index.ts new file mode 100644 index 0000000000000..9c5f1832f9c5d --- /dev/null +++ b/src/legacy/ui/public/ajax_stream/index.ts @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import chrome from 'ui/chrome'; +import { metadata } from 'ui/metadata'; +import { ajaxStream as ajax, BatchOpts } from './ajax_stream'; + +const defaultHeaders = { + 'Content-Type': 'application/json', + 'kbn-version': metadata.version, +}; + +export { BatchOpts } from './ajax_stream'; + +export function ajaxStream(opts: BatchOpts) { + return ajax(chrome.getBasePath(), defaultHeaders, new XMLHttpRequest(), opts); +}