diff --git a/src/LibsqlEventStore.ts b/src/LibsqlEventStore.ts index 439857b97..42f7ee44d 100644 --- a/src/LibsqlEventStore.ts +++ b/src/LibsqlEventStore.ts @@ -96,76 +96,80 @@ export const make: Effect.Effect (...events) => - Effect.reduce(events, lastKnownVersion, (lastKnownVersion, event) => - Effect.gen(function* () { - const newResourceVersion = lastKnownVersion + 1 - const eventId = yield* generateUuid - const eventTimestamp = yield* DateTime.now + sql + .withTransaction( + Effect.reduce(events, lastKnownVersion, (lastKnownVersion, event) => + Effect.gen(function* () { + const newResourceVersion = lastKnownVersion + 1 + const eventId = yield* generateUuid + const eventTimestamp = yield* DateTime.now - const encoded = yield* Schema.encode(EventsTable)({ - eventId, - resourceType: 'Feedback', - resourceId, - resourceVersion: newResourceVersion, - eventType: event._tag, - eventTimestamp, - payload: event, - }) + const encoded = yield* Schema.encode(EventsTable)({ + eventId, + resourceType: 'Feedback', + resourceId, + resourceVersion: newResourceVersion, + eventType: event._tag, + eventTimestamp, + payload: event, + }) - const results = yield* pipe( - sql` - INSERT INTO - events ( - event_id, - resource_type, - resource_id, - resource_version, - event_type, - event_timestamp, - payload - ) - SELECT - ${encoded.event_id}, - ${encoded.resource_type}, - ${encoded.resource_id}, - ${encoded.resource_version}, - ${encoded.event_type}, - ${encoded.event_timestamp}, - ${encoded.payload} - WHERE - NOT EXISTS ( + const results = yield* pipe( + sql` + INSERT INTO + events ( + event_id, + resource_type, + resource_id, + resource_version, + event_type, + event_timestamp, + payload + ) SELECT - event_id - FROM - events + ${encoded.event_id}, + ${encoded.resource_type}, + ${encoded.resource_id}, + ${encoded.resource_version}, + ${encoded.event_type}, + ${encoded.event_timestamp}, + ${encoded.payload} WHERE - resource_type = ${encoded.resource_type} - AND resource_id = ${encoded.resource_id} - AND resource_version >= ${encoded.resource_version} - ) + NOT EXISTS ( + SELECT + event_id + FROM + events + WHERE + resource_type = ${encoded.resource_type} + AND resource_id = ${encoded.resource_id} + AND resource_version >= ${encoded.resource_version} + ) `.raw, - Effect.andThen(Schema.decodeUnknown(LibsqlResults)), - ) + Effect.andThen(Schema.decodeUnknown(LibsqlResults)), + ) - if (results.rowsAffected !== 1) { - yield* new EventStore.ResourceHasChanged() - } + if (results.rowsAffected !== 1) { + yield* new EventStore.ResourceHasChanged() + } - return newResourceVersion - }), - ).pipe( - Effect.tapError(error => - Effect.annotateLogs(Effect.logError('Unable to commit events'), { - error, - resourceId, - resourceType: 'Feedback', + return newResourceVersion + }), + ), + ) + .pipe( + Effect.tapError(error => + Effect.annotateLogs(Effect.logError('Unable to commit events'), { + error, + resourceId, + resourceType: 'Feedback', + }), + ), + Effect.catchTags({ + SqlError: () => new EventStore.FailedToCommitEvent(), + ParseError: () => new EventStore.FailedToCommitEvent(), }), - ), - Effect.catchTags({ - SqlError: () => new EventStore.FailedToCommitEvent(), - ParseError: () => new EventStore.FailedToCommitEvent(), - }), - ) + ) return { getAllEvents, getEvents, commitEvents } })