Skip to content

Commit

Permalink
Apply range-filters in get when probing active-listener cache (#4408)
Browse files Browse the repository at this point in the history
* Apply range-filters in get when probing active-listener cache

* Fix get() _again_

* cleanup

* lint error and test feedback

* increase test coverage

* Remove unused var

* chnageset

* fix changeset

* Fix misunderstanding of suggested fix.

* Update packages/database/test/query.test.ts

Co-authored-by: Sebastian Schmidt <[email protected]>

* Update packages/database/test/query.test.ts

Co-authored-by: Sebastian Schmidt <[email protected]>

* add more tests for get

* Update .changeset/small-icons-allow.md

Co-authored-by: Sebastian Schmidt <[email protected]>

* Update packages/database/src/core/view/View.ts

Co-authored-by: Sebastian Schmidt <[email protected]>

* Update packages/database/src/core/SyncPoint.ts

Co-authored-by: Sebastian Schmidt <[email protected]>

* Fix get with pending writes case

Co-authored-by: Sebastian Schmidt <[email protected]>
  • Loading branch information
jmwski and schmidt-sebastian authored Feb 10, 2021
1 parent f513922 commit 318af54
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 20 deletions.
4 changes: 4 additions & 0 deletions .changeset/small-icons-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
'@firebase/database': patch
---
Fixed an issue with `Query.get()` where Query filters are not applied to data in some cases.
4 changes: 2 additions & 2 deletions packages/database/src/core/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ export class Repo {
*/
getValue(query: Query): Promise<DataSnapshot> {
// Only active queries are cached. There is no persisted cache.
const cached = this.serverSyncTree_.calcCompleteEventCache(query.path);
if (!cached.isEmpty()) {
const cached = this.serverSyncTree_.getServerValue(query);
if (cached != null) {
return Promise.resolve(
new DataSnapshot(
cached,
Expand Down
47 changes: 38 additions & 9 deletions packages/database/src/core/SyncPoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,24 @@ export class SyncPoint {
return events;
}
}

/**
* Add an event callback for the specified query.
* Get a view for the specified query.
*
* @param serverCache Complete server cache, if we have it.
* @param query The query to return a view for
* @param writesCache
* @param serverCache
* @param serverCacheComplete
* @return Events to raise.
*/
addEventRegistration(
getView(
query: Query,
eventRegistration: EventRegistration,
writesCache: WriteTreeRef,
serverCache: Node | null,
serverCacheComplete: boolean
): Event[] {
): View {
const queryId = query.queryIdentifier();
let view = this.views.get(queryId);
const view = this.views.get(queryId);
if (!view) {
// TODO: make writesCache take flag for complete server node
let eventCache = writesCache.calcCompleteEventCache(
Expand All @@ -128,10 +130,37 @@ export class SyncPoint {
new CacheNode(eventCache, eventCacheComplete, false),
new CacheNode(serverCache, serverCacheComplete, false)
);
view = new View(query, viewCache);
this.views.set(queryId, view);
return new View(query, viewCache);
}
return view;
}

/**
* Add an event callback for the specified query.
*
* @param query
* @param eventRegistration
* @param writesCache
* @param serverCache Complete server cache, if we have it.
* @param serverCacheComplete
* @return Events to raise.
*/
addEventRegistration(
query: Query,
eventRegistration: EventRegistration,
writesCache: WriteTreeRef,
serverCache: Node | null,
serverCacheComplete: boolean
): Event[] {
const view = this.getView(
query,
writesCache,
serverCache,
serverCacheComplete
);
if (!this.views.has(query.queryIdentifier())) {
this.views.set(query.queryIdentifier(), view);
}
// This is guaranteed to exist now, we just created anything that was missing
view.addEventRegistration(eventRegistration);
return view.getInitialEvents(eventRegistration);
Expand Down
33 changes: 33 additions & 0 deletions packages/database/src/core/SyncTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { Node } from './snap/Node';
import { Event } from './view/Event';
import { EventRegistration } from './view/EventRegistration';
import { View } from './view/View';
import { CacheNode } from './view/CacheNode';

/**
* @typedef {{
Expand Down Expand Up @@ -503,6 +504,38 @@ export class SyncTree {
);
}

getServerValue(query: Query): Node | null {
const path = query.path;
let serverCache: Node | null = null;
// Any covering writes will necessarily be at the root, so really all we need to find is the server cache.
// Consider optimizing this once there's a better understanding of what actual behavior will be.
this.syncPointTree_.foreachOnPath(path, (pathToSyncPoint, sp) => {
const relativePath = Path.relativePath(pathToSyncPoint, path);
serverCache = serverCache || sp.getCompleteServerCache(relativePath);
});
let syncPoint = this.syncPointTree_.get(path);
if (!syncPoint) {
syncPoint = new SyncPoint();
this.syncPointTree_ = this.syncPointTree_.set(path, syncPoint);
} else {
serverCache = serverCache || syncPoint.getCompleteServerCache(Path.Empty);
}
const serverCacheComplete = serverCache != null;
const serverCacheNode: CacheNode | null = serverCacheComplete
? new CacheNode(serverCache, true, false)
: null;
const writesCache: WriteTreeRef | null = this.pendingWriteTree_.childWrites(
query.path
);
const view: View = syncPoint.getView(
query,
writesCache,
serverCacheComplete ? serverCacheNode.getNode() : ChildrenNode.EMPTY_NODE,
serverCacheComplete
);
return view.getCompleteNode();
}

/**
* This collapses multiple unfiltered views into a single view, since we only need a single
* listener for them.
Expand Down
4 changes: 4 additions & 0 deletions packages/database/src/core/view/View.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ export class View {
return this.viewCache_.getServerCache().getNode();
}

getCompleteNode(): Node | null {
return this.viewCache_.getCompleteEventSnap();
}

getCompleteServerCache(path: Path): Node | null {
const cache = this.viewCache_.getCompleteServerSnap();
if (cache) {
Expand Down
188 changes: 179 additions & 9 deletions packages/database/test/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,40 @@ describe('Query Tests', () => {
expect(Object.values(snap.val())).to.deep.equal([snap.val()[childOne.key]]);
});

it('Ensure startAfter on key index works with overlapping listener', async () => {
const node = getRandomNode() as Reference;
const childOne = node.push();
const childTwo = node.push();
// Create a server synced and a latency-compensated write
await childOne.set(1);
childTwo.set(2);
const ea = EventAccumulatorFactory.waitsForCount(1);
node.on('value', snap => {
ea.addEvent(snap.val());
});
await ea.promise;
const snap = await node.orderByKey().startAfter(childOne.key).get();
expect(Object.keys(snap.val())).to.deep.equal([childTwo.key]);
expect(Object.values(snap.val())).to.deep.equal([snap.val()[childTwo.key]]);
});

it('Ensure endBefore on key index works with overlapping listener', async () => {
const node = getRandomNode() as Reference;
const childOne = node.push();
const childTwo = node.push();
// Create a server synced and a latency-compensated write
await childOne.set(1);
childTwo.set(2);
const ea = EventAccumulatorFactory.waitsForCount(1);
node.on('value', snap => {
ea.addEvent(snap.val());
});
await ea.promise;
const snap = await node.orderByKey().endBefore(childTwo.key).get();
expect(Object.keys(snap.val())).to.deep.equal([childOne.key]);
expect(Object.values(snap.val())).to.deep.equal([snap.val()[childOne.key]]);
});

it('Ensure startAt / endAt with priority works.', async () => {
const node = getRandomNode() as Reference;

Expand Down Expand Up @@ -3120,14 +3154,28 @@ describe('Query Tests', () => {
expect((await node.get()).val()).to.equal(null);
});

it('get at non-empty root returns correct value', async () => {
it('get at node returns correct value', async () => {
const node = getRandomNode() as Reference;
const expected = { foo: 'a', bar: 'b' };
await node.set(expected);
const snapshot = await node.get();
expect(snapshot.val()).to.deep.equal(expected);
});

it('get for child returns correct value', async () => {
const node = getRandomNode() as Reference;
await node.set({ foo: 'a', bar: 'b', baz: 'c' });
const snapshot = await node.child('baz').get();
expect(snapshot.val()).to.deep.equal('c');
});

it('get for parent returns correct value', async () => {
const node = getRandomNode() as Reference;
const child = node.child('child');
await child.set(1);
expect((await node.get()).val()).to.deep.equal({ child: 1 });
});

it('get for removed node returns correct value', async () => {
const node = getRandomNode() as Reference;
const expected = { foo: 'a', bar: 'b' };
Expand All @@ -3140,7 +3188,7 @@ describe('Query Tests', () => {
expect(snapshot.val()).to.be.null;
});

it('get while offline is rejected', async () => {
it('get for missing node while offline is rejected', async () => {
const node = getRandomNode() as Reference;
node.database.goOffline();
try {
Expand All @@ -3150,13 +3198,7 @@ describe('Query Tests', () => {
}
});

it('get returns the latest value', async () => {
const node = getRandomNode() as Reference;
await node.set({ foo: 'bar' });
expect((await node.get()).val()).to.deep.equal({ foo: 'bar' });
});

it('get reads from cache if database is not connected', async () => {
it('get reads node from cache when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
Expand All @@ -3178,6 +3220,134 @@ describe('Query Tests', () => {
}
});

it('get reads child node from cache when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
const getSnapshot = await node.child('foo').get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal('bar');
} finally {
node.database.goOnline();
}
});

it('get reads parent node from cache when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
await node2.child('baz').set(1);
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
const getSnapshot = await node.get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal({ foo: 'bar', baz: 1 });
} finally {
node.database.goOnline();
}
});

it('get with pending node writes when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
node.set({ foo: 'baz' });
const getSnapshot = await node.get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal({ foo: 'baz' });
} finally {
node.database.goOnline();
}
});

it('get with pending child writes when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
node.child('baz').set(true);
const getSnapshot = await node.get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal({ foo: 'bar', baz: true });
} finally {
node.database.goOnline();
}
});

it('get with pending parent writes when not connected', async () => {
const node = getRandomNode() as Reference;
const node2 = getFreshRepo(node.path);
try {
await node2.set({ foo: 'bar' });
const onSnapshot = await new Promise((resolve, _) => {
node.on('value', snap => {
resolve(snap);
});
});
node.database.goOffline();
node.set({ foo: 'baz' });
const getSnapshot = await node.child('foo').get();
// node's cache dropped here.
node.off();
expect(getSnapshot.val()).to.deep.equal('baz');
} finally {
node.database.goOnline();
}
});

it('get with pending writes', async () => {
const node = getRandomNode() as Reference;
node.database.goOffline();
try {
node.set({ foo: 'bar' });
const snap = await node.get();
expect(snap.val()).to.deep.equal({ foo: 'bar' });
} finally {
node.database.goOnline();
}
});

it('get child of pending writes', async () => {
const node = getRandomNode() as Reference;
node.database.goOffline();
try {
node.set({ foo: 'bar' });
const snap = await node.child('foo').get();
expect(snap.val()).to.deep.equal('bar');
} finally {
node.database.goOnline();
}
});

it('get does not cache sibling data', async () => {
const reader = getRandomNode() as Reference;
const writer = getFreshRepo(reader.path);
Expand Down

0 comments on commit 318af54

Please sign in to comment.