diff --git a/README.md b/README.md index 893e852..89fbb5d 100644 --- a/README.md +++ b/README.md @@ -155,7 +155,7 @@ Docker is used to spin up a local containerized DBs for testing purposes. Docker 0. clone the repo and `cd` into the project directory 1. Install all project dependencies by running `npm install` 2. Start docker - > NOTE: You might need to delete the existing PostgreSQL and MySQL docker containers as well as `dwn.sqlite` file when a breaking change is introduced if you see tests that used to pass is now failing after a `git pull`. + > NOTE: You might need to delete the existing PostgreSQL and MySQL docker containers as well as `dwn.sqlite` file when a breaking change is introduced if you see tests that used to pass is now failing after a `git pull`. You can run `./scripts/delete-databases` to do this. 3. start the test databases using `./scripts/start-databases` (requires Docker) 4. run tests using `npm run test` @@ -163,10 +163,10 @@ Docker is used to spin up a local containerized DBs for testing purposes. Docker | Script | Description | | ----------------------- | ------------------------------------------- | -| `npm run build:esm` | compiles typescript into ESM JS | -| `npm run build:cjs` | compiles typescript into CommonJS | -| `npm run build` | compiles typescript into ESM JS & CommonJS | -| `npm run clean` | deletes compiled JS | +| `npm run build:cjs` | compiles typescript into CommonJS | +| `npm run build:esm` | compiles typescript into ESM JS | +| `npm run build` | compiles typescript into ESM JS & CommonJS | +| `npm run clean` | deletes compiled JS | | `npm run test` | runs tests. | | `npm run test-coverage` | runs tests and includes coverage | | `npm run lint` | runs linter | diff --git a/package-lock.json b/package-lock.json index 4e77aff..c7428a6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@tbd54566975/dwn-sql-store", - "version": "0.4.5", + "version": "0.5.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@tbd54566975/dwn-sql-store", - "version": "0.4.5", + "version": "0.5.0", "license": "Apache-2.0", "dependencies": { "@ipld/dag-cbor": "9.0.5", diff --git a/package.json b/package.json index 5c5006f..7357d72 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tbd54566975/dwn-sql-store", - "version": "0.4.5", + "version": "0.5.0", "description": "SQL backed implementations of DWN MessageStore, DataStore, and EventLog", "type": "module", "license": "Apache-2.0", diff --git a/scripts/delete-databases b/scripts/delete-databases new file mode 100755 index 0000000..081912a --- /dev/null +++ b/scripts/delete-databases @@ -0,0 +1,10 @@ +#!/bin/bash + +THIS_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +sh $THIS_DIR/mysql/stop-mysql -rm +sh $THIS_DIR/postgres/stop-postgres -rm + +echo "Deleting SQLite database file..." +rm -f $THIS_DIR/../dwn.sqlite +echo "SQLite database file deleted." \ No newline at end of file diff --git a/src/resumable-task-store-sql.ts b/src/resumable-task-store-sql.ts new file mode 100644 index 0000000..0b84dcf --- /dev/null +++ b/src/resumable-task-store-sql.ts @@ -0,0 +1,155 @@ +import { DwnDatabaseType } from './types.js'; +import { Dialect } from './dialect/dialect.js'; +import { executeWithRetryIfDatabaseIsLocked } from './utils/transaction.js'; +import { Kysely } from 'kysely'; +import { Cid, ManagedResumableTask, ResumableTaskStore } from '@tbd54566975/dwn-sdk-js'; + +export class ResumableTaskStoreSql implements ResumableTaskStore { + private static readonly taskTimeoutInSeconds = 60; + + #dialect: Dialect; + #db: Kysely | null = null; + + constructor(dialect: Dialect) { + this.#dialect = dialect; + } + + async open(): Promise { + if (this.#db) { + return; + } + + this.#db = new Kysely({ dialect: this.#dialect }); + + let table = this.#db.schema + .createTable('resumableTasks') + .ifNotExists() + .addColumn('id', 'varchar(255)', (col) => col.primaryKey()) + .addColumn('task', 'text') + .addColumn('timeout', 'integer') + .addColumn('retryCount', 'integer'); + + await table.execute(); + + this.#db.schema + .createIndex('index_timeout') + .ifNotExists() + .on('resumableTasks') + .column('timeout') + .execute(); + } + + async close(): Promise { + await this.#db?.destroy(); + this.#db = null; + } + + async register(task: any, timeoutInSeconds: number): Promise { + if (!this.#db) { + throw new Error('Connection to database not open. Call `open` before using `register`.'); + } + + const id = await Cid.computeCid(task); + const timeout = Date.now() + timeoutInSeconds * 1000; + const taskString = JSON.stringify(task); + const retryCount = 0; + const taskEntryInDatabase: ManagedResumableTask = { id, task: taskString, timeout, retryCount }; + await this.#db.insertInto('resumableTasks').values(taskEntryInDatabase).execute(); + + return { + id, + task, + retryCount, + timeout, + }; + } + + async grab(count: number): Promise { + if (!this.#db) { + throw new Error('Connection to database not open. Call `open` before using `grab`.'); + } + + const now = Date.now(); + const newTimeout = now + (ResumableTaskStoreSql.taskTimeoutInSeconds * 1000); + + let tasks: DwnDatabaseType['resumableTasks'][] = []; + + const operation = async (transaction) => { + tasks = await transaction + .selectFrom('resumableTasks') + .selectAll() + .where('timeout', '<=', now) + .limit(count) + .execute(); + + if (tasks.length > 0) { + const ids = tasks.map((task) => task.id); + await transaction + .updateTable('resumableTasks') + .set({ timeout: newTimeout }) + .where((eb) => eb('id', 'in', ids)) + .execute(); + } + }; + + await executeWithRetryIfDatabaseIsLocked(this.#db, operation); + + const tasksToReturn = tasks.map((task) => { + return { + id : task.id, + task : JSON.parse(task.task), + retryCount : task.retryCount, + timeout : task.timeout, + }; + }); + + return tasksToReturn; + } + + async read(taskId: string): Promise { + if (!this.#db) { + throw new Error('Connection to database not open. Call `open` before using `read`.'); + } + + return this.#db + .selectFrom('resumableTasks') + .selectAll() + .where('id', '=', taskId) + .executeTakeFirst(); + } + + async extend(taskId: string, timeoutInSeconds: number): Promise { + if (!this.#db) { + throw new Error('Connection to database not open. Call `open` before using `extend`.'); + } + + const timeout = Date.now() + (timeoutInSeconds * 1000); + + await this.#db + .updateTable('resumableTasks') + .set({ timeout }) + .where('id', '=', taskId) + .execute(); + } + + async delete(taskId: string): Promise { + if (!this.#db) { + throw new Error('Connection to database not open. Call `open` before using `delete`.'); + } + + await this.#db + .deleteFrom('resumableTasks') + .where('id', '=', taskId) + .execute(); + } + + async clear(): Promise { + if (!this.#db) { + throw new Error('Connection to database not open. Call `open` before using `clear`.'); + } + + await this.#db + .deleteFrom('resumableTasks') + .execute(); + } +} diff --git a/src/types.ts b/src/types.ts index baa0c93..9a4f655 100644 --- a/src/types.ts +++ b/src/types.ts @@ -103,10 +103,18 @@ type DataStoreTable = { data: Uint8Array; } +type ResumableTaskTable = { + id: string; + task: string; + timeout: number; + retryCount: number; +} + export type DwnDatabaseType = { eventLogMessages: EventLogTable; eventLogRecordsTags: EventLogRecordsTagsTable; messageStoreMessages: MessageStoreTable; messageStoreRecordsTags: MessageStoreRecordsTagsTable; dataStore: DataStoreTable; + resumableTasks: ResumableTaskTable; } \ No newline at end of file diff --git a/tests/test-dialects.ts b/tests/test-dialects.ts index 11ccef7..cdab08c 100644 --- a/tests/test-dialects.ts +++ b/tests/test-dialects.ts @@ -33,7 +33,7 @@ export const testSqliteDialect = new SqliteDialect({ { fileMustExist : true, // IMPORTANT: denotes how long to wait before attempting to execute a query when database is locked, and throws an error if it is still locked - // See https://github.com/WiseLibs/better-sqlite3/blob/master/docs/api.md#new-databasepath-options + // The default is 5 seconds, see https://github.com/WiseLibs/better-sqlite3/blob/master/docs/api.md#new-databasepath-options // NOTE: this is also equivalent to setting the pragma "busy_timeout" directly: database.pragma('busy_timeout = 100'); timeout : 100 // 100ms } diff --git a/tests/test-suite.spec.ts b/tests/test-suite.spec.ts index 82d11df..3199240 100644 --- a/tests/test-suite.spec.ts +++ b/tests/test-suite.spec.ts @@ -1,8 +1,9 @@ -import { TestSuite } from '@tbd54566975/dwn-sdk-js/tests'; -import { testMysqlDialect, testPostgresDialect, testSqliteDialect } from './test-dialects.js'; -import { MessageStoreSql } from '../src/message-store-sql.js'; import { DataStoreSql } from '../src/data-store-sql.js'; import { EventLogSql } from '../src/event-log-sql.js'; +import { MessageStoreSql } from '../src/message-store-sql.js'; +import { ResumableTaskStoreSql } from '../src/resumable-task-store-sql.js'; +import { TestSuite } from '@tbd54566975/dwn-sdk-js/tests'; +import { testMysqlDialect, testPostgresDialect, testSqliteDialect } from './test-dialects.js'; // Remove when we Node.js v18 is no longer supported by this project. // Node.js v18 maintenance begins 2023-10-18 and is EoL 2025-04-30: https://github.com/nodejs/release#release-schedule @@ -13,25 +14,28 @@ if (!globalThis.crypto) globalThis.crypto = webcrypto; describe('SQL Store Test Suite', () => { describe('MysqlDialect Support', () => { TestSuite.runStoreDependentTests({ - messageStore : new MessageStoreSql(testMysqlDialect), - dataStore : new DataStoreSql(testMysqlDialect), - eventLog : new EventLogSql(testMysqlDialect), + messageStore : new MessageStoreSql(testMysqlDialect), + dataStore : new DataStoreSql(testMysqlDialect), + eventLog : new EventLogSql(testMysqlDialect), + resumableTaskStore : new ResumableTaskStoreSql(testSqliteDialect), }); }); describe('PostgresDialect Support', () => { TestSuite.runStoreDependentTests({ - messageStore : new MessageStoreSql(testPostgresDialect), - dataStore : new DataStoreSql(testPostgresDialect), - eventLog : new EventLogSql(testPostgresDialect), + messageStore : new MessageStoreSql(testPostgresDialect), + dataStore : new DataStoreSql(testPostgresDialect), + eventLog : new EventLogSql(testPostgresDialect), + resumableTaskStore : new ResumableTaskStoreSql(testSqliteDialect), }); }); describe('SqliteDialect Support', () => { TestSuite.runStoreDependentTests({ - messageStore : new MessageStoreSql(testSqliteDialect), - dataStore : new DataStoreSql(testSqliteDialect), - eventLog : new EventLogSql(testSqliteDialect), + messageStore : new MessageStoreSql(testSqliteDialect), + dataStore : new DataStoreSql(testSqliteDialect), + eventLog : new EventLogSql(testSqliteDialect), + resumableTaskStore : new ResumableTaskStoreSql(testSqliteDialect), }); }); }); \ No newline at end of file