Skip to content

Commit

Permalink
add-retry-logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nikgraf committed Sep 27, 2023
1 parent 762ea62 commit 0705167
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 130 deletions.
150 changes: 85 additions & 65 deletions examples/backend/src/database/createSnapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,85 +3,105 @@ import {
CreateSnapshotParams,
SecsyncSnapshotBasedOnOutdatedSnapshotError,
SecsyncSnapshotMissesUpdatesError,
Snapshot,
compareUpdateClocks,
hash,
} from "secsync";
import { serializeSnapshot } from "../utils/serialize";
import { Prisma, prisma } from "./prisma";

export async function createSnapshot({ snapshot }: CreateSnapshotParams) {
return await prisma.$transaction(
async (prisma) => {
const document = await prisma.document.findUniqueOrThrow({
where: { id: snapshot.publicData.docId },
select: {
activeSnapshot: true,
},
});
const MAX_RETRIES = 5;
let retries = 0;
let result: Snapshot;

// function sleep(ms) {
// return new Promise((resolve) => setTimeout(resolve, ms));
// }
// await sleep(3000);
// use retries approach as described here: https://www.prisma.io/docs/concepts/components/prisma-client/transactions#transaction-timing-issues
while (retries < MAX_RETRIES) {
try {
result = await prisma.$transaction(
async (prisma) => {
const document = await prisma.document.findUniqueOrThrow({
where: { id: snapshot.publicData.docId },
select: {
activeSnapshot: true,
},
});

// const random = Math.floor(Math.random() * 10);
// if (random < 8) {
// throw new SecsyncSnapshotBasedOnOutdatedSnapshotError(
// "Snapshot is out of date."
// );
// }
// function sleep(ms) {
// return new Promise((resolve) => setTimeout(resolve, ms));
// }
// await sleep(3000);

// const random = Math.floor(Math.random() * 10);
// if (random < 8) {
// throw new SecsyncSnapshotMissesUpdatesError(
// "Snapshot does not include the latest changes."
// );
// }
// const random = Math.floor(Math.random() * 10);
// if (random < 8) {
// throw new SecsyncSnapshotBasedOnOutdatedSnapshotError(
// "Snapshot is out of date."
// );
// }

if (document.activeSnapshot) {
if (
snapshot.publicData.parentSnapshotId !== undefined &&
snapshot.publicData.parentSnapshotId !== document.activeSnapshot.id
) {
throw new SecsyncSnapshotBasedOnOutdatedSnapshotError(
"Snapshot is out of date."
);
}
// const random = Math.floor(Math.random() * 10);
// if (random < 8) {
// throw new SecsyncSnapshotMissesUpdatesError(
// "Snapshot does not include the latest changes."
// );
// }

const compareUpdateClocksResult = compareUpdateClocks(
// @ts-expect-error the values are parsed by the function
document.activeSnapshot.clocks,
snapshot.publicData.parentSnapshotUpdateClocks
);
if (document.activeSnapshot) {
if (
snapshot.publicData.parentSnapshotId !== undefined &&
snapshot.publicData.parentSnapshotId !==
document.activeSnapshot.id
) {
throw new SecsyncSnapshotBasedOnOutdatedSnapshotError(
"Snapshot is out of date."
);
}

if (!compareUpdateClocksResult.equal) {
throw new SecsyncSnapshotMissesUpdatesError(
"Snapshot does not include the latest changes."
);
}
}
const compareUpdateClocksResult = compareUpdateClocks(
// @ts-expect-error the values are parsed by the function
document.activeSnapshot.clocks,
snapshot.publicData.parentSnapshotUpdateClocks
);

const newSnapshot = await prisma.snapshot.create({
data: {
id: snapshot.publicData.snapshotId,
latestVersion: 0,
data: JSON.stringify(snapshot),
ciphertextHash: hash(snapshot.ciphertext, sodium),
activeSnapshotDocument: {
connect: { id: snapshot.publicData.docId },
},
document: { connect: { id: snapshot.publicData.docId } },
clocks: {},
parentSnapshotProof: snapshot.publicData.parentSnapshotProof,
parentSnapshotUpdateClocks:
snapshot.publicData.parentSnapshotUpdateClocks,
},
});
if (!compareUpdateClocksResult.equal) {
throw new SecsyncSnapshotMissesUpdatesError(
"Snapshot does not include the latest changes."
);
}
}

return serializeSnapshot(newSnapshot);
},
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
const newSnapshot = await prisma.snapshot.create({
data: {
id: snapshot.publicData.snapshotId,
latestVersion: 0,
data: JSON.stringify(snapshot),
ciphertextHash: hash(snapshot.ciphertext, sodium),
activeSnapshotDocument: {
connect: { id: snapshot.publicData.docId },
},
document: { connect: { id: snapshot.publicData.docId } },
clocks: {},
parentSnapshotProof: snapshot.publicData.parentSnapshotProof,
parentSnapshotUpdateClocks:
snapshot.publicData.parentSnapshotUpdateClocks,
},
});

return serializeSnapshot(newSnapshot);
},
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
}
);
break;
} catch (error) {
if (error.code === "P2034") {
retries++;
continue;
}
throw error;
}
);
}

return result;
}
151 changes: 86 additions & 65 deletions examples/backend/src/database/createUpdate.ts
Original file line number Diff line number Diff line change
@@ -1,79 +1,100 @@
import { CreateUpdateParams } from "secsync";
import { CreateUpdateParams, Update } from "secsync";
import { Prisma } from "../../prisma/generated/output";
import { serializeUpdate } from "../utils/serialize";
import { prisma } from "./prisma";

export async function createUpdate({ update }: CreateUpdateParams) {
return await prisma.$transaction(
async (prisma) => {
const snapshot = await prisma.snapshot.findUniqueOrThrow({
where: { id: update.publicData.refSnapshotId },
select: {
latestVersion: true,
clocks: true,
document: { select: { activeSnapshotId: true } },
},
});
if (
snapshot.document.activeSnapshotId !== update.publicData.refSnapshotId
) {
throw new Error("Update referencing an out of date snapshot.");
}
const MAX_RETRIES = 5;
let retries = 0;
let result: Update;

if (
snapshot.clocks &&
typeof snapshot.clocks === "object" &&
!Array.isArray(snapshot.clocks)
) {
if (snapshot.clocks[update.publicData.pubKey] === undefined) {
if (update.publicData.clock !== 0) {
throw new Error(
`Update clock incorrect. Clock: ${update.publicData.clock}, but should be 0`
);
// use retries approach as described here: https://www.prisma.io/docs/concepts/components/prisma-client/transactions#transaction-timing-issues
while (retries < MAX_RETRIES) {
try {
result = await prisma.$transaction(
async (prisma) => {
const snapshot = await prisma.snapshot.findUniqueOrThrow({
where: { id: update.publicData.refSnapshotId },
select: {
latestVersion: true,
clocks: true,
document: { select: { activeSnapshotId: true } },
},
});
if (
snapshot.document.activeSnapshotId !==
update.publicData.refSnapshotId
) {
throw new Error("Update referencing an out of date snapshot.");
}
// update the clock for the public key
snapshot.clocks[update.publicData.pubKey] = update.publicData.clock;
} else {
const expectedClockValue =
// @ts-expect-error
snapshot.clocks[update.publicData.pubKey] + 1;
if (expectedClockValue !== update.publicData.clock) {
throw new Error(
`Update clock incorrect. Clock: ${update.publicData.clock}, but should be ${expectedClockValue}`
);

if (
snapshot.clocks &&
typeof snapshot.clocks === "object" &&
!Array.isArray(snapshot.clocks)
) {
if (snapshot.clocks[update.publicData.pubKey] === undefined) {
if (update.publicData.clock !== 0) {
throw new Error(
`Update clock incorrect. Clock: ${update.publicData.clock}, but should be 0`
);
}
// update the clock for the public key
snapshot.clocks[update.publicData.pubKey] =
update.publicData.clock;
} else {
const expectedClockValue =
// @ts-expect-error
snapshot.clocks[update.publicData.pubKey] + 1;
if (expectedClockValue !== update.publicData.clock) {
throw new Error(
`Update clock incorrect. Clock: ${update.publicData.clock}, but should be ${expectedClockValue}`
);
}
// update the clock for the public key
snapshot.clocks[update.publicData.pubKey] =
update.publicData.clock;
}
}
// update the clock for the public key
snapshot.clocks[update.publicData.pubKey] = update.publicData.clock;
}
}

await prisma.snapshot.update({
where: { id: update.publicData.refSnapshotId },
data: {
latestVersion: snapshot.latestVersion + 1,
clocks: snapshot.clocks as Prisma.JsonObject,
},
});
await prisma.snapshot.update({
where: { id: update.publicData.refSnapshotId },
data: {
latestVersion: snapshot.latestVersion + 1,
clocks: snapshot.clocks as Prisma.JsonObject,
},
});

return serializeUpdate(
await prisma.update.create({
data: {
id: `${update.publicData.refSnapshotId}-${update.publicData.pubKey}-${update.publicData.clock}`,
data: JSON.stringify(update),
version: snapshot.latestVersion + 1,
snapshot: {
connect: {
id: update.publicData.refSnapshotId,
return serializeUpdate(
await prisma.update.create({
data: {
id: `${update.publicData.refSnapshotId}-${update.publicData.pubKey}-${update.publicData.clock}`,
data: JSON.stringify(update),
version: snapshot.latestVersion + 1,
snapshot: {
connect: {
id: update.publicData.refSnapshotId,
},
},
clock: update.publicData.clock,
pubKey: update.publicData.pubKey,
},
},
clock: update.publicData.clock,
pubKey: update.publicData.pubKey,
},
})
})
);
},
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
}
);
},
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
break;
} catch (error) {
if (error.code === "P2034") {
retries++;
continue;
}
throw error;
}
);
}

return result;
}

0 comments on commit 0705167

Please sign in to comment.