Skip to content

Commit

Permalink
[ML] Versioning AIOps APIs (elastic#158806)
Browse files Browse the repository at this point in the history
Adds versioning to the AIOps API.
Versions are added to the server side routes and to the client side
functions which call the routes.
Updates API tests to add the API version to the request headers.

The single API endpoint was already internal and has now been given
version '1'.

**Internal APIs**

`/internal/aiops/explain_log_rate_spikes`
  • Loading branch information
walterra authored Jun 1, 2023
1 parent 648c669 commit 423cb35
Show file tree
Hide file tree
Showing 14 changed files with 828 additions and 675 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const PageReducerStream: FC = () => {
typeof basePath
>(
`${basePath}/internal/response_stream/reducer_stream`,
'1',
{ compressResponse, simulateErrors },
{ reducer: reducerStreamReducer, initialState }
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const PageSimpleStringStream: FC = () => {
const { dispatch, errors, start, cancel, data, isRunning } = useFetchStream<
ApiSimpleStringStream,
typeof basePath
>(`${basePath}/internal/response_stream/simple_string_stream`, {
>(`${basePath}/internal/response_stream/simple_string_stream`, '1', {
compressResponse,
timeout: 500,
});
Expand Down
193 changes: 100 additions & 93 deletions examples/response_stream/server/routes/reducer_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,103 +20,110 @@ import {
import { API_ENDPOINT } from '../../common/api';

export const defineReducerStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
{
router.versioned
.post({
path: API_ENDPOINT.REDUCER_STREAM,
validate: {
body: reducerStreamRequestBodySchema,
access: 'internal',
})
.addVersion(
{
version: '1',
validate: {
request: {
body: reducerStreamRequestBodySchema,
},
},
},
},
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
const simulateError = request.body.simulateErrors ?? false;

let logMessageCounter = 1;

function logDebugMessage(msg: string) {
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}

logDebugMessage('Starting stream.');

let shouldStop = false;
request.events.aborted$.subscribe(() => {
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
});
request.events.completed$.subscribe(() => {
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
});

const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger,
request.body.compressResponse
);

const entities = [
'kimchy',
's1monw',
'martijnvg',
'jasontedor',
'nik9000',
'javanna',
'rjernst',
'jrodewig',
];

const actions = [...Array(19).fill('add'), 'delete'];

if (simulateError) {
actions.push('throw-error');
actions.push('emit-error');
}

let progress = 0;

async function pushStreamUpdate() {
setTimeout(() => {
try {
progress++;

if (progress > 100 || shouldStop) {
end();
return;
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
const simulateError = request.body.simulateErrors ?? false;

let logMessageCounter = 1;

function logDebugMessage(msg: string) {
logger.debug(`Response Stream Example #${logMessageCounter}: ${msg}`);
logMessageCounter++;
}

logDebugMessage('Starting stream.');

let shouldStop = false;
request.events.aborted$.subscribe(() => {
logDebugMessage('aborted$ subscription trigger.');
shouldStop = true;
});
request.events.completed$.subscribe(() => {
logDebugMessage('completed$ subscription trigger.');
shouldStop = true;
});

const { end, push, responseWithHeaders } = streamFactory<ReducerStreamApiAction>(
request.headers,
logger,
request.body.compressResponse
);

const entities = [
'kimchy',
's1monw',
'martijnvg',
'jasontedor',
'nik9000',
'javanna',
'rjernst',
'jrodewig',
];

const actions = [...Array(19).fill('add'), 'delete'];

if (simulateError) {
actions.push('throw-error');
actions.push('emit-error');
}

let progress = 0;

async function pushStreamUpdate() {
setTimeout(() => {
try {
progress++;

if (progress > 100 || shouldStop) {
end();
return;
}

push(updateProgressAction(progress));

const randomEntity = entities[Math.floor(Math.random() * entities.length)];
const randomAction = actions[Math.floor(Math.random() * actions.length)];

if (randomAction === 'add') {
const randomCommits = Math.floor(Math.random() * 100);
push(addToEntityAction(randomEntity, randomCommits));
} else if (randomAction === 'delete') {
push(deleteEntityAction(randomEntity));
} else if (randomAction === 'throw-error') {
// Throw an error. It should not crash Kibana!
// It should be caught and logged to the Kibana server console.
throw new Error('There was a (simulated) server side error!');
} else if (randomAction === 'emit-error') {
// Emit an error as a stream action.
push(errorAction('(Simulated) error pushed to the stream'));
return;
}

pushStreamUpdate();
} catch (e) {
logger.error(e);
}
}, Math.floor(Math.random() * maxTimeoutMs));
}

push(updateProgressAction(progress));

const randomEntity = entities[Math.floor(Math.random() * entities.length)];
const randomAction = actions[Math.floor(Math.random() * actions.length)];

if (randomAction === 'add') {
const randomCommits = Math.floor(Math.random() * 100);
push(addToEntityAction(randomEntity, randomCommits));
} else if (randomAction === 'delete') {
push(deleteEntityAction(randomEntity));
} else if (randomAction === 'throw-error') {
// Throw an error. It should not crash Kibana!
// It should be caught and logged to the Kibana server console.
throw new Error('There was a (simulated) server side error!');
} else if (randomAction === 'emit-error') {
// Emit an error as a stream action.
push(errorAction('(Simulated) error pushed to the stream'));
return;
}
// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();

pushStreamUpdate();
} catch (e) {
logger.error(e);
}
}, Math.floor(Math.random() * maxTimeoutMs));
return response.ok(responseWithHeaders);
}

// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();

return response.ok(responseWithHeaders);
}
);
);
};
95 changes: 51 additions & 44 deletions examples/response_stream/server/routes/single_string_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,70 @@ function timeout(ms: number) {
}

export const defineSimpleStringStreamRoute = (router: IRouter, logger: Logger) => {
router.post(
{
router.versioned
.post({
path: API_ENDPOINT.SIMPLE_STRING_STREAM,
validate: {
body: simpleStringStreamRequestBodySchema,
access: 'internal',
})
.addVersion(
{
version: '1',
validate: {
request: {
body: simpleStringStreamRequestBodySchema,
},
},
},
},
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;
async (context, request, response) => {
const maxTimeoutMs = request.body.timeout ?? 250;

let shouldStop = false;
request.events.aborted$.subscribe(() => {
shouldStop = true;
});
request.events.completed$.subscribe(() => {
shouldStop = true;
});
let shouldStop = false;
request.events.aborted$.subscribe(() => {
shouldStop = true;
});
request.events.completed$.subscribe(() => {
shouldStop = true;
});

const { end, push, responseWithHeaders } = streamFactory(
request.headers,
logger,
request.body.compressResponse
);
const { end, push, responseWithHeaders } = streamFactory(
request.headers,
logger,
request.body.compressResponse
);

const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';
const text =
'Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. Elasticsearch is developed in Java and is dual-licensed under the source-available Server Side Public License and the Elastic license, while other parts fall under the proprietary (source-available) Elastic License. Official clients are available in Java, .NET (C#), PHP, Python, Apache Groovy, Ruby and many other languages. According to the DB-Engines ranking, Elasticsearch is the most popular enterprise search engine.';

const tokens = text.split(' ');
const tokens = text.split(' ');

async function pushStreamUpdate() {
try {
if (shouldStop) {
end();
return;
}
async function pushStreamUpdate() {
try {
if (shouldStop) {
end();
return;
}

const token = tokens.shift();
const token = tokens.shift();

if (token !== undefined) {
push(`${token} `);
await timeout(Math.floor(Math.random() * maxTimeoutMs));
if (token !== undefined) {
push(`${token} `);
await timeout(Math.floor(Math.random() * maxTimeoutMs));

if (!shouldStop) {
pushStreamUpdate();
if (!shouldStop) {
pushStreamUpdate();
}
} else {
end();
}
} else {
end();
} catch (e) {
logger.error(`There was an error: ${e.toString()}`);
}
} catch (e) {
logger.error(`There was an error: ${e.toString()}`);
}
}

// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();
// do not call this using `await` so it will run asynchronously while we return the stream already.
pushStreamUpdate();

return response.ok(responseWithHeaders);
}
);
return response.ok(responseWithHeaders);
}
);
};
5 changes: 5 additions & 0 deletions x-pack/packages/ml/aiops_utils/src/fetch_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import type { ReducerAction } from 'react';

import { ELASTIC_HTTP_VERSION_HEADER } from '@kbn/core-http-common';

import type { UseFetchStreamParamsDefault } from './use_fetch_stream';

type GeneratorError = string | null;
Expand All @@ -23,6 +25,7 @@ type GeneratorError = string | null;
* ```
*
* @param endpoint — The API endpoint including the Kibana basepath.
* @param apiVersion - The API version to be used.
* @param abortCtrl — Abort controller for cancelling the request.
* @param body — The request body. For now all requests are POST.
* @param ndjson — Boolean flag to receive the stream as a raw string or NDJSON.
Expand All @@ -37,6 +40,7 @@ type GeneratorError = string | null;
*/
export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePath extends string>(
endpoint: `${BasePath}${I['endpoint']}`,
apiVersion: string,
abortCtrl: React.MutableRefObject<AbortController>,
body: I['body'],
ndjson = true,
Expand All @@ -54,6 +58,7 @@ export async function* fetchStream<I extends UseFetchStreamParamsDefault, BasePa
// This refers to the format of the request body,
// not the response, which will be a uint8array Buffer.
'Content-Type': 'application/json',
[ELASTIC_HTTP_VERSION_HEADER]: apiVersion,
'kbn-xsrf': 'stream',
},
...(Object.keys(body).length > 0 ? { body: JSON.stringify(body) } : {}),
Expand Down
Loading

0 comments on commit 423cb35

Please sign in to comment.