diff --git a/modules/signals/rxjs-interop/src/rx-method.ts b/modules/signals/rxjs-interop/src/rx-method.ts index d97f9450bb..54858e99f8 100644 --- a/modules/signals/rxjs-interop/src/rx-method.ts +++ b/modules/signals/rxjs-interop/src/rx-method.ts @@ -1,13 +1,13 @@ import { assertInInjectionContext, DestroyRef, - effect, inject, Injector, isSignal, Signal, } from '@angular/core'; -import { isObservable, noop, Observable, Subject, Unsubscribable } from 'rxjs'; +import { toObservable } from '@angular/core/rxjs-interop'; +import { isObservable, Observable, of, Subject, Unsubscribable } from 'rxjs'; type RxMethodInput = Input | Observable | Signal; @@ -30,23 +30,20 @@ export function rxMethod( destroyRef.onDestroy(() => sourceSub.unsubscribe()); const rxMethodFn = (input: RxMethodInput) => { - if (isSignal(input)) { - const watcher = effect(() => source$.next(input()), { injector }); - const instanceSub = { unsubscribe: () => watcher.destroy() }; - sourceSub.add(instanceSub); + let input$: Observable; - return instanceSub; + if (isSignal(input)) { + input$ = toObservable(input, { injector }); + } else if (isObservable(input)) { + input$ = input; + } else { + input$ = of(input); } - if (isObservable(input)) { - const instanceSub = input.subscribe((value) => source$.next(value)); - sourceSub.add(instanceSub); - - return instanceSub; - } + const instanceSub = input$.subscribe((value) => source$.next(value)); + sourceSub.add(instanceSub); - source$.next(input); - return { unsubscribe: noop }; + return instanceSub; }; rxMethodFn.unsubscribe = sourceSub.unsubscribe.bind(sourceSub);