Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fetch): add selector #5306

Merged
merged 6 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions spec-dtslint/observables/dom/fetch-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { fromFetch } from 'rxjs/fetch';
import { a$ } from '../../helpers';

it('should emit the fetch Response by default', () => {
const a = fromFetch("a"); // $ExpectType Observable<Response>
});

it('should support a selector that returns a Response promise', () => {
const a = fromFetch("a", { selector: response => response.text() }); // $ExpectType Observable<string>
});

it('should support a selector that returns an arbitrary type', () => {
const a = fromFetch("a", { selector: response => a$ }); // $ExpectType Observable<A>
});

it('should error for selectors that don\'t return an ObservableInput', () => {
const a = fromFetch("a", { selector: response => 42 }); // $ExpectError
});
6 changes: 5 additions & 1 deletion spec-dtslint/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
"noEmit": true,
"paths": {
"rxjs": ["../dist/types"],
"rxjs/operators": ["../dist/types/operators"]
"rxjs/ajax": ["../dist/types/ajax"],
"rxjs/fetch": ["../dist/types/fetch"],
"rxjs/operators": ["../dist/types/operators"],
"rxjs/testing": ["../dist/types/testing"],
"rxjs/webSocket": ["../dist/types/webSocket"]
},
"skipLibCheck": true,
"strict": true,
Expand Down
53 changes: 53 additions & 0 deletions spec/observables/dom/fetch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,57 @@ describe('fromFetch', () => {
}
});
});

it('should support a selector', done => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', {
selector: response => response.text()
});
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);

fetch$.subscribe({
next: text => {
expect(text).to.equal('bar');
},
error: done,
complete: () => {
// Wait until the complete and the subsequent unsubscribe are finished
// before testing these expectations:
setTimeout(() => {
expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;
done();
}, 0);
}
});
});

it('should abort when unsubscribed and a selector is specified', () => {
mockFetch.respondWith = {
...OK_RESPONSE,
text: () => Promise.resolve('bar')
};
const fetch$ = fromFetch('/foo', {
selector: response => response.text()
});
expect(mockFetch.calls.length).to.equal(0);
expect(MockAbortController.created).to.equal(0);
const subscription = fetch$.subscribe();

expect(MockAbortController.created).to.equal(1);
expect(mockFetch.calls.length).to.equal(1);
expect(mockFetch.calls[0].input).to.equal('/foo');
expect(mockFetch.calls[0].init!.signal).not.to.be.undefined;
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.false;

subscription.unsubscribe();
expect(mockFetch.calls[0].init!.signal!.aborted).to.be.true;
});
});
78 changes: 72 additions & 6 deletions src/internal/observable/dom/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { from } from '../../observable/from';
import { ObservableInput } from '../../types';

export function fromFetch<T>(
input: string | Request,
init: RequestInit & {
selector: (response: Response) => ObservableInput<T>
}
): Observable<T>;

export function fromFetch(
input: string | Request,
init?: RequestInit
): Observable<Response>;

/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
Expand Down Expand Up @@ -42,7 +56,36 @@ import { Subscription } from '../../Subscription';
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* })
* });
* ```
*
* ### Use with Chunked Transfer Encoding
*
* With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
* the promise returned by `fetch` will resolve as soon as the response's headers are
* received.
*
* That means the `fromFetch` observable will emit a `Response` - and will
* then complete - before the body is received. When one of the methods on the
* `Response` - like `text()` or `json()` - is called, the returned promise will not
* resolve until the entire body has been received. Unsubscribing from any observable
* that uses the promise as an observable input will not abort the request.
*
* To facilitate aborting the retrieval of responses that use chunked transfer encoding,
* a `selector` can be specified via the `init` parameter:
*
* ```ts
* import { of } from 'rxjs';
* import { fromFetch } from 'rxjs/fetch';
*
* const data$ = fromFetch('https://api.github.com/users?per_page=5', {
* selector: response => response.json()
* });
*
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* });
* ```
*
* @param input The resource you would like to fetch. Can be a url or a request object.
Expand All @@ -51,8 +94,14 @@ import { Subscription } from '../../Subscription';
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
*/
export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response> {
return new Observable<Response>(subscriber => {
export function fromFetch<T>(
input: string | Request,
initWithSelector: RequestInit & {
selector?: (response: Response) => ObservableInput<T>
} = {}
): Observable<Response | T> {
const { selector, ...init } = initWithSelector;
Copy link
Contributor

@iamandrewluca iamandrewluca Dec 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benlesh @cartant An observation. Now that init is created using spread, this means that init always will be an object, even it has no fields. And below wrap with if(init) has no value

https://github.com/ReactiveX/rxjs/pull/5306/files#diff-717ea4ff7feb7c6009e9048143f3ef0fc9ddc6cdeb16b1effcb19d18f3a9c875R119

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iamandrewluca 👍 Yep. Feel free to open a PR to simplify it.

return new Observable<Response | T>(subscriber => {
const controller = new AbortController();
const signal = controller.signal;
let abortable = true;
Expand Down Expand Up @@ -91,9 +140,26 @@ export function fromFetch(input: string | Request, init?: RequestInit): Observab
}

fetch(input, perSubscriberInit).then(response => {
abortable = false;
subscriber.next(response);
subscriber.complete();
if (selector) {
subscription.add(from(selector(response)).subscribe(
value => subscriber.next(value),
err => {
abortable = false;
if (!unsubscribed) {
// Only forward the error if it wasn't an abort.
subscriber.error(err);
}
},
() => {
abortable = false;
subscriber.complete();
}
));
} else {
abortable = false;
subscriber.next(response);
subscriber.complete();
}
}).catch(err => {
abortable = false;
if (!unsubscribed) {
Expand Down