Skip to content

Commit

Permalink
Added SQL implementation of the ResumableDataStore interface (#39)
Browse files Browse the repository at this point in the history
1. Added SQL implementation of the ResumableDataStore interface
2. Added script to easily delete database dialects for testing
  • Loading branch information
thehenrytsai authored Jun 15, 2024
1 parent fcc2aa7 commit 24f51c6
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 21 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,18 @@ Docker is used to spin up a local containerized DBs for testing purposes. Docker
0. clone the repo and `cd` into the project directory
1. Install all project dependencies by running `npm install`
2. Start docker
> NOTE: You might need to delete the existing PostgreSQL and MySQL docker containers as well as `dwn.sqlite` file when a breaking change is introduced if you see tests that used to pass is now failing after a `git pull`.
> NOTE: You might need to delete the existing PostgreSQL and MySQL docker containers as well as `dwn.sqlite` file when a breaking change is introduced if you see tests that used to pass is now failing after a `git pull`. You can run `./scripts/delete-databases` to do this.
3. start the test databases using `./scripts/start-databases` (requires Docker)
4. run tests using `npm run test`

## `npm` scripts

| Script | Description |
| ----------------------- | ------------------------------------------- |
| `npm run build:esm` | compiles typescript into ESM JS |
| `npm run build:cjs` | compiles typescript into CommonJS |
| `npm run build` | compiles typescript into ESM JS & CommonJS |
| `npm run clean` | deletes compiled JS |
| `npm run build:cjs` | compiles typescript into CommonJS |
| `npm run build:esm` | compiles typescript into ESM JS |
| `npm run build` | compiles typescript into ESM JS & CommonJS |
| `npm run clean` | deletes compiled JS |
| `npm run test` | runs tests. |
| `npm run test-coverage` | runs tests and includes coverage |
| `npm run lint` | runs linter |
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tbd54566975/dwn-sql-store",
"version": "0.4.5",
"version": "0.5.0",
"description": "SQL backed implementations of DWN MessageStore, DataStore, and EventLog",
"type": "module",
"license": "Apache-2.0",
Expand Down
10 changes: 10 additions & 0 deletions scripts/delete-databases
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

THIS_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

sh $THIS_DIR/mysql/stop-mysql -rm
sh $THIS_DIR/postgres/stop-postgres -rm

echo "Deleting SQLite database file..."
rm -f $THIS_DIR/../dwn.sqlite
echo "SQLite database file deleted."
155 changes: 155 additions & 0 deletions src/resumable-task-store-sql.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { DwnDatabaseType } from './types.js';
import { Dialect } from './dialect/dialect.js';
import { executeWithRetryIfDatabaseIsLocked } from './utils/transaction.js';
import { Kysely } from 'kysely';
import { Cid, ManagedResumableTask, ResumableTaskStore } from '@tbd54566975/dwn-sdk-js';

export class ResumableTaskStoreSql implements ResumableTaskStore {
private static readonly taskTimeoutInSeconds = 60;

#dialect: Dialect;
#db: Kysely<DwnDatabaseType> | null = null;

constructor(dialect: Dialect) {
this.#dialect = dialect;
}

async open(): Promise<void> {
if (this.#db) {
return;
}

this.#db = new Kysely<DwnDatabaseType>({ dialect: this.#dialect });

let table = this.#db.schema
.createTable('resumableTasks')
.ifNotExists()
.addColumn('id', 'varchar(255)', (col) => col.primaryKey())
.addColumn('task', 'text')
.addColumn('timeout', 'integer')
.addColumn('retryCount', 'integer');

await table.execute();

this.#db.schema
.createIndex('index_timeout')
.ifNotExists()
.on('resumableTasks')
.column('timeout')
.execute();
}

async close(): Promise<void> {
await this.#db?.destroy();
this.#db = null;
}

async register(task: any, timeoutInSeconds: number): Promise<ManagedResumableTask> {
if (!this.#db) {
throw new Error('Connection to database not open. Call `open` before using `register`.');
}

const id = await Cid.computeCid(task);
const timeout = Date.now() + timeoutInSeconds * 1000;
const taskString = JSON.stringify(task);
const retryCount = 0;
const taskEntryInDatabase: ManagedResumableTask = { id, task: taskString, timeout, retryCount };
await this.#db.insertInto('resumableTasks').values(taskEntryInDatabase).execute();

return {
id,
task,
retryCount,
timeout,
};
}

async grab(count: number): Promise<ManagedResumableTask[]> {
if (!this.#db) {
throw new Error('Connection to database not open. Call `open` before using `grab`.');
}

const now = Date.now();
const newTimeout = now + (ResumableTaskStoreSql.taskTimeoutInSeconds * 1000);

let tasks: DwnDatabaseType['resumableTasks'][] = [];

const operation = async (transaction) => {
tasks = await transaction
.selectFrom('resumableTasks')
.selectAll()
.where('timeout', '<=', now)
.limit(count)
.execute();

if (tasks.length > 0) {
const ids = tasks.map((task) => task.id);
await transaction
.updateTable('resumableTasks')
.set({ timeout: newTimeout })
.where((eb) => eb('id', 'in', ids))
.execute();
}
};

await executeWithRetryIfDatabaseIsLocked(this.#db, operation);

const tasksToReturn = tasks.map((task) => {
return {
id : task.id,
task : JSON.parse(task.task),
retryCount : task.retryCount,
timeout : task.timeout,
};
});

return tasksToReturn;
}

async read(taskId: string): Promise<ManagedResumableTask | undefined> {
if (!this.#db) {
throw new Error('Connection to database not open. Call `open` before using `read`.');
}

return this.#db
.selectFrom('resumableTasks')
.selectAll()
.where('id', '=', taskId)
.executeTakeFirst();
}

async extend(taskId: string, timeoutInSeconds: number): Promise<void> {
if (!this.#db) {
throw new Error('Connection to database not open. Call `open` before using `extend`.');
}

const timeout = Date.now() + (timeoutInSeconds * 1000);

await this.#db
.updateTable('resumableTasks')
.set({ timeout })
.where('id', '=', taskId)
.execute();
}

async delete(taskId: string): Promise<void> {
if (!this.#db) {
throw new Error('Connection to database not open. Call `open` before using `delete`.');
}

await this.#db
.deleteFrom('resumableTasks')
.where('id', '=', taskId)
.execute();
}

async clear(): Promise<void> {
if (!this.#db) {
throw new Error('Connection to database not open. Call `open` before using `clear`.');
}

await this.#db
.deleteFrom('resumableTasks')
.execute();
}
}
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,18 @@ type DataStoreTable = {
data: Uint8Array;
}

type ResumableTaskTable = {
id: string;
task: string;
timeout: number;
retryCount: number;
}

export type DwnDatabaseType = {
eventLogMessages: EventLogTable;
eventLogRecordsTags: EventLogRecordsTagsTable;
messageStoreMessages: MessageStoreTable;
messageStoreRecordsTags: MessageStoreRecordsTagsTable;
dataStore: DataStoreTable;
resumableTasks: ResumableTaskTable;
}
2 changes: 1 addition & 1 deletion tests/test-dialects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const testSqliteDialect = new SqliteDialect({
{
fileMustExist : true,
// IMPORTANT: denotes how long to wait before attempting to execute a query when database is locked, and throws an error if it is still locked
// See https://github.com/WiseLibs/better-sqlite3/blob/master/docs/api.md#new-databasepath-options
// The default is 5 seconds, see https://github.com/WiseLibs/better-sqlite3/blob/master/docs/api.md#new-databasepath-options
// NOTE: this is also equivalent to setting the pragma "busy_timeout" directly: database.pragma('busy_timeout = 100');
timeout : 100 // 100ms
}
Expand Down
28 changes: 16 additions & 12 deletions tests/test-suite.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { TestSuite } from '@tbd54566975/dwn-sdk-js/tests';
import { testMysqlDialect, testPostgresDialect, testSqliteDialect } from './test-dialects.js';
import { MessageStoreSql } from '../src/message-store-sql.js';
import { DataStoreSql } from '../src/data-store-sql.js';
import { EventLogSql } from '../src/event-log-sql.js';
import { MessageStoreSql } from '../src/message-store-sql.js';
import { ResumableTaskStoreSql } from '../src/resumable-task-store-sql.js';
import { TestSuite } from '@tbd54566975/dwn-sdk-js/tests';
import { testMysqlDialect, testPostgresDialect, testSqliteDialect } from './test-dialects.js';

// Remove when we Node.js v18 is no longer supported by this project.
// Node.js v18 maintenance begins 2023-10-18 and is EoL 2025-04-30: https://github.com/nodejs/release#release-schedule
Expand All @@ -13,25 +14,28 @@ if (!globalThis.crypto) globalThis.crypto = webcrypto;
describe('SQL Store Test Suite', () => {
describe('MysqlDialect Support', () => {
TestSuite.runStoreDependentTests({
messageStore : new MessageStoreSql(testMysqlDialect),
dataStore : new DataStoreSql(testMysqlDialect),
eventLog : new EventLogSql(testMysqlDialect),
messageStore : new MessageStoreSql(testMysqlDialect),
dataStore : new DataStoreSql(testMysqlDialect),
eventLog : new EventLogSql(testMysqlDialect),
resumableTaskStore : new ResumableTaskStoreSql(testSqliteDialect),
});
});

describe('PostgresDialect Support', () => {
TestSuite.runStoreDependentTests({
messageStore : new MessageStoreSql(testPostgresDialect),
dataStore : new DataStoreSql(testPostgresDialect),
eventLog : new EventLogSql(testPostgresDialect),
messageStore : new MessageStoreSql(testPostgresDialect),
dataStore : new DataStoreSql(testPostgresDialect),
eventLog : new EventLogSql(testPostgresDialect),
resumableTaskStore : new ResumableTaskStoreSql(testSqliteDialect),
});
});

describe('SqliteDialect Support', () => {
TestSuite.runStoreDependentTests({
messageStore : new MessageStoreSql(testSqliteDialect),
dataStore : new DataStoreSql(testSqliteDialect),
eventLog : new EventLogSql(testSqliteDialect),
messageStore : new MessageStoreSql(testSqliteDialect),
dataStore : new DataStoreSql(testSqliteDialect),
eventLog : new EventLogSql(testSqliteDialect),
resumableTaskStore : new ResumableTaskStoreSql(testSqliteDialect),
});
});
});

0 comments on commit 24f51c6

Please sign in to comment.