Skip to content

Commit

Permalink
Prevent reactive variables from retaining unwatched caches. (#7279)
Browse files Browse the repository at this point in the history
Each reactive variable must remember any caches previously involved in
reading its value, so those caches can be notified if/when the variable
value is next modified.

To prevent reactive variables from retaining references to caches that no
longer need to be notified, we preemptively remove the cache from the
memory of all of its associated reactive variables whenever all of its
watches are cancelled, since not having any active cache.watches means
cache.broadcastWatches has nothing to do.

This change should prevent memory leaks when lots of caches are created
and discarded by the same process, as in the server-side rendering use
case described by @manojVivek in #7274.
  • Loading branch information
benjamn authored Nov 3, 2020
1 parent 6f36032 commit 49e68a3
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 8 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
{
"name": "apollo-client",
"path": "./dist/apollo-client.cjs.min.js",
"maxSize": "25.25 kB"
"maxSize": "25.3 kB"
}
],
"peerDependencies": {
Expand Down
100 changes: 96 additions & 4 deletions src/__tests__/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2971,10 +2971,6 @@ describe('@connection', () => {
const bResults = watch(gql`{ b }`);
const abResults = watch(gql`{ a b }`);

function wait(time = 10) {
return new Promise(resolve => setTimeout(resolve, 10));
}

await wait();

function checkLastResult(
Expand Down Expand Up @@ -3114,6 +3110,102 @@ describe('@connection', () => {
resolve();
});

function wait(time = 10) {
return new Promise(resolve => setTimeout(resolve, time));
}

itAsync('should call forgetCache for reactive vars when stopped', async (resolve, reject) => {
const aVar = makeVar(123);
const bVar = makeVar("asdf");
const aSpy = jest.spyOn(aVar, "forgetCache");
const bSpy = jest.spyOn(bVar, "forgetCache");
const cache: InMemoryCache = new InMemoryCache({
typePolicies: {
Query: {
fields: {
a() {
return aVar();
},
b() {
return bVar();
},
},
},
},
});

const client = new ApolloClient({ cache });

const obsQueries = new Set<ObservableQuery<any>>();
const subs = new Set<ObservableSubscription>();
function watch(
query: DocumentNode,
fetchPolicy: WatchQueryFetchPolicy = "cache-first",
): any[] {
const results: any[] = [];
const obsQuery = client.watchQuery({
query,
fetchPolicy,
});
obsQueries.add(obsQuery);
subs.add(obsQuery.subscribe({
next(result) {
results.push(result.data);
},
}));
return results;
}

const aQuery = gql`{ a }`;
const bQuery = gql`{ b }`;
const abQuery = gql`{ a b }`;

const aResults = watch(aQuery);
const bResults = watch(bQuery);

expect(cache["watches"].size).toBe(2);

expect(aResults).toEqual([]);
expect(bResults).toEqual([]);

expect(aSpy).not.toBeCalled();
expect(bSpy).not.toBeCalled();

subs.forEach(sub => sub.unsubscribe());

expect(aSpy).toBeCalledTimes(1);
expect(aSpy).toBeCalledWith(cache);
expect(bSpy).toBeCalledTimes(1);
expect(bSpy).toBeCalledWith(cache);

expect(aResults).toEqual([]);
expect(bResults).toEqual([]);

expect(cache["watches"].size).toBe(0);
const abResults = watch(abQuery);
expect(abResults).toEqual([]);
expect(cache["watches"].size).toBe(1);

await wait();

expect(aResults).toEqual([]);
expect(bResults).toEqual([]);
expect(abResults).toEqual([
{ a: 123, b: "asdf" }
]);

client.stop();

await wait();

expect(aSpy).toBeCalledTimes(2);
expect(aSpy).toBeCalledWith(cache);
expect(bSpy).toBeCalledTimes(2);
expect(bSpy).toBeCalledWith(cache);

resolve();
});

describe('default settings', () => {
const query = gql`
query number {
Expand Down
51 changes: 51 additions & 0 deletions src/cache/inmemory/__tests__/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,57 @@ describe("ReactiveVar and makeVar", () => {
});
});

it("should forget cache once all watches are cancelled", () => {
const { cache, nameVar, query } = makeCacheAndVar(false);
const spy = jest.spyOn(nameVar, "forgetCache");

const diffs: Cache.DiffResult<any>[] = [];
const watch = () => cache.watch({
query,
optimistic: true,
immediate: true,
callback(diff) {
diffs.push(diff);
},
});

const unwatchers = [
watch(),
watch(),
watch(),
watch(),
watch(),
];

expect(diffs.length).toBe(5);

expect(cache["watches"].size).toBe(5);
expect(spy).not.toBeCalled();

unwatchers.pop()!();
expect(cache["watches"].size).toBe(4);
expect(spy).not.toBeCalled();

unwatchers.shift()!();
expect(cache["watches"].size).toBe(3);
expect(spy).not.toBeCalled();

unwatchers.pop()!();
expect(cache["watches"].size).toBe(2);
expect(spy).not.toBeCalled();

expect(diffs.length).toBe(5);
unwatchers.push(watch());
expect(diffs.length).toBe(6);

expect(unwatchers.length).toBe(3);
unwatchers.forEach(unwatch => unwatch());

expect(cache["watches"].size).toBe(0);
expect(spy).toBeCalledTimes(1);
expect(spy).toBeCalledWith(cache);
});

it("should broadcast only once for multiple reads of same variable", () => {
const nameVar = makeVar("Ben");
const cache = new InMemoryCache({
Expand Down
9 changes: 7 additions & 2 deletions src/cache/inmemory/inMemoryCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
import { StoreReader } from './readFromStore';
import { StoreWriter } from './writeToStore';
import { EntityStore, supportsResultCaching } from './entityStore';
import { makeVar } from './reactiveVars';
import { makeVar, forgetCache } from './reactiveVars';
import {
defaultDataIdFromObject,
PossibleTypesMap,
Expand Down Expand Up @@ -199,7 +199,12 @@ export class InMemoryCache extends ApolloCache<NormalizedCacheObject> {
this.maybeBroadcastWatch(watch);
}
return () => {
this.watches.delete(watch);
// Once we remove the last watch from this.watches, cache.broadcastWatches
// no longer does anything, so we preemptively tell the reactive variable
// system to exclude this cache from future broadcasts.
if (this.watches.delete(watch) && !this.watches.size) {
forgetCache(this);
}
this.watchDep.dirty(watch);
// Remove this watch from the LRU cache managed by the
// maybeBroadcastWatch OptimisticWrapperFunction, to prevent memory
Expand Down
20 changes: 19 additions & 1 deletion src/cache/inmemory/reactiveVars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ApolloCache } from '../../core';
export interface ReactiveVar<T> {
(newValue?: T): T;
onNextChange(listener: ReactiveListener<T>): () => void;
forgetCache(cache: ApolloCache<any>): boolean;
}

export type ReactiveListener<T> = (value: T) => any;
Expand All @@ -27,6 +28,16 @@ function consumeAndIterate<T>(set: Set<T>, callback: (item: T) => any) {
items.forEach(callback);
}

const varsByCache = new WeakMap<ApolloCache<any>, Set<ReactiveVar<any>>>();

export function forgetCache(cache: ApolloCache<any>) {
const vars = varsByCache.get(cache);
if (vars) {
consumeAndIterate(vars, rv => rv.forgetCache(cache));
varsByCache.delete(cache);
}
}

export function makeVar<T>(value: T): ReactiveVar<T> {
const caches = new Set<ApolloCache<any>>();
const listeners = new Set<ReactiveListener<T>>();
Expand All @@ -50,7 +61,12 @@ export function makeVar<T>(value: T): ReactiveVar<T> {
// context via cacheSlot. This isn't entirely foolproof, but it's
// the same system that powers varDep.
const cache = cacheSlot.getValue();
if (cache) caches.add(cache);
if (cache) {
caches.add(cache);
let vars = varsByCache.get(cache)!;
if (!vars) varsByCache.set(cache, vars = new Set);
vars.add(rv);
}
varDep(rv);
}

Expand All @@ -64,6 +80,8 @@ export function makeVar<T>(value: T): ReactiveVar<T> {
};
};

rv.forgetCache = cache => caches.delete(cache);

return rv;
}

Expand Down

0 comments on commit 49e68a3

Please sign in to comment.