Skip to content

Commit

Permalink
[ML] Adds redux toolkit example for response_stream to developer ex…
Browse files Browse the repository at this point in the history
…amples. (#182690)

## Summary

Follow up to #132590.
Part of #181111.

This updates the developer examples for `@kbn/ml-response-stream` to
include a variant with a full Redux Toolkit setup. For this case, the
`@kbn/ml-response-stream` now includes a generic slice `streamSlice`
that can be used. This allows the actions created to be streamed via
NDJSON to be shared across server and client.

Functional tests for the examples were added too. To run these tests you
can use the following commands:

```
# Start the test server (can continue running)
node scripts/functional_tests_server.js --config test/examples/config.js
# Start a test run
node scripts/functional_test_runner.js --config test/examples/config.js
```

### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
  • Loading branch information
walterra authored May 22, 2024
1 parent 9194c88 commit 5345e34
Show file tree
Hide file tree
Showing 31 changed files with 1,000 additions and 262 deletions.
21 changes: 10 additions & 11 deletions examples/response_stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ This plugin demonstrates how to stream chunks of data to the client with just a

To run Kibana with the described examples, use `yarn start --run-examples`.

The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates two use cases to get started: Streaming a raw string as well as a more complex example that streams Redux-like actions to the client which update React state via `useReducer()`.
The `response_stream` plugin demonstrates API endpoints that can stream data chunks with a single request with gzip/compression support. gzip-streams get decompressed natively by browsers. The plugin demonstrates some use cases to get you started:

Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data.
- Streaming just a raw string.
- Streaming NDJSON with "old-school" redux like actions and client side state managed with `useFetchStream()`. This uses React's own `useReducer()` under the hood.
- Streaming NDJSON with actions created via Redux Toolkit's `createSlice()` to a client with a full Redux Toolkit setup.

No additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state.
Code in `@kbn/ml-response-stream` contains helpers to set up a stream on the server side (`streamFactory()`) and consume it on the client side via a custom hook (`useFetchStream()`) or slice (`streamSlice()`). The utilities make use of TS generics in a way that allows to have type safety for both request related options as well as the returned data.

Besides Redux Toolkit for its particular use case, no additional third party libraries are used in the helpers to make it work. On the server, they integrate with `Hapi` and use node's own `gzip`. On the client, the custom hook abstracts away the necessary logic to consume the stream, internally it makes use of a generator function and `useReducer()` to update React state.

On the server, the simpler stream to send a string is set up like this:

Expand All @@ -21,12 +25,7 @@ The request's headers get passed on to automatically identify if compression is
On the client, the custom hook is used like this:

```ts
const {
errors,
start,
cancel,
data,
isRunning
} = useFetchStream('/internal/response_stream/simple_string_stream');
const { errors, start, cancel, data, isRunning } = useFetchStream(
'/internal/response_stream/simple_string_stream'
);
```

1 change: 1 addition & 0 deletions examples/response_stream/common/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@

export const RESPONSE_STREAM_API_ENDPOINT = {
REDUCER_STREAM: '/internal/response_stream/reducer_stream',
REDUX_STREAM: '/internal/response_stream/redux_stream',
SIMPLE_STRING_STREAM: '/internal/response_stream/simple_string_stream',
} as const;
68 changes: 0 additions & 68 deletions examples/response_stream/common/api/reducer_stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,71 +9,3 @@
export { reducerStreamReducer } from './reducer';
export { reducerStreamRequestBodySchema } from './request_body_schema';
export type { ReducerStreamRequestBodySchema } from './request_body_schema';

export const API_ACTION_NAME = {
UPDATE_PROGRESS: 'update_progress',
ADD_TO_ENTITY: 'add_to_entity',
DELETE_ENTITY: 'delete_entity',
ERROR: 'error',
} as const;
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];

interface ApiActionUpdateProgress {
type: typeof API_ACTION_NAME.UPDATE_PROGRESS;
payload: number;
}

export function updateProgressAction(payload: number): ApiActionUpdateProgress {
return {
type: API_ACTION_NAME.UPDATE_PROGRESS,
payload,
};
}

interface ApiActionAddToEntity {
type: typeof API_ACTION_NAME.ADD_TO_ENTITY;
payload: {
entity: string;
value: number;
};
}

export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity {
return {
type: API_ACTION_NAME.ADD_TO_ENTITY,
payload: {
entity,
value,
},
};
}

interface ApiActionDeleteEntity {
type: typeof API_ACTION_NAME.DELETE_ENTITY;
payload: string;
}

export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
return {
type: API_ACTION_NAME.DELETE_ENTITY,
payload,
};
}

interface ApiActionError {
type: typeof API_ACTION_NAME.ERROR;
payload: string;
}

export function errorAction(payload: string): ApiActionError {
return {
type: API_ACTION_NAME.ERROR,
payload,
};
}

export type ReducerStreamApiAction =
| ApiActionUpdateProgress
| ApiActionAddToEntity
| ApiActionDeleteEntity
| ApiActionError;
25 changes: 4 additions & 21 deletions examples/response_stream/common/api/reducer_stream/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,14 @@
* Side Public License, v 1.
*/

import { ReducerStreamApiAction, API_ACTION_NAME } from '.';
import { getInitialState, type StreamState } from '../stream_state';
import { type ReducerStreamApiAction, API_ACTION_NAME } from './reducer_actions';

export const UI_ACTION_NAME = {
RESET: 'reset',
} as const;
export type UiActionName = typeof UI_ACTION_NAME[keyof typeof UI_ACTION_NAME];

export interface StreamState {
errors: string[];
progress: number;
entities: Record<string, number>;
}
export const initialState: StreamState = {
errors: [],
progress: 0,
entities: {},
};

interface UiActionResetStream {
type: typeof UI_ACTION_NAME.RESET;
}
Expand All @@ -34,14 +24,7 @@ export function resetStream(): UiActionResetStream {

type UiAction = UiActionResetStream;
export type ReducerAction = ReducerStreamApiAction | UiAction;
export function reducerStreamReducer(
state: StreamState,
action: ReducerAction | ReducerAction[]
): StreamState {
if (Array.isArray(action)) {
return action.reduce(reducerStreamReducer, state);
}

export function reducerStreamReducer(state: StreamState, action: ReducerAction): StreamState {
switch (action.type) {
case API_ACTION_NAME.UPDATE_PROGRESS:
return {
Expand Down Expand Up @@ -72,7 +55,7 @@ export function reducerStreamReducer(
errors: [...state.errors, action.payload],
};
case UI_ACTION_NAME.RESET:
return initialState;
return getInitialState();
default:
return state;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export const API_ACTION_NAME = {
UPDATE_PROGRESS: 'update_progress',
ADD_TO_ENTITY: 'add_to_entity',
DELETE_ENTITY: 'delete_entity',
ERROR: 'error',
} as const;
export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME];

interface ApiActionUpdateProgress {
type: typeof API_ACTION_NAME.UPDATE_PROGRESS;
payload: number;
}

export function updateProgressAction(payload: number): ApiActionUpdateProgress {
return {
type: API_ACTION_NAME.UPDATE_PROGRESS,
payload,
};
}

interface ApiActionAddToEntity {
type: typeof API_ACTION_NAME.ADD_TO_ENTITY;
payload: {
entity: string;
value: number;
};
}

export function addToEntityAction(entity: string, value: number): ApiActionAddToEntity {
return {
type: API_ACTION_NAME.ADD_TO_ENTITY,
payload: {
entity,
value,
},
};
}

interface ApiActionDeleteEntity {
type: typeof API_ACTION_NAME.DELETE_ENTITY;
payload: string;
}

export function deleteEntityAction(payload: string): ApiActionDeleteEntity {
return {
type: API_ACTION_NAME.DELETE_ENTITY,
payload,
};
}

interface ApiActionError {
type: typeof API_ACTION_NAME.ERROR;
payload: string;
}

export function errorAction(payload: string): ApiActionError {
return {
type: API_ACTION_NAME.ERROR,
payload,
};
}

export type ReducerStreamApiAction =
| ApiActionUpdateProgress
| ApiActionAddToEntity
| ApiActionDeleteEntity
| ApiActionError;
46 changes: 46 additions & 0 deletions examples/response_stream/common/api/redux_stream/data_slice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { createSlice } from '@reduxjs/toolkit';
import type { PayloadAction } from '@reduxjs/toolkit';

import { getInitialState } from '../stream_state';

export const dataSlice = createSlice({
name: 'development',
initialState: getInitialState(),
reducers: {
updateProgress: (state, action: PayloadAction<number>) => {
state.progress = action.payload;
},
addToEntity: (
state,
action: PayloadAction<{
entity: string;
value: number;
}>
) => {
const { entity, value } = action.payload;
state.entities[entity] = (state.entities[entity] || 0) + value;
},
deleteEntity: (state, action: PayloadAction<string>) => {
delete state.entities[action.payload];
},
error: (state, action: PayloadAction<string>) => {
state.errors.push(action.payload);
},
reset: () => {
return getInitialState();
},
},
});

export const { updateProgress, addToEntity, deleteEntity, error, reset } = dataSlice.actions;
export type ReduxStreamApiAction = ReturnType<
typeof dataSlice.actions[keyof typeof dataSlice.actions]
>;
37 changes: 37 additions & 0 deletions examples/response_stream/common/api/redux_stream/options_slice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { createSlice } from '@reduxjs/toolkit';
import type { PayloadAction } from '@reduxjs/toolkit';

const getInitialState = () => ({
simulateErrors: false,
compressResponse: true,
flushFix: false,
});

export const optionsSlice = createSlice({
name: 'options',
initialState: getInitialState(),
reducers: {
setSimulateErrors: (state, action: PayloadAction<boolean>) => {
state.simulateErrors = action.payload;
},
setCompressResponse: (state, action: PayloadAction<boolean>) => {
state.compressResponse = action.payload;
},
setFlushFix: (state, action: PayloadAction<boolean>) => {
state.flushFix = action.payload;
},
},
});

export const { setSimulateErrors, setCompressResponse, setFlushFix } = optionsSlice.actions;
export type ReduxOptionsApiAction = ReturnType<
typeof optionsSlice.actions[keyof typeof optionsSlice.actions]
>;
19 changes: 19 additions & 0 deletions examples/response_stream/common/api/stream_state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export interface StreamState {
errors: string[];
progress: number;
entities: Record<string, number>;
}

export const getInitialState = (): StreamState => ({
errors: [],
progress: 0,
entities: {},
});
2 changes: 1 addition & 1 deletion examples/response_stream/public/components/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export const Page: FC<PropsWithChildren<PageProps>> = ({ title = 'Untitled', chi
<>
<EuiPageTemplate.Header>
<EuiTitle size="l">
<h1>{title}</h1>
<h1 data-test-subj="responseStreamPageTitle">{title}</h1>
</EuiTitle>
</EuiPageTemplate.Header>
<EuiPageTemplate.Section>{children}</EuiPageTemplate.Section>
Expand Down
Loading

0 comments on commit 5345e34

Please sign in to comment.