Skip to content

Commit

Permalink
Renable transactions
Browse files Browse the repository at this point in the history
Reverts e87ccdb; refs #1973, #2007
  • Loading branch information
thewilkybarkid committed Oct 9, 2024
1 parent be8a690 commit 3acf814
Showing 1 changed file with 66 additions and 62 deletions.
128 changes: 66 additions & 62 deletions src/LibsqlEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,76 +96,80 @@ export const make: Effect.Effect<EventStore.EventStore, SqlError.SqlError, SqlCl
const commitEvents: EventStore.EventStore['commitEvents'] =
(resourceId, lastKnownVersion) =>
(...events) =>
Effect.reduce(events, lastKnownVersion, (lastKnownVersion, event) =>
Effect.gen(function* () {
const newResourceVersion = lastKnownVersion + 1
const eventId = yield* generateUuid
const eventTimestamp = yield* DateTime.now
sql
.withTransaction(
Effect.reduce(events, lastKnownVersion, (lastKnownVersion, event) =>
Effect.gen(function* () {
const newResourceVersion = lastKnownVersion + 1
const eventId = yield* generateUuid
const eventTimestamp = yield* DateTime.now

const encoded = yield* Schema.encode(EventsTable)({
eventId,
resourceType: 'Feedback',
resourceId,
resourceVersion: newResourceVersion,
eventType: event._tag,
eventTimestamp,
payload: event,
})
const encoded = yield* Schema.encode(EventsTable)({
eventId,
resourceType: 'Feedback',
resourceId,
resourceVersion: newResourceVersion,
eventType: event._tag,
eventTimestamp,
payload: event,
})

const results = yield* pipe(
sql`
INSERT INTO
events (
event_id,
resource_type,
resource_id,
resource_version,
event_type,
event_timestamp,
payload
)
SELECT
${encoded.event_id},
${encoded.resource_type},
${encoded.resource_id},
${encoded.resource_version},
${encoded.event_type},
${encoded.event_timestamp},
${encoded.payload}
WHERE
NOT EXISTS (
const results = yield* pipe(
sql`
INSERT INTO
events (
event_id,
resource_type,
resource_id,
resource_version,
event_type,
event_timestamp,
payload
)
SELECT
event_id
FROM
events
${encoded.event_id},
${encoded.resource_type},
${encoded.resource_id},
${encoded.resource_version},
${encoded.event_type},
${encoded.event_timestamp},
${encoded.payload}
WHERE
resource_type = ${encoded.resource_type}
AND resource_id = ${encoded.resource_id}
AND resource_version >= ${encoded.resource_version}
)
NOT EXISTS (
SELECT
event_id
FROM
events
WHERE
resource_type = ${encoded.resource_type}
AND resource_id = ${encoded.resource_id}
AND resource_version >= ${encoded.resource_version}
)
`.raw,
Effect.andThen(Schema.decodeUnknown(LibsqlResults)),
)
Effect.andThen(Schema.decodeUnknown(LibsqlResults)),
)

if (results.rowsAffected !== 1) {
yield* new EventStore.ResourceHasChanged()
}
if (results.rowsAffected !== 1) {
yield* new EventStore.ResourceHasChanged()
}

return newResourceVersion
}),
).pipe(
Effect.tapError(error =>
Effect.annotateLogs(Effect.logError('Unable to commit events'), {
error,
resourceId,
resourceType: 'Feedback',
return newResourceVersion
}),
),
)
.pipe(
Effect.tapError(error =>
Effect.annotateLogs(Effect.logError('Unable to commit events'), {
error,
resourceId,
resourceType: 'Feedback',
}),
),
Effect.catchTags({
SqlError: () => new EventStore.FailedToCommitEvent(),
ParseError: () => new EventStore.FailedToCommitEvent(),
}),
),
Effect.catchTags({
SqlError: () => new EventStore.FailedToCommitEvent(),
ParseError: () => new EventStore.FailedToCommitEvent(),
}),
)
)

return { getAllEvents, getEvents, commitEvents }
})
Expand Down

0 comments on commit 3acf814

Please sign in to comment.