Skip to content

Commit

Permalink
fix: correctly remove changes from pendingChanges queue when sending …
Browse files Browse the repository at this point in the history
…a snapshot or update
  • Loading branch information
nikgraf committed Oct 13, 2023
1 parent 7b6b764 commit 0d27125
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,8 @@ test("should increase context._snapshotSaveFailedCounter on every snapshot-save-

if (state.context._snapshotSaveFailedCounter === 2) {
expect(state.context._snapshotSaveFailedCounter).toBe(2);
expect(state.context._snapshotInFlight?.changes.length).toBe(2);
expect(state.context._snapshotInFlight?.changes.length).toBe(0);
expect(state.context._pendingChangesQueue.length).toBe(2);
done();
}
});
Expand Down
64 changes: 48 additions & 16 deletions packages/secsync/src/createSyncMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ type ProcessQueueData = {
snapshotInfosWithUpdateClocks: SnapshotInfoWithUpdateClocks[];
updatesLocalClock: number;
updatesInFlight: UpdateInFlight[];
pendingChangesQueue: any[];
pendingChangesToRemoveCount: number;
pendingChangesToPrepend: any[];
ephemeralMessageReceivingErrors: Error[];
documentDecryptionState: DocumentDecryptionState;
ephemeralMessagesSession: EphemeralMessagesSession | null;
Expand Down Expand Up @@ -459,6 +460,9 @@ export const createSyncMachine = () =>
};
}),
resetContext: assign((context, event) => {
if (context.logging === "debug") {
console.log("resetContext");
}
let unconfirmedChanges = context._updatesInFlight.reduce(
(accumulator, updateInFlight) => {
return [...accumulator, ...updateInFlight.changes];
Expand Down Expand Up @@ -524,7 +528,15 @@ export const createSyncMachine = () =>
if (event.data.handledQueue === "incoming") {
return {
_incomingQueue: context._incomingQueue.slice(1),
_pendingChangesQueue: event.data.pendingChangesQueue,
// because changes might have been add while processing the queue ans creating a
// snapshot or update we can't just overwrite the _pendingChangesQueue
// instead we need to track how many to remove from the beginning of the list
// and of some should be restored also add them to the list
_pendingChangesQueue: event.data.pendingChangesToPrepend.concat(
context._pendingChangesQueue.slice(
event.data.pendingChangesToRemoveCount
)
),
_snapshotInfosWithUpdateClocks:
event.data.snapshotInfosWithUpdateClocks,
_snapshotInFlight: event.data.snapshotInFlight,
Expand All @@ -540,7 +552,15 @@ export const createSyncMachine = () =>
} else if (event.data.handledQueue === "customMessage") {
return {
_customMessageQueue: context._customMessageQueue.slice(1),
_pendingChangesQueue: event.data.pendingChangesQueue,
// because changes might have been add while processing the queue ans creating a
// snapshot or update we can't just overwrite the _pendingChangesQueue
// instead we need to track how many to remove from the beginning of the list
// and of some should be restored also add them to the list
_pendingChangesQueue: event.data.pendingChangesToPrepend.concat(
context._pendingChangesQueue.slice(
event.data.pendingChangesToRemoveCount
)
),
_snapshotInfosWithUpdateClocks:
event.data.snapshotInfosWithUpdateClocks,
_snapshotInFlight: event.data.snapshotInFlight,
Expand All @@ -555,7 +575,15 @@ export const createSyncMachine = () =>
};
} else if (event.data.handledQueue === "pending") {
return {
_pendingChangesQueue: event.data.pendingChangesQueue,
// because changes might have been add while processing the queue ans creating a
// snapshot or update we can't just overwrite the _pendingChangesQueue
// instead we need to track how many to remove from the beginning of the list
// and of some should be restored also add them to the list
_pendingChangesQueue: event.data.pendingChangesToPrepend.concat(
context._pendingChangesQueue.slice(
event.data.pendingChangesToRemoveCount
)
),
_snapshotInfosWithUpdateClocks:
event.data.snapshotInfosWithUpdateClocks,
_snapshotInFlight: event.data.snapshotInFlight,
Expand Down Expand Up @@ -635,12 +663,13 @@ export const createSyncMachine = () =>
let snapshotInFlight = context._snapshotInFlight;
let updatesLocalClock = context._updatesLocalClock;
let updatesInFlight = context._updatesInFlight;
let pendingChangesQueue = context._pendingChangesQueue;
let documentDecryptionState = context._documentDecryptionState;
let ephemeralMessagesSession = context._ephemeralMessagesSession;
let errorCausingDocumentToFail: Error | null = null;
let errorNotCausingDocumentToFail: Error | null = null;
let snapshotSaveFailedCounter = context._snapshotSaveFailedCounter;
let pendingChangesToRemoveCount = 0;
let pendingChangesToPrepend: any[] = [];

let ephemeralMessageReceivingErrors =
context._ephemeralMessageReceivingErrors;
Expand Down Expand Up @@ -713,6 +742,8 @@ export const createSyncMachine = () =>
context.sodium
);

pendingChangesToRemoveCount =
context._pendingChangesQueue.length;
snapshotInFlight = {
updateClocks: {},
snapshotId: snapshot.publicData.snapshotId,
Expand All @@ -726,7 +757,6 @@ export const createSyncMachine = () =>
changes: context._pendingChangesQueue,
additionalPublicData,
};
pendingChangesQueue = [];

send({
type: "SEND",
Expand Down Expand Up @@ -761,6 +791,8 @@ export const createSyncMachine = () =>
context.sodium
);

pendingChangesToRemoveCount =
context._pendingChangesQueue.length;
snapshotInFlight = {
updateClocks: {},
snapshotId: snapshot.publicData.snapshotId,
Expand All @@ -774,7 +806,6 @@ export const createSyncMachine = () =>
changes: context._pendingChangesQueue,
additionalPublicData,
};
pendingChangesQueue = [];

send({
type: "SEND",
Expand Down Expand Up @@ -1340,9 +1371,7 @@ export const createSyncMachine = () =>
}

// put changes from the failed snapshot back in the queue
pendingChangesQueue = (
snapshotInFlight?.changes || []
).concat(pendingChangesQueue);
pendingChangesToPrepend = snapshotInFlight?.changes || [];
snapshotInFlight = null;

if (context.logging === "debug") {
Expand Down Expand Up @@ -1405,10 +1434,11 @@ export const createSyncMachine = () =>
const changes = updatesInFlight.reduce(
(acc, updateInFlight) =>
acc.concat(updateInFlight.changes),
[] as unknown[]
[] as any[]
);
updatesInFlight = [];
pendingChangesQueue = changes.concat(pendingChangesQueue);
// put the changes from the failed updated and after back to the queue
pendingChangesToPrepend = changes;

const currentClientPublicKey = context.sodium.to_base64(
context.signatureKeyPair.publicKey
Expand Down Expand Up @@ -1575,10 +1605,11 @@ export const createSyncMachine = () =>
if (context.logging === "debug") {
console.debug("send update");
}
const rawChanges = context._pendingChangesQueue;
pendingChangesQueue = [];
pendingChangesToRemoveCount =
context._pendingChangesQueue.length;

await createAndSendUpdate(
rawChanges,
context._pendingChangesQueue,
activeSnapshotInfoWithUpdateClocks,
updatesLocalClock
);
Expand All @@ -1595,7 +1626,8 @@ export const createSyncMachine = () =>
snapshotInFlight,
updatesLocalClock,
updatesInFlight,
pendingChangesQueue,
pendingChangesToPrepend,
pendingChangesToRemoveCount,
ephemeralMessageReceivingErrors:
ephemeralMessageReceivingErrors.slice(0, 20), // avoid a memory leak by storing max 20 errors
documentDecryptionState,
Expand Down

0 comments on commit 0d27125

Please sign in to comment.