From 6a0b2fd717e53c177fcc9fe3bad02c0f726e5d78 Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Thu, 29 Sep 2022 08:38:58 +0200 Subject: [PATCH] [Expressions] Fix expressions chain invocation not to unsubscribe on error (#142105) --- .../common/execution/execution.test.ts | 57 +++++++ .../expressions/common/execution/execution.ts | 145 +++++++++--------- 2 files changed, 129 insertions(+), 73 deletions(-) diff --git a/src/plugins/expressions/common/execution/execution.test.ts b/src/plugins/expressions/common/execution/execution.test.ts index 75a95035bb89f..a5e03084a6977 100644 --- a/src/plugins/expressions/common/execution/execution.test.ts +++ b/src/plugins/expressions/common/execution/execution.test.ts @@ -488,6 +488,63 @@ describe('Execution', () => { expect(spy.fn).toHaveBeenCalledTimes(0); }); + + test('continues execution when error state is gone', async () => { + testScheduler.run(({ cold, expectObservable, flush }) => { + const a = 1; + const b = 2; + const c = 3; + const observable$ = cold('abc|', { a, b, c }); + const flakyFn = jest + .fn() + .mockImplementationOnce((value) => value) + .mockImplementationOnce(() => { + throw new Error('Some error.'); + }) + .mockImplementationOnce((value) => value); + const spyFn = jest.fn((value) => value); + + const executor = createUnitTestExecutor(); + executor.registerFunction({ + name: 'observable', + args: {}, + help: '', + fn: () => observable$, + }); + executor.registerFunction({ + name: 'flaky', + args: {}, + help: '', + fn: (value) => flakyFn(value), + }); + executor.registerFunction({ + name: 'spy', + args: {}, + help: '', + fn: (value) => spyFn(value), + }); + + const result = executor.run('observable | flaky | spy', null, {}); + + expectObservable(result).toBe('abc|', { + a: { partial: true, result: a }, + b: { + partial: true, + result: { + type: 'error', + error: expect.objectContaining({ message: '[flaky] > Some error.' }), + }, + }, + c: { partial: false, result: c }, + }); + + flush(); + + expect(spyFn).toHaveBeenCalledTimes(2); + expect(spyFn).toHaveBeenNthCalledWith(1, a); + expect(spyFn).toHaveBeenNthCalledWith(2, c); + }); + }); }); describe('state', () => { diff --git a/src/plugins/expressions/common/execution/execution.ts b/src/plugins/expressions/common/execution/execution.ts index b4ef83389ce28..68cebaa65569b 100644 --- a/src/plugins/expressions/common/execution/execution.ts +++ b/src/plugins/expressions/common/execution/execution.ts @@ -295,87 +295,86 @@ export class Execution< } invokeChain( - chainArr: ExpressionAstFunction[], + [head, ...tail]: ExpressionAstFunction[], input: unknown - ): Observable { + ): Observable { + if (!head) { + return of(input as ChainOutput); + } + return of(input).pipe( - ...(chainArr.map((link) => - switchMap((currentInput) => { - const { function: fnName, arguments: fnArgs } = link; - const fn = getByAlias( - this.state.get().functions, - fnName, - this.execution.params.namespace - ); + switchMap((currentInput) => { + const { function: fnName, arguments: fnArgs } = head; + const fn = getByAlias(this.state.get().functions, fnName, this.execution.params.namespace); + + if (!fn) { + throw createError({ + name: 'fn not found', + message: i18n.translate('expressions.execution.functionNotFound', { + defaultMessage: `Function {fnName} could not be found.`, + values: { + fnName, + }, + }), + }); + } - if (!fn) { - throw createError({ - name: 'fn not found', - message: i18n.translate('expressions.execution.functionNotFound', { - defaultMessage: `Function {fnName} could not be found.`, - values: { - fnName, - }, - }), - }); - } + if (fn.disabled) { + throw createError({ + name: 'fn is disabled', + message: i18n.translate('expressions.execution.functionDisabled', { + defaultMessage: `Function {fnName} is disabled.`, + values: { + fnName, + }, + }), + }); + } - if (fn.disabled) { - throw createError({ - name: 'fn is disabled', - message: i18n.translate('expressions.execution.functionDisabled', { - defaultMessage: `Function {fnName} is disabled.`, - values: { - fnName, - }, - }), - }); - } + if (fn.deprecated) { + this.logger?.warn(`Function '${fnName}' is deprecated`); + } - if (fn.deprecated) { - this.logger?.warn(`Function '${fnName}' is deprecated`); - } + if (this.execution.params.debug) { + head.debug = { + args: {}, + duration: 0, + fn: fn.name, + input: currentInput, + success: true, + }; + } - if (this.execution.params.debug) { - link.debug = { - args: {}, - duration: 0, - fn: fn.name, - input: currentInput, - success: true, - }; - } + const timeStart = this.execution.params.debug ? now() : 0; + + // `resolveArgs` returns an object because the arguments themselves might + // actually have `then` or `subscribe` methods which would be treated as a `Promise` + // or an `Observable` accordingly. + return this.resolveArgs(fn, currentInput, fnArgs).pipe( + tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })), + switchMap((args) => this.invokeFunction(fn, currentInput, args)), + switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))), + tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })), + switchMap((output) => this.invokeChain(tail, output)), + catchError((rawError) => { + const error = createError(rawError); + error.error.message = `[${fnName}] > ${error.error.message}`; + + if (this.execution.params.debug) { + Object.assign(head.debug, { error, rawError, success: false }); + } - const timeStart = this.execution.params.debug ? now() : 0; - - // `resolveArgs` returns an object because the arguments themselves might - // actually have `then` or `subscribe` methods which would be treated as a `Promise` - // or an `Observable` accordingly. - return this.resolveArgs(fn, currentInput, fnArgs).pipe( - tap((args) => this.execution.params.debug && Object.assign(link.debug, { args })), - switchMap((args) => this.invokeFunction(fn, currentInput, args)), - switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))), - tap((output) => this.execution.params.debug && Object.assign(link.debug, { output })), - catchError((rawError) => { - const error = createError(rawError); - error.error.message = `[${fnName}] > ${error.error.message}`; - - if (this.execution.params.debug) { - Object.assign(link.debug, { error, rawError, success: false }); - } - - return throwError(error); - }), - finalize(() => { - if (this.execution.params.debug) { - Object.assign(link.debug, { duration: now() - timeStart }); - } - }) - ); - }) - ) as Parameters['pipe']>), + return of(error); + }), + finalize(() => { + if (this.execution.params.debug) { + Object.assign(head.debug, { duration: now() - timeStart }); + } + }) + ); + }), catchError((error) => of(error)) - ) as Observable; + ); } invokeFunction(