Skip to content

Commit

Permalink
[Expressions] Fix expressions chain invocation not to unsubscribe on …
Browse files Browse the repository at this point in the history
…error (#142105)
  • Loading branch information
dokmic authored Sep 29, 2022
1 parent ea046ac commit 6a0b2fd
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 73 deletions.
57 changes: 57 additions & 0 deletions src/plugins/expressions/common/execution/execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,63 @@ describe('Execution', () => {

expect(spy.fn).toHaveBeenCalledTimes(0);
});

test('continues execution when error state is gone', async () => {
testScheduler.run(({ cold, expectObservable, flush }) => {
const a = 1;
const b = 2;
const c = 3;
const observable$ = cold('abc|', { a, b, c });
const flakyFn = jest
.fn()
.mockImplementationOnce((value) => value)
.mockImplementationOnce(() => {
throw new Error('Some error.');
})
.mockImplementationOnce((value) => value);
const spyFn = jest.fn((value) => value);

const executor = createUnitTestExecutor();
executor.registerFunction({
name: 'observable',
args: {},
help: '',
fn: () => observable$,
});
executor.registerFunction({
name: 'flaky',
args: {},
help: '',
fn: (value) => flakyFn(value),
});
executor.registerFunction({
name: 'spy',
args: {},
help: '',
fn: (value) => spyFn(value),
});

const result = executor.run('observable | flaky | spy', null, {});

expectObservable(result).toBe('abc|', {
a: { partial: true, result: a },
b: {
partial: true,
result: {
type: 'error',
error: expect.objectContaining({ message: '[flaky] > Some error.' }),
},
},
c: { partial: false, result: c },
});

flush();

expect(spyFn).toHaveBeenCalledTimes(2);
expect(spyFn).toHaveBeenNthCalledWith(1, a);
expect(spyFn).toHaveBeenNthCalledWith(2, c);
});
});
});

describe('state', () => {
Expand Down
145 changes: 72 additions & 73 deletions src/plugins/expressions/common/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,87 +295,86 @@ export class Execution<
}

invokeChain<ChainOutput = unknown>(
chainArr: ExpressionAstFunction[],
[head, ...tail]: ExpressionAstFunction[],
input: unknown
): Observable<ChainOutput> {
): Observable<ChainOutput | ExpressionValueError> {
if (!head) {
return of(input as ChainOutput);
}

return of(input).pipe(
...(chainArr.map((link) =>
switchMap((currentInput) => {
const { function: fnName, arguments: fnArgs } = link;
const fn = getByAlias(
this.state.get().functions,
fnName,
this.execution.params.namespace
);
switchMap((currentInput) => {
const { function: fnName, arguments: fnArgs } = head;
const fn = getByAlias(this.state.get().functions, fnName, this.execution.params.namespace);

if (!fn) {
throw createError({
name: 'fn not found',
message: i18n.translate('expressions.execution.functionNotFound', {
defaultMessage: `Function {fnName} could not be found.`,
values: {
fnName,
},
}),
});
}

if (!fn) {
throw createError({
name: 'fn not found',
message: i18n.translate('expressions.execution.functionNotFound', {
defaultMessage: `Function {fnName} could not be found.`,
values: {
fnName,
},
}),
});
}
if (fn.disabled) {
throw createError({
name: 'fn is disabled',
message: i18n.translate('expressions.execution.functionDisabled', {
defaultMessage: `Function {fnName} is disabled.`,
values: {
fnName,
},
}),
});
}

if (fn.disabled) {
throw createError({
name: 'fn is disabled',
message: i18n.translate('expressions.execution.functionDisabled', {
defaultMessage: `Function {fnName} is disabled.`,
values: {
fnName,
},
}),
});
}
if (fn.deprecated) {
this.logger?.warn(`Function '${fnName}' is deprecated`);
}

if (fn.deprecated) {
this.logger?.warn(`Function '${fnName}' is deprecated`);
}
if (this.execution.params.debug) {
head.debug = {
args: {},
duration: 0,
fn: fn.name,
input: currentInput,
success: true,
};
}

if (this.execution.params.debug) {
link.debug = {
args: {},
duration: 0,
fn: fn.name,
input: currentInput,
success: true,
};
}
const timeStart = this.execution.params.debug ? now() : 0;

// `resolveArgs` returns an object because the arguments themselves might
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(head.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(head.debug, { output })),
switchMap((output) => this.invokeChain<ChainOutput>(tail, output)),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;

if (this.execution.params.debug) {
Object.assign(head.debug, { error, rawError, success: false });
}

const timeStart = this.execution.params.debug ? now() : 0;

// `resolveArgs` returns an object because the arguments themselves might
// actually have `then` or `subscribe` methods which would be treated as a `Promise`
// or an `Observable` accordingly.
return this.resolveArgs(fn, currentInput, fnArgs).pipe(
tap((args) => this.execution.params.debug && Object.assign(link.debug, { args })),
switchMap((args) => this.invokeFunction(fn, currentInput, args)),
switchMap((output) => (getType(output) === 'error' ? throwError(output) : of(output))),
tap((output) => this.execution.params.debug && Object.assign(link.debug, { output })),
catchError((rawError) => {
const error = createError(rawError);
error.error.message = `[${fnName}] > ${error.error.message}`;

if (this.execution.params.debug) {
Object.assign(link.debug, { error, rawError, success: false });
}

return throwError(error);
}),
finalize(() => {
if (this.execution.params.debug) {
Object.assign(link.debug, { duration: now() - timeStart });
}
})
);
})
) as Parameters<Observable<unknown>['pipe']>),
return of(error);
}),
finalize(() => {
if (this.execution.params.debug) {
Object.assign(head.debug, { duration: now() - timeStart });
}
})
);
}),
catchError((error) => of(error))
) as Observable<ChainOutput>;
);
}

invokeFunction<Fn extends ExpressionFunction>(
Expand Down

0 comments on commit 6a0b2fd

Please sign in to comment.