Skip to content

Commit

Permalink
Add the ability to index tags from records. (#21)
Browse files Browse the repository at this point in the history
Implement indexing for the work in:
decentralized-identity/dwn-sdk-js#706

- Adds reference tables:
  - messageStoreRecordsTags
  - eventLogRecordsTags
- Adds helper methods to deal with adding foreign key constraints to all
3 dialects.
- Adds helper methods to deal with returning `insertedId` across all 3
dialects.

---------

Co-authored-by: Henry Tsai <[email protected]>
  • Loading branch information
LiranCohen and thehenrytsai authored Apr 24, 2024
1 parent 9a8fc50 commit a1d5a66
Show file tree
Hide file tree
Showing 12 changed files with 524 additions and 91 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"react-native": "./dist/esm/src/main.js",
"dependencies": {
"@ipld/dag-cbor": "^9.0.5",
"@tbd54566975/dwn-sdk-js": "0.2.22",
"@tbd54566975/dwn-sdk-js": "0.2.23",
"kysely": "0.26.3",
"multiformats": "12.0.1",
"readable-stream": "4.4.2"
Expand Down
54 changes: 53 additions & 1 deletion src/dialect/dialect.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import {
ColumnBuilderCallback,
ColumnDataType,
CreateTableBuilder,
Dialect as KyselyDialect
Dialect as KyselyDialect,
Kysely,
InsertObject,
InsertQueryBuilder,
Selection,
SelectExpression,
Transaction,
} from 'kysely';

export interface Dialect extends KyselyDialect {
Expand All @@ -19,4 +26,49 @@ export interface Dialect extends KyselyDialect {
callback?: ColumnBuilderCallback
): CreateTableBuilder<TB>;

/**
* This is a helper method to add a column with foreign key constraints.
* This is primarily useful because the `mySQL` dialect adds the constraints in a different way than `sqlite` and `postgres`.
*
* @param builder the CreateTableBuilder to add the column to.
* @param tableName the name of the table to add the column to.
* @param columnName the name of the column to add.
* @param columnType the type of the column to add.
* @param referenceTable the foreign table to reference.
* @param referenceColumnName the foreign column to reference.
* @param onDeleteAction the action to take when the referenced row is deleted.
*
* @returns {CreateTableBuilder} the CreateTableBuilder with the added column.
*/
addReferencedColumn<TB extends string>(
builder: CreateTableBuilder<TB & string>,
tableName: TB,
columnName: string,
columnType: ColumnDataType,
referenceTable: string,
referenceColumnName: string,
onDeleteAction: 'cascade' | 'no action' | 'restrict' | 'set null' | 'set default',
): CreateTableBuilder<TB & string>;

/**
* This is a helper method to return an `insertId` across all dialects after inserting values.
* `postgres` and `sqlite` both support the `returning` clause, however `mysql` does not and instead returns the last inserted id.
*
* @param db the Kysely DB object or a DB Transaction.
* @param table the table to insert into.
* @param values the values to insert.
* @param returning a string representing the generated key you'd like returned as an insertId.
*
* NOTE: the `returning` value must be formatted to return an insertId value.
* ex. if the generated key is `id` the string should be `id as insertId`.
* if the generated key is `watermark` the string should be `watermark as insertId`.
*
* @returns {InsertQueryBuilder} object to further modify the query or execute it.
*/
insertThenReturnId<DB, TB extends keyof DB = keyof DB, SE extends SelectExpression<DB, TB & string> = any>(
db: Transaction<DB> | Kysely<DB>,
table: TB & string,
values: InsertObject<DB, TB & string>,
returning: SE & `${string} as insertId`,
): InsertQueryBuilder<DB, TB & string, Selection<DB, TB & string, SE & `${string} as insertId`>>;
}
45 changes: 44 additions & 1 deletion src/dialect/mysql-dialect.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import { Dialect } from './dialect.js';
import {
AnyColumn,
ColumnDataType,
CreateTableBuilder,
ColumnBuilderCallback,
MysqlDialect as KyselyMysqlDialect
InsertObject,
InsertQueryBuilder,
SelectExpression,
Selection,
Transaction,
MysqlDialect as KyselyMysqlDialect,
Kysely,
} from 'kysely';

export class MysqlDialect extends KyselyMysqlDialect implements Dialect {
Expand All @@ -29,4 +37,39 @@ export class MysqlDialect extends KyselyMysqlDialect implements Dialect {
): CreateTableBuilder<TB> {
return builder.addColumn(columnName, 'blob', callback);
}

/**
* In MySQL, the ForeignKey name it creates in `mysql` will be in the following format:
* `${referenceTable}_${referenceColumnName}__${tableName}_${columnName}`
* ex: if the reference table is `users` and the reference column is `id` and the table is `profiles` and the column is `userId`,
* the resulting name for the foreign key is: `users_id__profiles_userId`
*/
addReferencedColumn<TB extends string>(
builder: CreateTableBuilder<TB & string>,
tableName: TB,
columnName: string,
columnType: ColumnDataType,
referenceTable: string,
referenceColumnName: string,
onDeleteAction: 'cascade' | 'no action' | 'restrict' | 'set null' | 'set default',
): CreateTableBuilder<TB & string> {
return builder
.addColumn(columnName, columnType, (col) => col.notNull())
.addForeignKeyConstraint(
`${referenceTable}_${referenceColumnName}__${tableName}_${columnName}`,
[columnName],
referenceTable,
[referenceColumnName],
(constraint) => constraint.onDelete(onDeleteAction)
);
}

insertThenReturnId<DB, TB extends keyof DB = keyof DB, SE extends SelectExpression<DB, TB & string> = AnyColumn<DB, TB>>(
db: Transaction<DB> | Kysely<DB>,
table: TB & string,
values: InsertObject<DB, TB & string>,
_returning: SE & `${string} as insertId`,
): InsertQueryBuilder<DB, TB & string, Selection<DB, TB & string, SE & `${string} as insertId`>> {
return db.insertInto(table).values(values);
}
}
33 changes: 31 additions & 2 deletions src/dialect/postgres-dialect.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { Dialect } from './dialect.js';
import {
CreateTableBuilder,
ColumnDataType,
ColumnBuilderCallback,
PostgresDialect as KyselyPostgresDialect
CreateTableBuilder,
InsertObject,
InsertQueryBuilder,
Kysely,
PostgresDialect as KyselyPostgresDialect,
SelectExpression,
Selection,
Transaction,
} from 'kysely';

export class PostgresDialect extends KyselyPostgresDialect implements Dialect {
Expand All @@ -23,4 +30,26 @@ export class PostgresDialect extends KyselyPostgresDialect implements Dialect {
): CreateTableBuilder<TB> {
return builder.addColumn(columnName, 'bytea', callback);
}

addReferencedColumn<TB extends string>(
builder: CreateTableBuilder<TB & string>,
_tableName: TB,
columnName: string,
columnType: ColumnDataType,
referenceTable: string,
referenceColumnName: string,
onDeleteAction: 'cascade' | 'no action' | 'restrict' | 'set null' | 'set default',
): CreateTableBuilder<TB & string> {
return builder.addColumn(columnName, columnType, (col) => col.notNull().references(`${referenceTable}.${referenceColumnName}`).onDelete(onDeleteAction));
}

insertThenReturnId<DB, TB extends keyof DB = keyof DB, SE extends SelectExpression<DB, TB & string> = any>(
db: Transaction<DB> | Kysely<DB>,
table: TB & string,
values: InsertObject<DB, TB & string>,
returning: SE & `${string} as insertId`,
): InsertQueryBuilder<DB, TB & string, Selection<DB, TB & string, SE & `${string} as insertId`>> {
return db.insertInto(table).values(values).returning(returning);
}

}
28 changes: 28 additions & 0 deletions src/dialect/sqlite-dialect.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { Dialect } from './dialect.js';
import {
ColumnBuilderCallback,
ColumnDataType,
CreateTableBuilder,
Kysely,
InsertObject,
InsertQueryBuilder,
SelectExpression,
Selection,
SqliteDialect as KyselySqliteDialect,
Transaction,
} from 'kysely';

export class SqliteDialect extends KyselySqliteDialect implements Dialect {
Expand All @@ -29,4 +36,25 @@ export class SqliteDialect extends KyselySqliteDialect implements Dialect {
): CreateTableBuilder<TB> {
return builder.addColumn(columnName, 'blob', callback);
}

addReferencedColumn<TB extends string>(
builder: CreateTableBuilder<TB & string>,
_tableName: TB,
columnName: string,
columnType: ColumnDataType,
referenceTable: string,
referenceColumnName: string,
onDeleteAction: 'cascade' | 'no action' | 'restrict' | 'set null' | 'set default',
): CreateTableBuilder<TB & string> {
return builder.addColumn(columnName, columnType, (col) => col.notNull().references(`${referenceTable}.${referenceColumnName}`).onDelete(onDeleteAction));
}

insertThenReturnId<DB, TB extends keyof DB = keyof DB, SE extends SelectExpression<DB, TB & string> = any>(
db: Transaction<DB> | Kysely<DB>,
table: TB & string,
values: InsertObject<DB, TB & string>,
returning: SE & `${string} as insertId`,
): InsertQueryBuilder<DB, TB & string, Selection<DB, TB & string, SE & `${string} as insertId`>> {
return db.insertInto(table).values(values).returning(returning);
}
}
76 changes: 58 additions & 18 deletions src/event-log-sql.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import type { DwnDatabaseType } from './types.js';
import type { DwnDatabaseType, KeyValues } from './types.js';
import type { EventLog, Filter, PaginationCursor } from '@tbd54566975/dwn-sdk-js';

import { Dialect } from './dialect/dialect.js';
import { filterSelectQuery } from './utils/filter.js';
import { Kysely } from 'kysely';
import { sanitizeFilters, sanitizeIndexes } from './utils/sanitize.js';
import { Kysely, Transaction } from 'kysely';
import { extractTagsAndSanitizeIndexes } from './utils/sanitize.js';
import { TagTables } from './utils/tags.js';

export class EventLogSql implements EventLog {
#dialect: Dialect;
#db: Kysely<DwnDatabaseType> | null = null;
#tags: TagTables;

constructor(dialect: Dialect) {
this.#dialect = dialect;
this.#tags = new TagTables(dialect, 'eventLogMessages');
}

async open(): Promise<void> {
Expand All @@ -21,7 +24,7 @@ export class EventLogSql implements EventLog {

this.#db = new Kysely<DwnDatabaseType>({ dialect: this.#dialect });
let createTable = this.#db.schema
.createTable('eventLog')
.createTable('eventLogMessages')
.ifNotExists()
.addColumn('tenant', 'text', (col) => col.notNull())
.addColumn('messageCid', 'varchar(60)', (col) => col.notNull())
Expand Down Expand Up @@ -57,10 +60,19 @@ export class EventLogSql implements EventLog {
.addColumn('permissionsGrantId', 'text');
// "indexes" end

let createRecordsTagsTable = this.#db.schema
.createTable('eventLogRecordsTags')
.ifNotExists()
.addColumn('tag', 'text', (col) => col.notNull())
.addColumn('valueString', 'text')
.addColumn('valueNumber', 'integer');
// Add columns that have dialect-specific constraints
createTable = this.#dialect.addAutoIncrementingColumn(createTable, 'watermark', (col) => col.primaryKey());
createRecordsTagsTable = this.#dialect.addAutoIncrementingColumn(createRecordsTagsTable, 'id', (col) => col.primaryKey());
createRecordsTagsTable = this.#dialect.addReferencedColumn(createRecordsTagsTable, 'eventLogRecordsTags', 'eventWatermark', 'integer', 'eventLogMessages', 'watermark', 'cascade');

await createTable.execute();
await createRecordsTagsTable.execute();
}

async close(): Promise<void> {
Expand All @@ -78,17 +90,44 @@ export class EventLogSql implements EventLog {
'Connection to database not open. Call `open` before using `append`.'
);
}
const appendIndexes = { ...indexes };

sanitizeIndexes(appendIndexes);
// we execute the insert in a transaction as we are making multiple inserts into multiple tables.
// if any of these inserts would throw, the whole transaction would be rolled back.
// otherwise it is committed.
await this.#db.transaction().execute(this.executePutTransaction({
tenant, messageCid, indexes
}));
}

await this.#db
.insertInto('eventLog')
.values({
private executePutTransaction(queryOptions: {
tenant: string;
messageCid: string;
indexes: KeyValues;
}): (tx: Transaction<DwnDatabaseType>) => Promise<void> {
const { tenant, messageCid, indexes } = queryOptions;

// we extract the tag indexes into their own object to be inserted separately.
// we also sanitize the indexes to convert any `boolean` values to `text` representations.
const { indexes: appendIndexes, tags } = extractTagsAndSanitizeIndexes(indexes);

return async (tx) => {

const eventIndexValues = {
tenant,
messageCid,
...appendIndexes
}).execute();
...appendIndexes,
};

// we use the dialect-specific `insertThenReturnId` in order to be able to extract the `insertId`
const result = await this.#dialect
.insertThenReturnId(tx, 'eventLogMessages', eventIndexValues, 'watermark as insertId')
.executeTakeFirstOrThrow();

// if tags exist, we execute those within the transaction associating them with the `insertId`.
if (Object.keys(tags).length > 0) {
await this.#tags.executeTagsInsert(result.insertId, tags, tx);
}
};
}

async getEvents(
Expand All @@ -112,19 +151,20 @@ export class EventLogSql implements EventLog {
}

let query = this.#db
.selectFrom('eventLog')
.select('watermark')
.selectFrom('eventLogMessages')
.leftJoin('eventLogRecordsTags', 'eventLogRecordsTags.eventWatermark', 'eventLogMessages.watermark')
.select('messageCid')
.distinct()
.select('watermark')
.where('tenant', '=', tenant);

if (filters.length > 0) {
// sqlite3 dialect does not support `boolean` types, so we convert the filter to match our index
sanitizeFilters(filters);
// filter sanitization takes place within `filterSelectQuery`
query = filterSelectQuery(filters, query);
}

if(cursor !== undefined) {
// eventLog in the sql store uses the watermark cursor value which is a number in SQL
// eventLogMessages in the sql store uses the watermark cursor value which is a number in SQL
// if not we will return empty results
const cursorValue = cursor.value as number;
const cursorMessageCid = cursor.messageCid;
Expand Down Expand Up @@ -171,7 +211,7 @@ export class EventLogSql implements EventLog {
}

await this.#db
.deleteFrom('eventLog')
.deleteFrom('eventLogMessages')
.where('tenant', '=', tenant)
.where('messageCid', 'in', messageCids)
.execute();
Expand All @@ -185,7 +225,7 @@ export class EventLogSql implements EventLog {
}

await this.#db
.deleteFrom('eventLog')
.deleteFrom('eventLogMessages')
.execute();
}
}
Loading

0 comments on commit a1d5a66

Please sign in to comment.