From aae928993b1b3de760d7c618ea4b087bf404fea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=CC=81bastien=20Loix?= Date: Fri, 3 Jun 2022 07:39:15 +0100 Subject: [PATCH] Add eventStream observable to publish/subscribe to SO events --- .../server/event_streams/event_streams.ts | 169 +++++++++++ src/core/server/event_streams/index.ts | 11 + src/core/server/index.ts | 6 + src/core/server/plugins/plugin_context.ts | 1 + src/core/server/saved_objects/index.ts | 8 + .../saved_objects_hooks_registry.ts | 22 +- .../saved_objects/saved_objects_service.ts | 8 + .../saved_objects_stream_events.ts | 58 ++++ .../saved_objects/service/lib/repository.ts | 280 +++++++++--------- .../service/saved_objects_client.ts | 4 +- .../server/services/user_content_service.tsx | 67 +++-- 11 files changed, 462 insertions(+), 172 deletions(-) create mode 100644 src/core/server/event_streams/event_streams.ts create mode 100644 src/core/server/event_streams/index.ts create mode 100644 src/core/server/saved_objects/saved_objects_stream_events.ts diff --git a/src/core/server/event_streams/event_streams.ts b/src/core/server/event_streams/event_streams.ts new file mode 100644 index 0000000000000..52acefa2658d9 --- /dev/null +++ b/src/core/server/event_streams/event_streams.ts @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Subject, Observable, mergeAll, map } from 'rxjs'; + +type EventStreamName = 'savedObjects'; + +export interface EventStream { + publish: (event: T) => void; + stream$: Observable; +} + +function getIdGenerator() { + let lastId = 0; + + return function getNextUniqueId() { + lastId += 1; + return lastId; + }; +} + +const eventStreamsFactory = () => { + const mainStream$$: Subject> = new Subject(); + const combinedStream$ = mainStream$$.pipe(mergeAll()); + const subscriptions: { + [stream in EventStreamName]?: { + [eventType: string]: { + [subId: string]: (data: any) => void; + }; + }; + } = {}; + const getNextUniqueId = getIdGenerator(); + const eventStreams: Map> = new Map(); + + combinedStream$.subscribe((event: { stream: EventStreamName; type: string; data?: object }) => { + const callbacks = subscriptions[event.stream]?.[event.type] ?? {}; + + Object.values(callbacks).forEach((callback) => { + callback(event.data); + }); + }); + + /** + * Register a new event stream + * + * @param stream The stream name + * @returns A "publish" method and an Observable for the stream + */ + const registerEventStream = (stream: EventStreamName): EventStream => { + if (eventStreams.has(stream)) { + throw new Error(`Event stream [${stream}] is already registered.`); + } + + const eventStream = new Subject(); + eventStreams.set(stream, eventStream); + + mainStream$$.next( + eventStream.asObservable().pipe( + map((event) => ({ + stream, // We add the stream name to all the event + ...event, + })) + ) + ); + + return { + publish: (event: T) => { + eventStream.next(event); + }, + stream$: eventStream.asObservable(), + }; + }; + + /** + * Retrieve a stream Observable + * @param stream The stream name + * @returns An Observable to subscribe to + */ + const getEventStream$ = (stream: EventStreamName) => { + const eventStream = eventStreams.get(stream); + + if (!eventStream) { + return; + } + + return (eventStream as Subject).asObservable(); + }; + + /** + * Publish an event to a stream + * + * @param stream The stream to publish the event to + * @param event The event to publish + * + * @example + ``` + eventStreams.publish('savedObjects', { + type: 'pre:create', + data: { + baz: 123, + }, + }); + ``` + */ + const publish = (stream: EventStreamName, event: T) => { + eventStreams.get(stream)?.next(event); + }; + + /** + * Subscribe to a single event on an event stream + * + * @example + * ```js + eventStreams.subscribe('savedObjects', 'pre:create', (data) => { + // Do something with data + }); + * ``` + * + * @param stream The stream name + * @param eventType The event type + * @param handler The handler to call with possible data + * @returns Handler to unsubscribe from the stream event + */ + const subscribe = ( + stream: EventStreamName, + eventType: T['type'], + handler: (data: T['data']) => void + ) => { + if (!subscriptions[stream]) { + subscriptions[stream] = {}; + } + + if (!subscriptions[stream]![eventType]) { + subscriptions[stream]![eventType] = {}; + } + + const id = getNextUniqueId(); + + subscriptions[stream]![eventType][id] = handler; + + return { + unsubscribe: () => { + delete subscriptions[stream]![eventType][id]; + if (Object.keys(subscriptions[stream]![eventType]).length === 0) { + delete subscriptions[stream]![eventType]; + } + if (Object.keys(subscriptions[stream]!).length === 0) { + delete subscriptions[stream]; + } + }, + }; + }; + + return { + registerEventStream, + getEventStream$, + publish, + subscribe, + }; +}; + +const eventStreams = eventStreamsFactory(); + +export { eventStreams }; diff --git a/src/core/server/event_streams/index.ts b/src/core/server/event_streams/index.ts new file mode 100644 index 0000000000000..4e69f6d550749 --- /dev/null +++ b/src/core/server/event_streams/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export { eventStreams } from './event_streams'; + +export type { EventStream } from './event_streams'; diff --git a/src/core/server/index.ts b/src/core/server/index.ts index d7669b70fde93..649da5b126366 100644 --- a/src/core/server/index.ts +++ b/src/core/server/index.ts @@ -382,6 +382,12 @@ export type { SavedObjectsImportWarning, SavedObjectsValidationMap, SavedObjectsValidationSpec, + PreGetEvent, + PostGetEvent, + PreCreateEvent, + PostCreateEvent, + SavedObjectEventStream, + SavedObjectEventsTypes, } from './saved_objects'; export type { diff --git a/src/core/server/plugins/plugin_context.ts b/src/core/server/plugins/plugin_context.ts index 2a2936f311ed9..5a3beb1bc4c17 100644 --- a/src/core/server/plugins/plugin_context.ts +++ b/src/core/server/plugins/plugin_context.ts @@ -217,6 +217,7 @@ export function createPluginSetupContext( getKibanaIndex: deps.savedObjects.getKibanaIndex, pre: deps.savedObjects.pre, post: deps.savedObjects.post, + eventStream$: deps.savedObjects.eventStream$, }, status: { core$: deps.status.core$, diff --git a/src/core/server/saved_objects/index.ts b/src/core/server/saved_objects/index.ts index 73a20c0402bbe..cc01a50b0401e 100644 --- a/src/core/server/saved_objects/index.ts +++ b/src/core/server/saved_objects/index.ts @@ -99,3 +99,11 @@ export type { SavedObjectsValidationMap, SavedObjectsValidationSpec } from './va export { savedObjectsConfig, savedObjectsMigrationConfig } from './saved_objects_config'; export { SavedObjectTypeRegistry } from './saved_objects_type_registry'; export type { ISavedObjectTypeRegistry } from './saved_objects_type_registry'; +export type { + SavedObjectEventsTypes, + PreGetEvent, + PostGetEvent, + PreCreateEvent, + PostCreateEvent, + SavedObjectEventStream, +} from './saved_objects_stream_events'; diff --git a/src/core/server/saved_objects/saved_objects_hooks_registry.ts b/src/core/server/saved_objects/saved_objects_hooks_registry.ts index 1bd75edcb701a..35c17c53cedb4 100644 --- a/src/core/server/saved_objects/saved_objects_hooks_registry.ts +++ b/src/core/server/saved_objects/saved_objects_hooks_registry.ts @@ -6,25 +6,25 @@ * Side Public License, v 1. */ -import { SavedObject } from '../../types'; -import { SavedObjectsClient, SavedObjectsBulkCreateObject } from '.'; +import { SavedObjectsBulkCreateObject } from '.'; +import { + PreGetEvent, + PostGetEvent, + PreCreateEvent, + PostCreateEvent, +} from './saved_objects_stream_events'; /** Handler to execute before fetching one or multiple saved object(s) */ -type PreGetHook = (...args: Parameters) => Promise; +type PreGetHook = (arg: PreGetEvent['data']) => Promise; /** Handler to execute after fetching one or multiple saved object(s) */ -type PostGetHook = ( - objects: SavedObject[], - options: Parameters[1] -) => Promise; +type PostGetHook = (arg: PostGetEvent['data']) => Promise; /** Handler to execute before creating one or multiple saved object(s) */ type PreCreateHook = ( - ...args: Parameters + args: PreCreateEvent['data'] ) => Promise; /** Handler to execute after creating one or multiple saved object(s) */ -type PostCreateHook = ( - objects: Array>> -) => Promise; +type PostCreateHook = (arg: PostCreateEvent['data']) => Promise; /** Map of hooks to execute **before** API methods of the SavedObjectsClient */ export interface PreHooks { diff --git a/src/core/server/saved_objects/saved_objects_service.ts b/src/core/server/saved_objects/saved_objects_service.ts index 43ebd52739657..229a72a09b07a 100644 --- a/src/core/server/saved_objects/saved_objects_service.ts +++ b/src/core/server/saved_objects/saved_objects_service.ts @@ -50,8 +50,10 @@ import { registerRoutes } from './routes'; import { ServiceStatus } from '../status'; import { calculateStatus$ } from './status'; import { registerCoreObjectTypes } from './object_types'; +import { SavedObjectEvents } from './saved_objects_stream_events'; import { getSavedObjectsDeprecationsProvider } from './deprecations'; import { DocLinksServiceStart } from '../doc_links'; +import { eventStreams } from '../event_streams'; const kibanaIndex = '.kibana'; @@ -169,6 +171,8 @@ export interface SavedObjectsServiceSetup { pre: SavedObjectsHooksRegistry['pre']; /** Register a "post" hook to execute after a SavedObject clients API method ("get", "create"...) */ post: SavedObjectsHooksRegistry['post']; + /** Observable of saved object events */ + eventStream$: Observable; } /** @@ -308,6 +312,8 @@ export class SavedObjectsService private migrator$ = new Subject(); private typeRegistry = new SavedObjectTypeRegistry(); private hooksRegistry = new SavedObjectsHooksRegistry(); + private savedObjectEventStream = + eventStreams.registerEventStream('savedObjects'); private started = false; constructor(private readonly coreContext: CoreContext) { @@ -388,6 +394,7 @@ export class SavedObjectsService getKibanaIndex: () => kibanaIndex, pre: this.hooksRegistry.pre.bind(this.hooksRegistry), post: this.hooksRegistry.post.bind(this.hooksRegistry), + eventStream$: this.savedObjectEventStream.stream$, }; } @@ -467,6 +474,7 @@ export class SavedObjectsService migrator, this.typeRegistry, { preHooks, postHooks }, + this.savedObjectEventStream, kibanaIndex, esClient, this.logger.get('repository'), diff --git a/src/core/server/saved_objects/saved_objects_stream_events.ts b/src/core/server/saved_objects/saved_objects_stream_events.ts new file mode 100644 index 0000000000000..79408bc60ca62 --- /dev/null +++ b/src/core/server/saved_objects/saved_objects_stream_events.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import { EventStream } from '../event_streams'; +import { + SavedObjectsClient, + SavedObjectsBulkCreateObject, + SavedObjectsBulkGetObject, + SavedObjectsCreateOptions, + SavedObjectsBulkResponse, +} from '.'; + +import { SavedObject, SavedObjectsBaseOptions } from '../types'; + +export type SavedObjectEventsTypes = 'pre:get' | 'post:get' | 'pre:create' | 'post:create'; + +interface BaseSavedObjectEvents { + type: SavedObjectEventsTypes; +} + +export interface PreGetEvent extends BaseSavedObjectEvents { + type: 'pre:get'; + data: { + objects?: SavedObjectsBulkGetObject[]; + options?: SavedObjectsBaseOptions; + }; +} + +export interface PostGetEvent extends BaseSavedObjectEvents { + type: 'post:get'; + data: { + objects: Array>; + options: Parameters[1]; + }; +} + +export interface PreCreateEvent extends BaseSavedObjectEvents { + type: 'pre:create'; + data: { + objects: Array>; + options?: SavedObjectsCreateOptions; + }; +} + +export interface PostCreateEvent extends BaseSavedObjectEvents { + type: 'post:create'; + data: { + objects: SavedObjectsBulkResponse['saved_objects']; + }; +} + +export type SavedObjectEvents = PreGetEvent | PostGetEvent | PreCreateEvent | PostCreateEvent; + +export type SavedObjectEventStream = EventStream; diff --git a/src/core/server/saved_objects/service/lib/repository.ts b/src/core/server/saved_objects/service/lib/repository.ts index f2060f63b9441..43f89da61413f 100644 --- a/src/core/server/saved_objects/service/lib/repository.ts +++ b/src/core/server/saved_objects/service/lib/repository.ts @@ -6,6 +6,7 @@ * Side Public License, v 1. */ +import { EMPTY } from 'rxjs'; import { omit, isObject } from 'lodash'; import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import * as esKuery from '@kbn/es-query'; @@ -67,6 +68,7 @@ import { import { SavedObjectsTypeValidator } from '../../validation'; import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry'; import { ISavedObjectsHooks, PreHooks } from '../../saved_objects_hooks_registry'; +import type { SavedObjectEventStream } from '../../saved_objects_stream_events'; import { internalBulkResolve, InternalBulkResolveError } from './internal_bulk_resolve'; import { validateConvertFilterToKueryNode } from './filter_utils'; import { validateAndConvertAggregations } from './aggregations'; @@ -114,6 +116,7 @@ export interface SavedObjectsRepositoryOptions { client: ElasticsearchClient; typeRegistry: ISavedObjectTypeRegistry; hooks: ISavedObjectsHooks; + savedObjectEventStream: SavedObjectEventStream; serializer: SavedObjectsSerializer; migrator: IKibanaMigrator; allowedTypes: string[]; @@ -214,6 +217,7 @@ export class SavedObjectsRepository { private _mappings: IndexMapping; private _registry: ISavedObjectTypeRegistry; private _hooks: ISavedObjectsHooks; + private _savedObjectEventStream: SavedObjectEventStream; private _allowedTypes: string[]; private readonly client: RepositoryEsClient; private _serializer: SavedObjectsSerializer; @@ -231,6 +235,7 @@ export class SavedObjectsRepository { migrator: IKibanaMigrator, typeRegistry: ISavedObjectTypeRegistry, hooks: ISavedObjectsHooks, + savedObjectEventStream: SavedObjectEventStream, indexName: string, client: ElasticsearchClient, logger: Logger, @@ -257,6 +262,7 @@ export class SavedObjectsRepository { mappings, typeRegistry, hooks, + savedObjectEventStream, serializer, allowedTypes, client, @@ -280,6 +286,10 @@ export class SavedObjectsRepository { create: [], }, }, + savedObjectEventStream = { + publish: () => undefined, + stream$: EMPTY, + }, serializer, migrator, allowedTypes = [], @@ -298,6 +308,7 @@ export class SavedObjectsRepository { this._mappings = mappings; this._registry = typeRegistry; this._hooks = hooks; + this._savedObjectEventStream = savedObjectEventStream; this.client = createRepositoryEsClient(client); if (allowedTypes.length === 0) { throw new Error('Empty or missing types for saved object repository!'); @@ -344,9 +355,7 @@ export class SavedObjectsRepository { throw SavedObjectsErrorHelpers.createUnsupportedTypeError(type); } - // Run all the "Pre" create hooks which might update the attributes - const [{ attributes: updatedAttribues }] = await this.runPreCreateHooks( - [...this._hooks.preHooks.create], + const [{ attributes: updatedAttribues }] = await this.onPreCreate( [ { ...options, @@ -355,10 +364,7 @@ export class SavedObjectsRepository { }, ], options - ).catch((e) => { - this._logger.error(e); - return [{ attributes }]; - }); + ); const time = getCurrentTime(); let savedObjectNamespace: string | undefined; @@ -438,9 +444,7 @@ export class SavedObjectsRepository { ...body, }); - await this.runPostCreateHooks([result], options).catch((e) => { - this._logger.error(e); - }); + await this.onPostCreate([result], options); return result; } @@ -458,14 +462,7 @@ export class SavedObjectsRepository { objects: Array>, options: SavedObjectsCreateOptions = {} ): Promise> { - const updatedObjects = await this.runPreCreateHooks( - [...this._hooks.preHooks.create], - objects, - options - ).catch((e) => { - this._logger.error(e); - return objects; - }); + const updatedObjects = await this.onPreCreate(objects, options); const { overwrite = false, refresh = DEFAULT_REFRESH_SETTING } = options; const namespace = normalizeNamespace(options.namespace); @@ -643,28 +640,32 @@ export class SavedObjectsRepository { }) : undefined; - return { - saved_objects: expectedBulkResults.map((expectedResult) => { - if (isLeft(expectedResult)) { - return expectedResult.value as any; - } + const savedObjects = expectedBulkResults.map((expectedResult) => { + if (isLeft(expectedResult)) { + return expectedResult.value; + } - const { requestedId, rawMigratedDoc, esRequestIndex } = expectedResult.value; - const rawResponse = Object.values(bulkResponse?.items[esRequestIndex] ?? {})[0] as any; + const { requestedId, rawMigratedDoc, esRequestIndex } = expectedResult.value; + const rawResponse = Object.values(bulkResponse?.items[esRequestIndex] ?? {})[0] as any; - const error = getBulkOperationError(rawMigratedDoc._source.type, requestedId, rawResponse); - if (error) { - return { type: rawMigratedDoc._source.type, id: requestedId, error }; - } + const error = getBulkOperationError(rawMigratedDoc._source.type, requestedId, rawResponse); + if (error) { + return { type: rawMigratedDoc._source.type, id: requestedId, error }; + } - // When method == 'index' the bulkResponse doesn't include the indexed - // _source so we return rawMigratedDoc but have to spread the latest - // _seq_no and _primary_term values from the rawResponse. - return this._rawToSavedObject({ - ...rawMigratedDoc, - ...{ _seq_no: rawResponse._seq_no, _primary_term: rawResponse._primary_term }, - }); - }), + // When method == 'index' the bulkResponse doesn't include the indexed + // _source so we return rawMigratedDoc but have to spread the latest + // _seq_no and _primary_term values from the rawResponse. + return this._rawToSavedObject({ + ...rawMigratedDoc, + ...{ _seq_no: rawResponse._seq_no, _primary_term: rawResponse._primary_term }, + }); + }); + + await this.onPostCreate(savedObjects, options); + + return { + saved_objects: savedObjects, }; } @@ -1123,30 +1124,15 @@ export class SavedObjectsRepository { const { objectsToRegisterEventsFor, objectsToRegisterEventsForById } = this.getObjectsToRegisterEventsFor(objects); - if (options.eventMetadata?.registerEvent !== false) { - for (const preGetHook of this._hooks.preHooks.get) { - try { - await preGetHook(objectsToRegisterEventsFor, options); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPreGet(objectsToRegisterEventsFor, options); + const namespace = normalizeNamespace(options.namespace); const onResponse = async (result: Array>) => { - if (options.eventMetadata?.registerEvent !== false) { - for (const postGetHook of this._hooks.postHooks.get) { - try { - await postGetHook( - result.filter((obj) => objectsToRegisterEventsForById[obj.id]), - options - ); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPostGet( + result.filter((obj) => objectsToRegisterEventsForById[obj.id]), + options + ); return { saved_objects: result }; }; @@ -1270,15 +1256,7 @@ export class SavedObjectsRepository { const { objectsToRegisterEventsFor, objectsToRegisterEventsForById } = this.getObjectsToRegisterEventsFor(objects); - if (options.eventMetadata?.registerEvent !== false) { - for (const preGetHook of this._hooks.preHooks.get) { - try { - await preGetHook(objectsToRegisterEventsFor, options); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPreGet(objectsToRegisterEventsFor, options); const { resolved_objects: bulkResults } = await internalBulkResolve({ registry: this._registry, @@ -1303,20 +1281,13 @@ export class SavedObjectsRepository { return result as SavedObjectsResolveResponse; }); - if (options.eventMetadata?.registerEvent !== false) { - for (const postGetHook of this._hooks.postHooks.get) { - try { - await postGetHook( - resolvedObjects - .filter((obj) => objectsToRegisterEventsForById[obj.saved_object.id]) - .map(({ saved_object: savedObject }) => savedObject), - options - ); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPostGet( + resolvedObjects + .filter((obj) => objectsToRegisterEventsForById[obj.saved_object.id]) + .map(({ saved_object: savedObject }) => savedObject), + options + ); + return { resolved_objects: resolvedObjects }; } @@ -1338,15 +1309,7 @@ export class SavedObjectsRepository { throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id); } - if (options.eventMetadata?.registerEvent) { - for (const preGetHook of this._hooks.preHooks.get) { - try { - await preGetHook([{ id, type }], options); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPreGet([{ id, type }], options); const namespace = normalizeNamespace(options.namespace); const { body, statusCode, headers } = await this.client.get( @@ -1373,15 +1336,7 @@ export class SavedObjectsRepository { const result = getSavedObjectFromSource(this._registry, type, id, body); - if (options.eventMetadata?.registerEvent) { - for (const postGetHook of this._hooks.postHooks.get) { - try { - await postGetHook([result], options); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPostGet([result], options); return result; } @@ -1400,15 +1355,7 @@ export class SavedObjectsRepository { id: string, options: SavedObjectsBaseOptions = {} ): Promise> { - if (options.eventMetadata?.registerEvent) { - for (const preGetHook of this._hooks.preHooks.get) { - try { - await preGetHook([{ id, type }], options); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPreGet([{ id, type }], options); const { resolved_objects: bulkResults } = await internalBulkResolve({ registry: this._registry, @@ -1425,15 +1372,7 @@ export class SavedObjectsRepository { throw (result as InternalBulkResolveError).error; } - if (options.eventMetadata?.registerEvent) { - for (const postGetHook of this._hooks.postHooks.get) { - try { - await postGetHook([(result as SavedObjectsResolveResponse).saved_object], options); - } catch (e) { - this._logger.error(e); - } - } - } + await this.onPostGet([(result as SavedObjectsResolveResponse).saved_object], options); return result as SavedObjectsResolveResponse; } @@ -2513,33 +2452,108 @@ export class SavedObjectsRepository { return { objectsToRegisterEventsFor, objectsToRegisterEventsForById }; }; - private runPreCreateHooks = async ( - hooks: PreHooks['create'], - updatedObjects: Array>, + private onPreGet = async ( + objects: SavedObjectsBulkGetObject[], + options: SavedObjectsBaseOptions + ) => { + if (options.eventMetadata?.registerEvent !== false) { + if (objects.length > 0) { + const preEventData = { objects, options }; + + for (const preGetHook of this._hooks.preHooks.get) { + try { + await preGetHook(preEventData); + } catch (e) { + this._logger.error(e); + } + } + + // Emit event on the stream + this._savedObjectEventStream.publish({ type: 'pre:get', data: preEventData }); + } + } + }; + + private onPostGet = async ( + objects: Array>, + options: SavedObjectsBaseOptions = {} + ) => { + if (options.eventMetadata?.registerEvent !== false) { + const postEventData = { + objects, + options, + }; + + for (const postGetHook of this._hooks.postHooks.get) { + try { + await postGetHook(postEventData); + } catch (e) { + this._logger.error(e); + } + } + + this._savedObjectEventStream.publish({ type: 'post:get', data: postEventData }); + } + }; + + private onPreCreate = async ( + objects: Array>, options: SavedObjectsCreateOptions = {} ): Promise>> => { - if (hooks.length === 0 || !options.eventMetadata?.registerEvent) { - return updatedObjects; + if (!options.eventMetadata?.registerEvent) { + return objects; } - // Recursively call the the "pre" create hooks passing the - // updated saved objects - const [preCreateHook, ...restOfHooks] = hooks; - const result = (await preCreateHook(updatedObjects)) as Array>; - if (restOfHooks.length) { - return await this.runPreCreateHooks(restOfHooks, result!); - } - return result!; + const runPreCreateHooks = async ( + hooks: PreHooks['create'], + updatedObjects: Array> + ): Promise>> => { + if (hooks.length === 0) { + return updatedObjects; + } + + const [preCreateHook, ...restOfHooks] = hooks; + const result = (await preCreateHook({ objects: updatedObjects, options }).catch((e) => { + this._logger.error(e); + return updatedObjects; + })) as Array>; + + if (restOfHooks.length) { + // Recursively call the the "pre" create hooks + return await runPreCreateHooks(restOfHooks, result!); + } + + return result!; + }; + + const updatedObjects = await runPreCreateHooks([...this._hooks.preHooks.create], objects); + + this._savedObjectEventStream.publish({ + type: 'pre:create', + data: { + objects: updatedObjects, + options, + }, + }); + + return updatedObjects; }; - private runPostCreateHooks = async ( - objects: SavedObject[], + private onPostCreate = async ( + objects: SavedObjectsBulkResponse['saved_objects'], options: SavedObjectsCreateOptions ) => { if (options.eventMetadata?.registerEvent) { for (const postCreateHook of this._hooks.postHooks.create) { - await postCreateHook(objects); + await postCreateHook({ objects }).catch((e) => { + this._logger.error(e); + }); } + + this._savedObjectEventStream.publish({ + type: 'post:create', + data: { objects }, + }); } }; } diff --git a/src/core/server/saved_objects/service/saved_objects_client.ts b/src/core/server/saved_objects/service/saved_objects_client.ts index 4e3e20b1154ca..a573c63aa6596 100644 --- a/src/core/server/saved_objects/service/saved_objects_client.ts +++ b/src/core/server/saved_objects/service/saved_objects_client.ts @@ -135,7 +135,7 @@ export interface SavedObjectsBulkUpdateObject * @public */ export interface SavedObjectsBulkResponse { - saved_objects: Array>; + saved_objects: Array | { type: string; id?: string; error: Payload }>; } /** @@ -304,7 +304,7 @@ export interface SavedObjectsBulkGetObject { * @public */ export interface SavedObjectsBulkResponse { - saved_objects: Array>; + saved_objects: Array | { type: string; id?: string; error: Payload }>; } /** diff --git a/src/plugins/user_content/server/services/user_content_service.tsx b/src/plugins/user_content/server/services/user_content_service.tsx index 785bb40807e8a..106a4c791096e 100644 --- a/src/plugins/user_content/server/services/user_content_service.tsx +++ b/src/plugins/user_content/server/services/user_content_service.tsx @@ -11,6 +11,8 @@ import { SavedObjectsServiceSetup, SavedObjectsType, SavedObjectsTypeMappingDefinition, + PostGetEvent, + PostCreateEvent, } from '@kbn/core/server'; import { defaultUserContentAttributes, userContentCommonMappings } from '../../common'; @@ -44,6 +46,14 @@ export class UserContentService { this.metadataEventsService = metadataEventService; this.registerSavedObjectsHooks(); + + this.savedObjects.eventStream$.subscribe((event) => { + if (event.type === 'post:get') { + this.onPostGetSavedObject(event.data); + } else if (event.type === 'post:create') { + this.onPostCreateSavedObject(event.data); + } + }); } /** @@ -74,54 +84,59 @@ export class UserContentService { } private registerSavedObjectsHooks() { - // Hook whenever user generated saved object(s) are accessed - this.savedObjects!.post('get', async (objects) => { - const registeredContents = this.userContentTypes; - const filteredToContentType = objects.filter(({ type }) => registeredContents.includes(type)); - - if (filteredToContentType.length > 0) { - this.metadataEventsService?.bulkRegisterEvents( - filteredToContentType.map(({ id: soId, type }) => ({ - type: 'viewed:kibana', - data: { - so_id: soId, - so_type: type, - }, - })) - ); - } - }); - // Hook before saving a user generated saved object - this.savedObjects!.pre('create', async (objects) => { + this.savedObjects!.pre('create', async ({ objects }) => { if (!objects) { return; } + + // Add common attributes to all user generated saved objects const updatedObject = objects.map((object) => { const { type, attributes } = object; if (!this.contents.has(type)) { return object; } - // Add common attributes to all user generated saved objects + const updatedAttributes = this.addDefaultUserContentAttributes(attributes as object); return { ...object, attributes: updatedAttributes }; }); + return updatedObject; }); + } + + private onPostGetSavedObject = (data: PostGetEvent['data']) => { + const registeredContents = this.userContentTypes; + const filteredToContentType = data.objects.filter(({ type }) => + registeredContents.includes(type) + ); - // Hook after saving a user generated saved object - this.savedObjects!.post('create', async (objects) => { + if (filteredToContentType.length > 0) { this.metadataEventsService?.bulkRegisterEvents( - objects.map(({ id: soId, type }) => ({ - type: 'created:kibana', + filteredToContentType.map(({ id: soId, type }) => ({ + type: 'viewed:kibana', data: { so_id: soId, so_type: type, }, })) ); - }); - } + } + }; + + private onPostCreateSavedObject = (data: PostCreateEvent['data']) => { + this.metadataEventsService?.bulkRegisterEvents( + data.objects + .filter(({ id }) => id !== undefined) + .map(({ id: soId, type }) => ({ + type: 'created:kibana', + data: { + so_id: soId!, + so_type: type, + }, + })) + ); + }; /** * Merge user content common default attributes to the provided attibutes.