Skip to content

Commit

Permalink
fix(storage): snapshotChanges should return a success snapshot (#2704)
Browse files Browse the repository at this point in the history
* Works on tasks that are already running
* Now appropriately emits a final success snapshot before completion
* In theory should emit a canceled or error snapshot before throwing (once the JS SDK is patched)
* Added tests for canceling, pausing, and resuming
  • Loading branch information
jamesdaniels authored Dec 3, 2020
1 parent 984006d commit 972aa85
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 7 deletions.
28 changes: 22 additions & 6 deletions src/storage/observable/fromTask.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';
import { debounceTime } from 'rxjs/operators';
import { UploadTask, UploadTaskSnapshot } from '../interfaces';

// need to import, else the types become import('firebase/app').default.storage.UploadTask
// and it no longer works w/Firebase v7
import firebase from 'firebase/app';

// Things aren't working great, I'm having to put in a lot of work-arounds for what
// appear to be Firebase JS SDK bugs https://github.com/firebase/firebase-js-sdk/issues/4158
export function fromTask(task: UploadTask) {
return new Observable<UploadTaskSnapshot>(subscriber => {
const progress = (snap: UploadTaskSnapshot) => subscriber.next(snap);
const error = e => subscriber.error(e);
const complete = () => subscriber.complete();
task.on('state_changed', progress, (e) => {
// emit the current snapshot, so they don't have to wait for state_changes
// to fire next... this is stale if the task is no longer running :(
progress(task.snapshot);
const unsub = task.on('state_changed', progress);
// it turns out that neither task snapshot nor 'state_changed' fire the last
// snapshot before completion, the one with status 'success" and 100% progress
// so let's use the promise form of the task for that
task.then(snapshot => {
progress(snapshot);
complete();
}, e => {
// TODO investigate, again this is stale, we never fire a canceled or error it seems
progress(task.snapshot);
error(e);
}, () => {
progress(task.snapshot);
complete();
});
// on's type if Function, rather than () => void, need to wrap
return function unsubscribe() {
unsub();
};
}).pipe(
shareReplay({ bufferSize: 1, refCount: false })
// deal with sync emissions from first emitting `task.snapshot`, this makes sure
// that if the task is already finished we don't emit the old running state
debounceTime(0)
);
}
71 changes: 70 additions & 1 deletion src/storage/storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { forkJoin } from 'rxjs';
import { mergeMap, tap } from 'rxjs/operators';
import { TestBed } from '@angular/core/testing';
import { AngularFireModule, FIREBASE_APP_NAME, FIREBASE_OPTIONS, FirebaseApp } from '@angular/fire';
import { AngularFireStorage, AngularFireStorageModule, AngularFireUploadTask, BUCKET } from '@angular/fire/storage';
import { AngularFireStorage, AngularFireStorageModule, AngularFireUploadTask, BUCKET, fromTask } from '@angular/fire/storage';
import { COMMON_CONFIG } from '../test-config';
import { rando } from '../firestore/utils.spec';
import { ChangeDetectorRef } from '@angular/core';
import 'firebase/storage';
import firebase from 'firebase/app';

if (typeof XMLHttpRequest === 'undefined') {
globalThis.XMLHttpRequest = require('xhr2');
Expand Down Expand Up @@ -64,13 +65,19 @@ describe('AngularFireStorage', () => {
const blob = blobOrBuffer(JSON.stringify(data), { type: 'application/json' });
const ref = afStorage.ref(rando());
const task = ref.put(blob);
let emissionCount = 0;
let lastSnap: firebase.storage.UploadTaskSnapshot;
task.snapshotChanges()
.subscribe(
snap => {
lastSnap = snap;
emissionCount++;
expect(snap).toBeDefined();
},
done.fail,
() => {
expect(lastSnap.state).toBe('success');
expect(emissionCount).toBeGreaterThan(0);
ref.delete().subscribe(done, done.fail);
});
});
Expand Down Expand Up @@ -104,6 +111,68 @@ describe('AngularFireStorage', () => {
}).catch(done.fail);
});

it('should cancel the task', (done) => {
const data = { angular: 'promise' };
const blob = blobOrBuffer(JSON.stringify(data), { type: 'application/json' });
const ref = afStorage.ref(rando());
const task: AngularFireUploadTask = ref.put(blob);
let emissionCount = 0;
let lastSnap: firebase.storage.UploadTaskSnapshot;
task.snapshotChanges().subscribe(snap => {
emissionCount++;
lastSnap = snap;
if (emissionCount === 1) {
task.cancel();
}
}, () => {
// TODO investigate, this doesn't appear to work...
// https://github.com/firebase/firebase-js-sdk/issues/4158
// expect(lastSnap.state).toEqual('canceled');
done();
}, done.fail);
});

it('should be able to pause/resume the task', (done) => {
const data = { angular: 'promise' };
const blob = blobOrBuffer(JSON.stringify(data), { type: 'application/json' });
const ref = afStorage.ref(rando());
const task: AngularFireUploadTask = ref.put(blob);
let paused = false;
task.pause();
task.snapshotChanges().subscribe(snap => {
if (snap.state === 'paused') {
paused = true;
task.resume();
}
}, done.fail, () => {
expect(paused).toBeTruthy();
done();
});
});

it('should work with an already finished task', (done) => {
const data = { angular: 'promise' };
const blob = blobOrBuffer(JSON.stringify(data), { type: 'application/json' });
const ref = afStorage.storage.ref(rando());
const task = ref.put(blob);
let emissionCount = 0;
let lastSnap: firebase.storage.UploadTaskSnapshot;
task.then(_snap => {
fromTask(task).subscribe(
snap => {
lastSnap = snap;
emissionCount++;
expect(snap).toBeDefined();
},
done.fail,
() => {
expect(lastSnap.state).toBe('success');
expect(emissionCount).toBe(1);
ref.delete().then(done, done.fail);
});
});
});

});

describe('reference', () => {
Expand Down

0 comments on commit 972aa85

Please sign in to comment.