Skip to content

Commit

Permalink
rename messageStore table to messageStoreMessages and eventLog
Browse files Browse the repository at this point in the history
…to `eventLogMessages`. Also renamed `eventLogWatermark` to `eventWatermark` field in the `eventLogRecordsTags` table.
  • Loading branch information
LiranCohen committed Apr 23, 2024
1 parent d6a61d2 commit 7cd7848
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 25 deletions.
18 changes: 9 additions & 9 deletions src/event-log-sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class EventLogSql implements EventLog {

constructor(dialect: Dialect) {
this.#dialect = dialect;
this.#tags = new TagTables(dialect, 'eventLog');
this.#tags = new TagTables(dialect, 'eventLogMessages');
}

async open(): Promise<void> {
Expand All @@ -24,7 +24,7 @@ export class EventLogSql implements EventLog {

this.#db = new Kysely<DwnDatabaseType>({ dialect: this.#dialect });
let createTable = this.#db.schema
.createTable('eventLog')
.createTable('eventLogMessages')
.ifNotExists()
.addColumn('tenant', 'text', (col) => col.notNull())
.addColumn('messageCid', 'varchar(60)', (col) => col.notNull())
Expand Down Expand Up @@ -69,7 +69,7 @@ export class EventLogSql implements EventLog {
// Add columns that have dialect-specific constraints
createTable = this.#dialect.addAutoIncrementingColumn(createTable, 'watermark', (col) => col.primaryKey());
createRecordsTagsTable = this.#dialect.addAutoIncrementingColumn(createRecordsTagsTable, 'id', (col) => col.primaryKey());
createRecordsTagsTable = this.#dialect.addReferencedColumn(createRecordsTagsTable, 'eventLogRecordsTags', 'eventLogWatermark', 'integer', 'eventLog', 'watermark', 'cascade');
createRecordsTagsTable = this.#dialect.addReferencedColumn(createRecordsTagsTable, 'eventLogRecordsTags', 'eventWatermark', 'integer', 'eventLogMessages', 'watermark', 'cascade');

await createTable.execute();
await createRecordsTagsTable.execute();
Expand Down Expand Up @@ -120,7 +120,7 @@ export class EventLogSql implements EventLog {

// we use the dialect-specific `insertThenReturnId` in order to be able to extract the `insertId`
const result = await this.#dialect
.insertThenReturnId(tx, 'eventLog', eventIndexValues, 'watermark as insertId')
.insertThenReturnId(tx, 'eventLogMessages', eventIndexValues, 'watermark as insertId')
.executeTakeFirstOrThrow();

// if tags exist, we execute those within the transaction associating them with the `insertId`.
Expand Down Expand Up @@ -151,8 +151,8 @@ export class EventLogSql implements EventLog {
}

let query = this.#db
.selectFrom('eventLog')
.leftJoin('eventLogRecordsTags', 'eventLogRecordsTags.eventLogWatermark', 'eventLog.watermark')
.selectFrom('eventLogMessages')
.leftJoin('eventLogRecordsTags', 'eventLogRecordsTags.eventWatermark', 'eventLogMessages.watermark')
.select('messageCid')
.distinct()
.select('watermark')
Expand All @@ -164,7 +164,7 @@ export class EventLogSql implements EventLog {
}

if(cursor !== undefined) {
// eventLog in the sql store uses the watermark cursor value which is a number in SQL
// eventLogMessages in the sql store uses the watermark cursor value which is a number in SQL
// if not we will return empty results
const cursorValue = cursor.value as number;
const cursorMessageCid = cursor.messageCid;
Expand Down Expand Up @@ -211,7 +211,7 @@ export class EventLogSql implements EventLog {
}

await this.#db
.deleteFrom('eventLog')
.deleteFrom('eventLogMessages')
.where('tenant', '=', tenant)
.where('messageCid', 'in', messageCids)
.execute();
Expand All @@ -225,7 +225,7 @@ export class EventLogSql implements EventLog {
}

await this.#db
.deleteFrom('eventLog')
.deleteFrom('eventLogMessages')
.execute();
}
}
18 changes: 9 additions & 9 deletions src/message-store-sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class MessageStoreSql implements MessageStore {

constructor(dialect: Dialect) {
this.#dialect = dialect;
this.#tags = new TagTables(dialect, 'messageStore');
this.#tags = new TagTables(dialect, 'messageStoreMessages');
}

async open(): Promise<void> {
Expand All @@ -40,7 +40,7 @@ export class MessageStoreSql implements MessageStore {

this.#db = new Kysely<DwnDatabaseType>({ dialect: this.#dialect });
let createTable = this.#db.schema
.createTable('messageStore')
.createTable('messageStoreMessages')
.ifNotExists()
.addColumn('tenant', 'varchar(255)', (col) => col.notNull())
.addColumn('messageCid', 'varchar(60)', (col) => col.notNull())
Expand Down Expand Up @@ -88,7 +88,7 @@ export class MessageStoreSql implements MessageStore {
createTable = this.#dialect.addAutoIncrementingColumn(createTable, 'id', (col) => col.primaryKey());
createTable = this.#dialect.addBlobColumn(createTable, 'encodedMessageBytes', (col) => col.notNull());
createRecordsTagsTable = this.#dialect.addAutoIncrementingColumn(createRecordsTagsTable, 'id', (col) => col.primaryKey());
createRecordsTagsTable = this.#dialect.addReferencedColumn(createRecordsTagsTable, 'messageStoreRecordsTags', 'messageInsertId', 'integer', 'messageStore', 'id', 'cascade');
createRecordsTagsTable = this.#dialect.addReferencedColumn(createRecordsTagsTable, 'messageStoreRecordsTags', 'messageInsertId', 'integer', 'messageStoreMessages', 'id', 'cascade');

await createTable.execute();
await createRecordsTagsTable.execute();
Expand Down Expand Up @@ -171,7 +171,7 @@ export class MessageStoreSql implements MessageStore {

// we use the dialect-specific `insertThenReturnId` in order to be able to extract the `insertId`
const result = await this.#dialect
.insertThenReturnId(tx, 'messageStore', messageIndexValues, 'id as insertId')
.insertThenReturnId(tx, 'messageStoreMessages', messageIndexValues, 'id as insertId')
.executeTakeFirstOrThrow();

// if tags exist, we execute those within the transaction associating them with the `insertId`.
Expand All @@ -197,7 +197,7 @@ export class MessageStoreSql implements MessageStore {

const result = await executeUnlessAborted(
this.#db
.selectFrom('messageStore')
.selectFrom('messageStoreMessages')
.selectAll()
.where('tenant', '=', tenant)
.where('messageCid', '=', cid)
Expand Down Expand Up @@ -231,8 +231,8 @@ export class MessageStoreSql implements MessageStore {
const { property: sortProperty, direction: sortDirection } = this.extractSortProperties(messageSort);

let query = this.#db
.selectFrom('messageStore')
.leftJoin('messageStoreRecordsTags', 'messageStoreRecordsTags.messageInsertId', 'messageStore.id')
.selectFrom('messageStoreMessages')
.leftJoin('messageStoreRecordsTags', 'messageStoreRecordsTags.messageInsertId', 'messageStoreMessages.id')
.select('messageCid')
.distinct()
.select([
Expand Down Expand Up @@ -294,7 +294,7 @@ export class MessageStoreSql implements MessageStore {

await executeUnlessAborted(
this.#db
.deleteFrom('messageStore')
.deleteFrom('messageStoreMessages')
.where('tenant', '=', tenant)
.where('messageCid', '=', cid)
.execute(),
Expand All @@ -310,7 +310,7 @@ export class MessageStoreSql implements MessageStore {
}

await this.#db
.deleteFrom('messageStore')
.deleteFrom('messageStoreMessages')
.execute();
}

Expand Down
6 changes: 3 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type MessageStoreRecordsTagsTable = {
type EventLogRecordsTagsTable = {
id: Generated<number>;
tag: string;
eventLogWatermark: number;
eventWatermark: number;
valueString: string | null;
valueNumber: number | null;
};
Expand All @@ -102,9 +102,9 @@ type DataStoreTable = {
}

export type DwnDatabaseType = {
eventLog: EventLogTable;
eventLogMessages: EventLogTable;
eventLogRecordsTags: EventLogRecordsTagsTable;
messageStore: MessageStoreTable;
messageStoreMessages: MessageStoreTable;
messageStoreRecordsTags: MessageStoreRecordsTagsTable;
dataStore: DataStoreTable;
}
8 changes: 4 additions & 4 deletions src/utils/tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ export class TagTables {

/**
* @param dialect the target dialect, necessary for returning the `insertId`
* @param table the DB Table in order to index the tags and values in the correct tables. Choice between `messageStore` and `eventLog`
* @param table the DB Table in order to index the tags and values in the correct tables. Choice between `messageStoreMessages` and `eventLogMessages`
*/
constructor(private dialect: Dialect, private table: 'messageStore' | 'eventLog'){}
constructor(private dialect: Dialect, private table: 'messageStoreMessages' | 'eventLogMessages'){}

/**
* Inserts the given tags associated with the given foreign `insertId`.
Expand All @@ -24,8 +24,8 @@ export class TagTables {
tags: KeyValues,
tx: Transaction<DwnDatabaseType>,
):Promise<void> {
const tagTable = this.table === 'messageStore' ? 'messageStoreRecordsTags' : 'eventLogRecordsTags';
const foreignKeyReference = tagTable === 'messageStoreRecordsTags' ? { messageInsertId: foreignInsertId } : { eventLogWatermark: foreignInsertId };
const tagTable = this.table === 'messageStoreMessages' ? 'messageStoreRecordsTags' : 'eventLogRecordsTags';
const foreignKeyReference = tagTable === 'messageStoreRecordsTags' ? { messageInsertId: foreignInsertId } : { eventWatermark: foreignInsertId };

for (const tag in tags) {
const tagValues = tags[tag];
Expand Down

0 comments on commit 7cd7848

Please sign in to comment.