Skip to content

Commit

Permalink
Add eventStream observable to publish/subscribe to SO events
Browse files Browse the repository at this point in the history
  • Loading branch information
sebelga committed Jun 3, 2022
1 parent 20cfce8 commit aae9289
Show file tree
Hide file tree
Showing 11 changed files with 462 additions and 172 deletions.
169 changes: 169 additions & 0 deletions src/core/server/event_streams/event_streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { Subject, Observable, mergeAll, map } from 'rxjs';

type EventStreamName = 'savedObjects';

export interface EventStream<T> {
publish: (event: T) => void;
stream$: Observable<T>;
}

function getIdGenerator() {
let lastId = 0;

return function getNextUniqueId() {
lastId += 1;
return lastId;
};
}

const eventStreamsFactory = () => {
const mainStream$$: Subject<Observable<any>> = new Subject();
const combinedStream$ = mainStream$$.pipe(mergeAll());
const subscriptions: {
[stream in EventStreamName]?: {
[eventType: string]: {
[subId: string]: (data: any) => void;
};
};
} = {};
const getNextUniqueId = getIdGenerator();
const eventStreams: Map<string, Subject<any>> = new Map();

combinedStream$.subscribe((event: { stream: EventStreamName; type: string; data?: object }) => {
const callbacks = subscriptions[event.stream]?.[event.type] ?? {};

Object.values(callbacks).forEach((callback) => {
callback(event.data);
});
});

/**
* Register a new event stream
*
* @param stream The stream name
* @returns A "publish" method and an Observable for the stream
*/
const registerEventStream = <T>(stream: EventStreamName): EventStream<T> => {
if (eventStreams.has(stream)) {
throw new Error(`Event stream [${stream}] is already registered.`);
}

const eventStream = new Subject<T>();
eventStreams.set(stream, eventStream);

mainStream$$.next(
eventStream.asObservable().pipe(
map((event) => ({
stream, // We add the stream name to all the event
...event,
}))
)
);

return {
publish: (event: T) => {
eventStream.next(event);
},
stream$: eventStream.asObservable(),
};
};

/**
* Retrieve a stream Observable
* @param stream The stream name
* @returns An Observable to subscribe to
*/
const getEventStream$ = <T>(stream: EventStreamName) => {
const eventStream = eventStreams.get(stream);

if (!eventStream) {
return;
}

return (eventStream as Subject<T>).asObservable();
};

/**
* Publish an event to a stream
*
* @param stream The stream to publish the event to
* @param event The event to publish
*
* @example
```
eventStreams.publish<SavedObjectEvents>('savedObjects', {
type: 'pre:create',
data: {
baz: 123,
},
});
```
*/
const publish = <T>(stream: EventStreamName, event: T) => {
eventStreams.get(stream)?.next(event);
};

/**
* Subscribe to a single event on an event stream
*
* @example
* ```js
eventStreams.subscribe<PreCreateEvent>('savedObjects', 'pre:create', (data) => {
// Do something with data
});
* ```
*
* @param stream The stream name
* @param eventType The event type
* @param handler The handler to call with possible data
* @returns Handler to unsubscribe from the stream event
*/
const subscribe = <T extends { type: string; data?: unknown }>(
stream: EventStreamName,
eventType: T['type'],
handler: (data: T['data']) => void
) => {
if (!subscriptions[stream]) {
subscriptions[stream] = {};
}

if (!subscriptions[stream]![eventType]) {
subscriptions[stream]![eventType] = {};
}

const id = getNextUniqueId();

subscriptions[stream]![eventType][id] = handler;

return {
unsubscribe: () => {
delete subscriptions[stream]![eventType][id];
if (Object.keys(subscriptions[stream]![eventType]).length === 0) {
delete subscriptions[stream]![eventType];
}
if (Object.keys(subscriptions[stream]!).length === 0) {
delete subscriptions[stream];
}
},
};
};

return {
registerEventStream,
getEventStream$,
publish,
subscribe,
};
};

const eventStreams = eventStreamsFactory();

export { eventStreams };
11 changes: 11 additions & 0 deletions src/core/server/event_streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export { eventStreams } from './event_streams';

export type { EventStream } from './event_streams';
6 changes: 6 additions & 0 deletions src/core/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ export type {
SavedObjectsImportWarning,
SavedObjectsValidationMap,
SavedObjectsValidationSpec,
PreGetEvent,
PostGetEvent,
PreCreateEvent,
PostCreateEvent,
SavedObjectEventStream,
SavedObjectEventsTypes,
} from './saved_objects';

export type {
Expand Down
1 change: 1 addition & 0 deletions src/core/server/plugins/plugin_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ export function createPluginSetupContext<TPlugin, TPluginDependencies>(
getKibanaIndex: deps.savedObjects.getKibanaIndex,
pre: deps.savedObjects.pre,
post: deps.savedObjects.post,
eventStream$: deps.savedObjects.eventStream$,
},
status: {
core$: deps.status.core$,
Expand Down
8 changes: 8 additions & 0 deletions src/core/server/saved_objects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,11 @@ export type { SavedObjectsValidationMap, SavedObjectsValidationSpec } from './va
export { savedObjectsConfig, savedObjectsMigrationConfig } from './saved_objects_config';
export { SavedObjectTypeRegistry } from './saved_objects_type_registry';
export type { ISavedObjectTypeRegistry } from './saved_objects_type_registry';
export type {
SavedObjectEventsTypes,
PreGetEvent,
PostGetEvent,
PreCreateEvent,
PostCreateEvent,
SavedObjectEventStream,
} from './saved_objects_stream_events';
22 changes: 11 additions & 11 deletions src/core/server/saved_objects/saved_objects_hooks_registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@
* Side Public License, v 1.
*/

import { SavedObject } from '../../types';
import { SavedObjectsClient, SavedObjectsBulkCreateObject } from '.';
import { SavedObjectsBulkCreateObject } from '.';
import {
PreGetEvent,
PostGetEvent,
PreCreateEvent,
PostCreateEvent,
} from './saved_objects_stream_events';

/** Handler to execute before fetching one or multiple saved object(s) */
type PreGetHook = (...args: Parameters<SavedObjectsClient['bulkGet']>) => Promise<void>;
type PreGetHook = (arg: PreGetEvent['data']) => Promise<void>;
/** Handler to execute after fetching one or multiple saved object(s) */
type PostGetHook = (
objects: SavedObject[],
options: Parameters<SavedObjectsClient['bulkGet']>[1]
) => Promise<void>;
type PostGetHook = (arg: PostGetEvent['data']) => Promise<void>;

/** Handler to execute before creating one or multiple saved object(s) */
type PreCreateHook = (
...args: Parameters<SavedObjectsClient['bulkCreate']>
args: PreCreateEvent['data']
) => Promise<SavedObjectsBulkCreateObject[] | undefined>;
/** Handler to execute after creating one or multiple saved object(s) */
type PostCreateHook = (
objects: Array<Awaited<ReturnType<SavedObjectsClient['create']>>>
) => Promise<void>;
type PostCreateHook = (arg: PostCreateEvent['data']) => Promise<void>;

/** Map of hooks to execute **before** API methods of the SavedObjectsClient */
export interface PreHooks {
Expand Down
8 changes: 8 additions & 0 deletions src/core/server/saved_objects/saved_objects_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ import { registerRoutes } from './routes';
import { ServiceStatus } from '../status';
import { calculateStatus$ } from './status';
import { registerCoreObjectTypes } from './object_types';
import { SavedObjectEvents } from './saved_objects_stream_events';
import { getSavedObjectsDeprecationsProvider } from './deprecations';
import { DocLinksServiceStart } from '../doc_links';
import { eventStreams } from '../event_streams';

const kibanaIndex = '.kibana';

Expand Down Expand Up @@ -169,6 +171,8 @@ export interface SavedObjectsServiceSetup {
pre: SavedObjectsHooksRegistry['pre'];
/** Register a "post" hook to execute after a SavedObject clients API method ("get", "create"...) */
post: SavedObjectsHooksRegistry['post'];
/** Observable of saved object events */
eventStream$: Observable<SavedObjectEvents>;
}

/**
Expand Down Expand Up @@ -308,6 +312,8 @@ export class SavedObjectsService
private migrator$ = new Subject<IKibanaMigrator>();
private typeRegistry = new SavedObjectTypeRegistry();
private hooksRegistry = new SavedObjectsHooksRegistry();
private savedObjectEventStream =
eventStreams.registerEventStream<SavedObjectEvents>('savedObjects');
private started = false;

constructor(private readonly coreContext: CoreContext) {
Expand Down Expand Up @@ -388,6 +394,7 @@ export class SavedObjectsService
getKibanaIndex: () => kibanaIndex,
pre: this.hooksRegistry.pre.bind(this.hooksRegistry),
post: this.hooksRegistry.post.bind(this.hooksRegistry),
eventStream$: this.savedObjectEventStream.stream$,
};
}

Expand Down Expand Up @@ -467,6 +474,7 @@ export class SavedObjectsService
migrator,
this.typeRegistry,
{ preHooks, postHooks },
this.savedObjectEventStream,
kibanaIndex,
esClient,
this.logger.get('repository'),
Expand Down
58 changes: 58 additions & 0 deletions src/core/server/saved_objects/saved_objects_stream_events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { EventStream } from '../event_streams';
import {
SavedObjectsClient,
SavedObjectsBulkCreateObject,
SavedObjectsBulkGetObject,
SavedObjectsCreateOptions,
SavedObjectsBulkResponse,
} from '.';

import { SavedObject, SavedObjectsBaseOptions } from '../types';

export type SavedObjectEventsTypes = 'pre:get' | 'post:get' | 'pre:create' | 'post:create';

interface BaseSavedObjectEvents {
type: SavedObjectEventsTypes;
}

export interface PreGetEvent extends BaseSavedObjectEvents {
type: 'pre:get';
data: {
objects?: SavedObjectsBulkGetObject[];
options?: SavedObjectsBaseOptions;
};
}

export interface PostGetEvent<T = unknown> extends BaseSavedObjectEvents {
type: 'post:get';
data: {
objects: Array<SavedObject<T>>;
options: Parameters<SavedObjectsClient['bulkGet']>[1];
};
}

export interface PreCreateEvent<T = unknown> extends BaseSavedObjectEvents {
type: 'pre:create';
data: {
objects: Array<SavedObjectsBulkCreateObject<T>>;
options?: SavedObjectsCreateOptions;
};
}

export interface PostCreateEvent extends BaseSavedObjectEvents {
type: 'post:create';
data: {
objects: SavedObjectsBulkResponse['saved_objects'];
};
}

export type SavedObjectEvents = PreGetEvent | PostGetEvent | PreCreateEvent | PostCreateEvent;

export type SavedObjectEventStream = EventStream<SavedObjectEvents>;
Loading

0 comments on commit aae9289

Please sign in to comment.