Skip to content

Commit

Permalink
Resolve #1831 - make sure liveQueries always run in separate macro ta…
Browse files Browse the repository at this point in the history
…sks.
  • Loading branch information
dfahlander committed Dec 11, 2023
1 parent f088a7d commit 7c13770
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 29 deletions.
1 change: 1 addition & 0 deletions src/helpers/promise.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export function incrementExpectedAwaits(): number;
export function decrementExpectedAwaits(sourceTaskId?: number): void;
export function beginMicroTickScope(): boolean;
export function endMicroTickScope(): void;
export function execInGlobalContext(cb: Function): void;
export declare var DexiePromise : DexiePromiseConstructor;

export default DexiePromise;
18 changes: 16 additions & 2 deletions src/helpers/promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ export function wrap (fn, errorCatcher) {
const task = { awaits: 0, echoes: 0, id: 0}; // The ongoing macro-task when using zone-echoing.
var taskCounter = 0; // ID counter for macro tasks.
var zoneStack = []; // Stack of left zones to restore asynchronically.
var zoneEchoes = 0; // zoneEchoes is a must in order to persist zones between native await expressions.
var zoneEchoes = 0; // When > 0, zoneLeaveEcho is queued. When 0 and task.echoes is also 0, nothing is queued.
var totalEchoes = 0; // ID counter for micro-tasks. Used to detect possible native await in our Promise.prototype.then.


Expand Down Expand Up @@ -724,8 +724,9 @@ export function onPossibleParallellAsync (possiblePromise) {
function zoneEnterEcho(targetZone) {
++totalEchoes;
//console.log("Total echoes ", totalEchoes);
//if (task.echoes === 1) console.warn("Cancelling echoing of async context.");
if (!task.echoes || --task.echoes === 0) {
task.echoes = task.id = 0; // Cancel zone echoing.
task.echoes = task.awaits = task.id = 0; // Cancel echoing.
}

zoneStack.push(PSD);
Expand Down Expand Up @@ -829,6 +830,19 @@ function getPatchedPromiseThen (origThen, zone) {
};
}

/** Execute callback in global context */
export function execInGlobalContext(cb) {
if (Promise === NativePromise && task.echoes === 0) {
if (zoneEchoes === 0) {
cb();
} else {
enqueueNativeMicroTask(cb);
}
} else {
setTimeout(cb, 0);
}
}

export var rejection = DexiePromise.reject;

export {DexiePromise};
50 changes: 23 additions & 27 deletions src/live-query/live-query.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isAsyncFunction, keys, objectIsEmpty } from '../functions/utils';
import { _global, isAsyncFunction, keys, objectIsEmpty } from '../functions/utils';
import {
globalEvents,
DEXIE_STORAGE_MUTATED_EVENT_NAME,
Expand All @@ -7,6 +7,7 @@ import {
beginMicroTickScope,
decrementExpectedAwaits,
endMicroTickScope,
execInGlobalContext,
incrementExpectedAwaits,
NativePromise,
newScope,
Expand All @@ -33,6 +34,8 @@ export interface LiveQueryContext {
querier: Function; // For debugging purposes and Error messages
}

let liveQueryCounter = 0;

export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {
let hasValue = false;
let currentValue: T;
Expand All @@ -41,6 +44,7 @@ export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {
function execute(ctx: LiveQueryContext) {
const wasRootExec = beginMicroTickScope(); // Performance: Avoid starting a new microtick scope within the async context.
try {
console.log("LiveQuery ID: " + ++liveQueryCounter);
if (scopeFuncIsAsync) {
incrementExpectedAwaits();
}
Expand All @@ -50,21 +54,7 @@ export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {
// This fixes zone leaking issue that the liveQuery zone can leak to observer's next microtask.
rv = (rv as Promise<any>).finally(decrementExpectedAwaits);
}
return Promise.resolve(rv).finally(()=>{
if (PSD.subscr === ctx.subscr) {
// Querier did not await all code paths. We must wait for the next macrotask to run in order to
// escape from zone echoing. Warn to console so that app code can be corrected. liveQuery callbacks
// shall be pure functions and should never spawn side effects - so there is never a need to call
// other async functions or generated promises without awaiting them.
console.warn(`Dexie liveQuery()'s querier callback did'nt await all of its spawned promises. Querier source: ${querier}`);
return new NativePromise(resolve => setTimeout(resolve, 0)); // Wait for the next macrotask to run.
// @ts-ignore
} else if (Promise.PSD) {
// Still in async context (zone echoing) from another task. Wait for the next macrotask to run.
// @ts-ignore
return new NativePromise(resolve => setTimeout(resolve, 0)); // Wait for the next macrotask to run.
}
});
return rv;
} finally {
wasRootExec && endMicroTickScope(); // Given that we created the microtick scope, we must also end it.
}
Expand Down Expand Up @@ -92,6 +82,8 @@ export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {

let startedListening = false;

const doQuery = () => execInGlobalContext(_doQuery);

function shouldNotify() {
return obsSetsOverlap(currentObs, accumMuts);
}
Expand Down Expand Up @@ -132,7 +124,7 @@ export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {
(result) => {
hasValue = true;
currentValue = result;
if (closed || ctx.signal.aborted) {
if (closed || ctx.signal.aborted) {
// closed - no subscriber anymore.
// signal.aborted - new query was made before this one completed and
// the querier might have catched AbortError and return successful result.
Expand All @@ -148,25 +140,29 @@ export function liveQuery<T>(querier: () => T | Promise<T>): IObservable<T> {
globalEvents(DEXIE_STORAGE_MUTATED_EVENT_NAME, mutationListener);
startedListening = true;
}
observer.next && observer.next(result);
execInGlobalContext(()=>!closed && observer.next && observer.next(result));
},
(err) => {
hasValue = false;
if (!['DatabaseClosedError', 'AbortError'].includes(err?.name)) {
if (closed) return;
observer.error && observer.error(err);
if (!closed) execInGlobalContext(()=>{
if (closed) return;
observer.error && observer.error(err);
});
}
}
);
};

const doQuery = () => {
// @ts-ignore
if (PSD.global && !Promise.PSD) _doQuery();
else setTimeout(_doQuery, 0);
}

doQuery();
// Use setTimeot here to guarantee execution in a private macro task before and
// after. The helper executeInGlobalContext(_doQuery) is not enough here because
// caller of `subscribe()` could be anything, such as a frontend framework that will
// continue in the same tick after subscribe() is called and call other
// eftects, that could involve dexie operations such as writing to the DB.
// If that happens, the private zone echoes from a live query tast started here
// could still be ongoing when the other operations start and make them inherit
// the async context from a live query.
setTimeout(doQuery, 0);
return subscription;
});
observable.hasValue = () => hasValue;
Expand Down

0 comments on commit 7c13770

Please sign in to comment.