diff --git a/modules/signals/rxjs-interop/src/rx-method.ts b/modules/signals/rxjs-interop/src/rx-method.ts
index 54858e99f8..d97f9450bb 100644
--- a/modules/signals/rxjs-interop/src/rx-method.ts
+++ b/modules/signals/rxjs-interop/src/rx-method.ts
@@ -1,13 +1,13 @@
import {
assertInInjectionContext,
DestroyRef,
+ effect,
inject,
Injector,
isSignal,
Signal,
} from '@angular/core';
-import { toObservable } from '@angular/core/rxjs-interop';
-import { isObservable, Observable, of, Subject, Unsubscribable } from 'rxjs';
+import { isObservable, noop, Observable, Subject, Unsubscribable } from 'rxjs';
type RxMethodInput = Input | Observable | Signal;
@@ -30,20 +30,23 @@ export function rxMethod(
destroyRef.onDestroy(() => sourceSub.unsubscribe());
const rxMethodFn = (input: RxMethodInput) => {
- let input$: Observable;
-
if (isSignal(input)) {
- input$ = toObservable(input, { injector });
- } else if (isObservable(input)) {
- input$ = input;
- } else {
- input$ = of(input);
+ const watcher = effect(() => source$.next(input()), { injector });
+ const instanceSub = { unsubscribe: () => watcher.destroy() };
+ sourceSub.add(instanceSub);
+
+ return instanceSub;
}
- const instanceSub = input$.subscribe((value) => source$.next(value));
- sourceSub.add(instanceSub);
+ if (isObservable(input)) {
+ const instanceSub = input.subscribe((value) => source$.next(value));
+ sourceSub.add(instanceSub);
+
+ return instanceSub;
+ }
- return instanceSub;
+ source$.next(input);
+ return { unsubscribe: noop };
};
rxMethodFn.unsubscribe = sourceSub.unsubscribe.bind(sourceSub);