From 81ff63ccf7c71eccf342899d298a780d66045534 Mon Sep 17 00:00:00 2001 From: Ariel Gentile Date: Tue, 6 Feb 2024 20:15:26 -0300 Subject: [PATCH 1/3] feat: optional backup on storage migration (#1745) Signed-off-by: Ariel Gentile --- .../src/updates/__tests__/0.3.test.ts | 4 +- .../src/__tests__/migration-postgres.test.ts | 44 +++++++++++++++++++ packages/askar/src/wallet/AskarWallet.ts | 5 ++- packages/core/src/agent/AgentConfig.ts | 4 ++ packages/core/src/agent/BaseAgent.ts | 2 +- .../src/storage/migration/UpdateAssistant.ts | 42 +++++++++++++----- .../storage/migration/__tests__/0.1.test.ts | 8 ++-- .../storage/migration/__tests__/0.2.test.ts | 2 +- .../storage/migration/__tests__/0.3.test.ts | 4 +- packages/core/src/types.ts | 1 + .../error/WalletExportUnsupportedError.ts | 7 +++ packages/core/src/wallet/error/index.ts | 1 + .../tenants/src/updates/__tests__/0.4.test.ts | 2 +- 13 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 packages/askar/src/__tests__/migration-postgres.test.ts create mode 100644 packages/core/src/wallet/error/WalletExportUnsupportedError.ts diff --git a/packages/anoncreds/src/updates/__tests__/0.3.test.ts b/packages/anoncreds/src/updates/__tests__/0.3.test.ts index 602c3d37b0..625b69327b 100644 --- a/packages/anoncreds/src/updates/__tests__/0.3.test.ts +++ b/packages/anoncreds/src/updates/__tests__/0.3.test.ts @@ -82,7 +82,7 @@ describe('UpdateAssistant | AnonCreds | v0.3.1 - v0.4', () => { }, ]) - await updateAssistant.update('0.4') + await updateAssistant.update({ updateToVersion: '0.4' }) expect(await updateAssistant.isUpToDate('0.4')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.4')).toEqual([]) @@ -224,7 +224,7 @@ describe('UpdateAssistant | AnonCreds | v0.3.1 - v0.4', () => { }, ]) - await updateAssistant.update('0.4') + await updateAssistant.update({ updateToVersion: '0.4' }) expect(await updateAssistant.isUpToDate('0.4')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.4')).toEqual([]) diff --git a/packages/askar/src/__tests__/migration-postgres.test.ts b/packages/askar/src/__tests__/migration-postgres.test.ts new file mode 100644 index 0000000000..6eeeb3769c --- /dev/null +++ b/packages/askar/src/__tests__/migration-postgres.test.ts @@ -0,0 +1,44 @@ +import { StorageUpdateService } from '@credo-ts/core' + +import { Agent } from '../../../core/src/agent/Agent' +import { CURRENT_FRAMEWORK_STORAGE_VERSION } from '../../../core/src/storage/migration/updates' +import { askarPostgresStorageConfig, getAskarPostgresAgentOptions } from '../../tests/helpers' + +const agentOptions = getAskarPostgresAgentOptions('Migration', askarPostgresStorageConfig, {}) + +describe('migration with postgres backend', () => { + test('Automatic update on agent startup', async () => { + // Initialize agent and set its storage version to 0.1 in order to force automatic update in the next startup + let agent = new Agent(agentOptions) + await agent.initialize() + + let storageUpdateService = agent.dependencyManager.resolve(StorageUpdateService) + await storageUpdateService.setCurrentStorageVersion(agent.context, '0.1') + await agent.shutdown() + + // Now start agent with auto update storage + agent = new Agent({ ...agentOptions, config: { ...agentOptions.config, autoUpdateStorageOnStartup: true } }) + storageUpdateService = agent.dependencyManager.resolve(StorageUpdateService) + + // Should fail because export is not supported when using postgres + await expect(agent.initialize()).rejects.toThrow(/backend does not support export/) + + expect(await storageUpdateService.getCurrentStorageVersion(agent.context)).toEqual('0.1') + await agent.shutdown() + + // Now start agent with auto update storage, but this time disable backup + agent = new Agent({ + ...agentOptions, + config: { ...agentOptions.config, autoUpdateStorageOnStartup: true, backupBeforeStorageUpdate: false }, + }) + + // Should work OK + await agent.initialize() + expect(await storageUpdateService.getCurrentStorageVersion(agent.context)).toEqual( + CURRENT_FRAMEWORK_STORAGE_VERSION + ) + await agent.shutdown() + + await agent.wallet.delete() + }) +}) diff --git a/packages/askar/src/wallet/AskarWallet.ts b/packages/askar/src/wallet/AskarWallet.ts index f404edb989..a778103998 100644 --- a/packages/askar/src/wallet/AskarWallet.ts +++ b/packages/askar/src/wallet/AskarWallet.ts @@ -13,6 +13,7 @@ import { WalletNotFoundError, KeyDerivationMethod, WalletImportPathExistsError, + WalletExportUnsupportedError, } from '@credo-ts/core' // eslint-disable-next-line import/order import { Store } from '@hyperledger/aries-askar-shared' @@ -277,10 +278,10 @@ export class AskarWallet extends AskarBaseWallet { const { path: sourcePath } = uriFromWalletConfig(this.walletConfig, this.fileSystem.dataPath) if (isAskarWalletSqliteStorageConfig(this.walletConfig.storage) && this.walletConfig.storage?.inMemory) { - throw new WalletError('Export is not supported for in memory wallet') + throw new WalletExportUnsupportedError('Export is not supported for in memory wallet') } if (!sourcePath) { - throw new WalletError('Export is only supported for SQLite backend') + throw new WalletExportUnsupportedError('Export is only supported for SQLite backend') } try { diff --git a/packages/core/src/agent/AgentConfig.ts b/packages/core/src/agent/AgentConfig.ts index 7e43029088..a6a4cb379f 100644 --- a/packages/core/src/agent/AgentConfig.ts +++ b/packages/core/src/agent/AgentConfig.ts @@ -71,6 +71,10 @@ export class AgentConfig { return this.initConfig.autoUpdateStorageOnStartup ?? false } + public get backupBeforeStorageUpdate() { + return this.initConfig.backupBeforeStorageUpdate ?? true + } + public extend(config: Partial): AgentConfig { return new AgentConfig( { ...this.initConfig, logger: this.logger, label: this.label, ...config }, diff --git a/packages/core/src/agent/BaseAgent.ts b/packages/core/src/agent/BaseAgent.ts index de051d7f50..ac35b4e79a 100644 --- a/packages/core/src/agent/BaseAgent.ts +++ b/packages/core/src/agent/BaseAgent.ts @@ -163,7 +163,7 @@ export abstract class BaseAgent = BaseAgent> { return neededUpdates } - public async update(updateToVersion?: UpdateToVersion) { + public async update(options?: { updateToVersion?: UpdateToVersion; backupBeforeStorageUpdate?: boolean }) { const updateIdentifier = Date.now().toString() + const updateToVersion = options?.updateToVersion + + // By default do a backup first (should be explicitly disabled in case the wallet backend does not support export) + const createBackup = options?.backupBeforeStorageUpdate ?? true try { this.agent.config.logger.info(`Starting update of agent storage with updateIdentifier ${updateIdentifier}`) @@ -143,7 +147,9 @@ export class UpdateAssistant = BaseAgent> { ) // Create backup in case migration goes wrong - await this.createBackup(updateIdentifier) + if (createBackup) { + await this.createBackup(updateIdentifier) + } try { for (const update of neededUpdates) { @@ -189,17 +195,23 @@ export class UpdateAssistant = BaseAgent> { `Successfully updated agent storage from version ${update.fromVersion} to version ${update.toVersion}` ) } - // Delete backup file, as it is not needed anymore - await this.fileSystem.delete(this.getBackupPath(updateIdentifier)) + if (createBackup) { + // Delete backup file, as it is not needed anymore + await this.fileSystem.delete(this.getBackupPath(updateIdentifier)) + } } catch (error) { - this.agent.config.logger.fatal('An error occurred while updating the wallet. Restoring backup', { + this.agent.config.logger.fatal('An error occurred while updating the wallet.', { error, }) - // In the case of an error we want to restore the backup - await this.restoreBackup(updateIdentifier) - // Delete backup file, as wallet was already restored (backup-error file will persist though) - await this.fileSystem.delete(this.getBackupPath(updateIdentifier)) + if (createBackup) { + this.agent.config.logger.debug('Restoring backup.') + // In the case of an error we want to restore the backup + await this.restoreBackup(updateIdentifier) + + // Delete backup file, as wallet was already restored (backup-error file will persist though) + await this.fileSystem.delete(this.getBackupPath(updateIdentifier)) + } throw error } @@ -215,6 +227,16 @@ export class UpdateAssistant = BaseAgent> { }) throw new StorageUpdateError(errorMessage, { cause: error }) } + // Wallet backend does not support export + if (error instanceof WalletExportUnsupportedError) { + const errorMessage = `Error updating storage with updateIdentifier ${updateIdentifier} because the wallet backend does not support exporting. + Make sure to do a manual backup of your wallet and disable 'backupBeforeStorageUpdate' before proceeding.` + this.agent.config.logger.fatal(errorMessage, { + error, + updateIdentifier, + }) + throw new StorageUpdateError(errorMessage, { cause: error }) + } this.agent.config.logger.error(`Error updating storage (updateIdentifier: ${updateIdentifier})`, { cause: error, diff --git a/packages/core/src/storage/migration/__tests__/0.1.test.ts b/packages/core/src/storage/migration/__tests__/0.1.test.ts index deaf622249..d7b7481263 100644 --- a/packages/core/src/storage/migration/__tests__/0.1.test.ts +++ b/packages/core/src/storage/migration/__tests__/0.1.test.ts @@ -74,7 +74,7 @@ describe('UpdateAssistant | v0.1 - v0.2', () => { }, ]) - await updateAssistant.update('0.2') + await updateAssistant.update({ updateToVersion: '0.2' }) expect(await updateAssistant.isUpToDate('0.2')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.2')).toEqual([]) @@ -138,7 +138,7 @@ describe('UpdateAssistant | v0.1 - v0.2', () => { }, ]) - await updateAssistant.update('0.2') + await updateAssistant.update({ updateToVersion: '0.2' }) expect(await updateAssistant.isUpToDate('0.2')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.2')).toEqual([]) @@ -201,7 +201,7 @@ describe('UpdateAssistant | v0.1 - v0.2', () => { }, ]) - await updateAssistant.update('0.2') + await updateAssistant.update({ updateToVersion: '0.2' }) expect(await updateAssistant.isUpToDate('0.2')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.2')).toEqual([]) @@ -268,7 +268,7 @@ describe('UpdateAssistant | v0.1 - v0.2', () => { }, ]) - await updateAssistant.update('0.2') + await updateAssistant.update({ updateToVersion: '0.2' }) expect(await updateAssistant.isUpToDate('0.2')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.2')).toEqual([]) diff --git a/packages/core/src/storage/migration/__tests__/0.2.test.ts b/packages/core/src/storage/migration/__tests__/0.2.test.ts index 52c45da3a2..bbb00eb2a8 100644 --- a/packages/core/src/storage/migration/__tests__/0.2.test.ts +++ b/packages/core/src/storage/migration/__tests__/0.2.test.ts @@ -77,7 +77,7 @@ describe('UpdateAssistant | v0.2 - v0.3.1', () => { }, ]) - await updateAssistant.update('0.3.1') + await updateAssistant.update({ updateToVersion: '0.3.1' }) expect(await updateAssistant.isUpToDate('0.3.1')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.3.1')).toEqual([]) diff --git a/packages/core/src/storage/migration/__tests__/0.3.test.ts b/packages/core/src/storage/migration/__tests__/0.3.test.ts index df9cc253e6..a40c1f5ffe 100644 --- a/packages/core/src/storage/migration/__tests__/0.3.test.ts +++ b/packages/core/src/storage/migration/__tests__/0.3.test.ts @@ -72,7 +72,7 @@ describe('UpdateAssistant | v0.3.1 - v0.4', () => { }, ]) - await updateAssistant.update('0.4') + await updateAssistant.update({ updateToVersion: '0.4' }) expect(await updateAssistant.isUpToDate('0.4')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.4')).toEqual([]) @@ -138,7 +138,7 @@ describe('UpdateAssistant | v0.3.1 - v0.4', () => { }, ]) - await updateAssistant.update('0.4') + await updateAssistant.update({ updateToVersion: '0.4' }) expect(await updateAssistant.isUpToDate('0.4')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.4')).toEqual([]) diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index a451b9c57c..cd00c8706b 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -82,6 +82,7 @@ export interface InitConfig { useDidSovPrefixWhereAllowed?: boolean connectionImageUrl?: string autoUpdateStorageOnStartup?: boolean + backupBeforeStorageUpdate?: boolean } export type ProtocolVersion = `${number}.${number}` diff --git a/packages/core/src/wallet/error/WalletExportUnsupportedError.ts b/packages/core/src/wallet/error/WalletExportUnsupportedError.ts new file mode 100644 index 0000000000..db7a313e86 --- /dev/null +++ b/packages/core/src/wallet/error/WalletExportUnsupportedError.ts @@ -0,0 +1,7 @@ +import { WalletError } from './WalletError' + +export class WalletExportUnsupportedError extends WalletError { + public constructor(message: string, { cause }: { cause?: Error } = {}) { + super(message, { cause }) + } +} diff --git a/packages/core/src/wallet/error/index.ts b/packages/core/src/wallet/error/index.ts index 0f9c04b4dd..343fd83913 100644 --- a/packages/core/src/wallet/error/index.ts +++ b/packages/core/src/wallet/error/index.ts @@ -5,3 +5,4 @@ export { WalletError } from './WalletError' export { WalletKeyExistsError } from './WalletKeyExistsError' export { WalletImportPathExistsError } from './WalletImportPathExistsError' export { WalletExportPathExistsError } from './WalletExportPathExistsError' +export { WalletExportUnsupportedError } from './WalletExportUnsupportedError' diff --git a/packages/tenants/src/updates/__tests__/0.4.test.ts b/packages/tenants/src/updates/__tests__/0.4.test.ts index 29c84c4f5d..11af20ab6a 100644 --- a/packages/tenants/src/updates/__tests__/0.4.test.ts +++ b/packages/tenants/src/updates/__tests__/0.4.test.ts @@ -76,7 +76,7 @@ describe('UpdateAssistant | Tenants | v0.4 - v0.5', () => { }, ]) - await updateAssistant.update('0.5') + await updateAssistant.update({ updateToVersion: '0.5' }) expect(await updateAssistant.isUpToDate('0.5')).toBe(true) expect(await updateAssistant.getNeededUpdates('0.5')).toEqual([]) From 2cb9ba881f76818a7feaaa126cbd8e5218aad0b8 Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Wed, 7 Feb 2024 16:53:29 +0700 Subject: [PATCH 2/3] refactor(askar)!: short-lived askar sessions (#1743) Signed-off-by: Timo Glastra --- .../askar/src/storage/AskarStorageService.ts | 24 ++-- .../__tests__/AskarStorageService.test.ts | 69 ++++----- packages/askar/src/wallet/AskarBaseWallet.ts | 135 +++++++++++++----- .../askar/src/wallet/AskarProfileWallet.ts | 51 +++---- packages/askar/src/wallet/AskarWallet.ts | 24 +++- packages/askar/tests/askar-sqlite.e2e.test.ts | 53 ++++--- .../__tests__/openid4vci-holder.e2e.test.ts | 2 +- .../__tests__/openid4vp-holder.e2e.test.ts | 2 +- tests/InMemoryWallet.ts | 6 +- 9 files changed, 225 insertions(+), 141 deletions(-) diff --git a/packages/askar/src/storage/AskarStorageService.ts b/packages/askar/src/storage/AskarStorageService.ts index 8edfb5c2f1..5f7c2121cf 100644 --- a/packages/askar/src/storage/AskarStorageService.ts +++ b/packages/askar/src/storage/AskarStorageService.ts @@ -13,7 +13,6 @@ export class AskarStorageService implements StorageService /** @inheritDoc */ public async save(agentContext: AgentContext, record: T) { assertAskarWallet(agentContext.wallet) - const session = agentContext.wallet.session record.updatedAt = new Date() @@ -21,7 +20,9 @@ export class AskarStorageService implements StorageService const tags = transformFromRecordTagValues(record.getTags()) as Record try { - await session.insert({ category: record.type, name: record.id, value, tags }) + await agentContext.wallet.withSession((session) => + session.insert({ category: record.type, name: record.id, value, tags }) + ) } catch (error) { if (isAskarError(error, AskarErrorCode.Duplicate)) { throw new RecordDuplicateError(`Record with id ${record.id} already exists`, { recordType: record.type }) @@ -34,7 +35,6 @@ export class AskarStorageService implements StorageService /** @inheritDoc */ public async update(agentContext: AgentContext, record: T): Promise { assertAskarWallet(agentContext.wallet) - const session = agentContext.wallet.session record.updatedAt = new Date() @@ -42,7 +42,9 @@ export class AskarStorageService implements StorageService const tags = transformFromRecordTagValues(record.getTags()) as Record try { - await session.replace({ category: record.type, name: record.id, value, tags }) + await agentContext.wallet.withSession((session) => + session.replace({ category: record.type, name: record.id, value, tags }) + ) } catch (error) { if (isAskarError(error, AskarErrorCode.NotFound)) { throw new RecordNotFoundError(`record with id ${record.id} not found.`, { @@ -58,10 +60,9 @@ export class AskarStorageService implements StorageService /** @inheritDoc */ public async delete(agentContext: AgentContext, record: T) { assertAskarWallet(agentContext.wallet) - const session = agentContext.wallet.session try { - await session.remove({ category: record.type, name: record.id }) + await agentContext.wallet.withSession((session) => session.remove({ category: record.type, name: record.id })) } catch (error) { if (isAskarError(error, AskarErrorCode.NotFound)) { throw new RecordNotFoundError(`record with id ${record.id} not found.`, { @@ -80,10 +81,9 @@ export class AskarStorageService implements StorageService id: string ): Promise { assertAskarWallet(agentContext.wallet) - const session = agentContext.wallet.session try { - await session.remove({ category: recordClass.type, name: id }) + await agentContext.wallet.withSession((session) => session.remove({ category: recordClass.type, name: id })) } catch (error) { if (isAskarError(error, AskarErrorCode.NotFound)) { throw new RecordNotFoundError(`record with id ${id} not found.`, { @@ -98,10 +98,11 @@ export class AskarStorageService implements StorageService /** @inheritDoc */ public async getById(agentContext: AgentContext, recordClass: BaseRecordConstructor, id: string): Promise { assertAskarWallet(agentContext.wallet) - const session = agentContext.wallet.session try { - const record = await session.fetch({ category: recordClass.type, name: id }) + const record = await agentContext.wallet.withSession((session) => + session.fetch({ category: recordClass.type, name: id }) + ) if (!record) { throw new RecordNotFoundError(`record with id ${id} not found.`, { recordType: recordClass.type, @@ -117,9 +118,8 @@ export class AskarStorageService implements StorageService /** @inheritDoc */ public async getAll(agentContext: AgentContext, recordClass: BaseRecordConstructor): Promise { assertAskarWallet(agentContext.wallet) - const session = agentContext.wallet.session - const records = await session.fetchAll({ category: recordClass.type }) + const records = await agentContext.wallet.withSession((session) => session.fetchAll({ category: recordClass.type })) const instances = [] for (const record of records) { diff --git a/packages/askar/src/storage/__tests__/AskarStorageService.test.ts b/packages/askar/src/storage/__tests__/AskarStorageService.test.ts index 1ef2aead91..eab6c93e5b 100644 --- a/packages/askar/src/storage/__tests__/AskarStorageService.test.ts +++ b/packages/askar/src/storage/__tests__/AskarStorageService.test.ts @@ -59,13 +59,15 @@ describe('AskarStorageService', () => { }, }) - const retrieveRecord = await ariesAskar.sessionFetch({ - category: record.type, - name: record.id, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - sessionHandle: wallet.session.handle!, - forUpdate: false, - }) + const retrieveRecord = await wallet.withSession((session) => + ariesAskar.sessionFetch({ + category: record.type, + name: record.id, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + sessionHandle: session.handle!, + forUpdate: false, + }) + ) expect(JSON.parse(retrieveRecord?.getTags(0) ?? '{}')).toEqual({ someBoolean: '1', @@ -81,31 +83,34 @@ describe('AskarStorageService', () => { }) it('should correctly transform tag values from string after retrieving', async () => { - await ariesAskar.sessionUpdate({ - category: TestRecord.type, - name: 'some-id', - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - sessionHandle: wallet.session.handle!, - value: TypedArrayEncoder.fromString('{}'), - tags: { - someBoolean: '1', - someOtherBoolean: '0', - someStringValue: 'string', - // Before 0.5.0, there was a bug where array values that contained a : would be incorrectly - // parsed back into a record as we would split on ':' and thus only the first part would be included - // in the record as the tag value. If the record was never re-saved it would work well, as well as if the - // record tag was generated dynamically before save (as then the incorrectly transformed back value would be - // overwritten again on save). - 'anArrayValueWhereValuesContainColon:foo:bar:test': '1', - 'anArrayValueWhereValuesContainColon:https://google.com': '1', - 'anArrayValue:foo': '1', - 'anArrayValue:bar': '1', - // booleans are stored as '1' and '0' so we store the string values '1' and '0' as 'n__1' and 'n__0' - someStringNumberValue: 'n__1', - anotherStringNumberValue: 'n__0', - }, - operation: 0, // EntryOperation.Insert - }) + await wallet.withSession( + async (session) => + await ariesAskar.sessionUpdate({ + category: TestRecord.type, + name: 'some-id', + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + sessionHandle: session.handle!, + value: TypedArrayEncoder.fromString('{}'), + tags: { + someBoolean: '1', + someOtherBoolean: '0', + someStringValue: 'string', + // Before 0.5.0, there was a bug where array values that contained a : would be incorrectly + // parsed back into a record as we would split on ':' and thus only the first part would be included + // in the record as the tag value. If the record was never re-saved it would work well, as well as if the + // record tag was generated dynamically before save (as then the incorrectly transformed back value would be + // overwritten again on save). + 'anArrayValueWhereValuesContainColon:foo:bar:test': '1', + 'anArrayValueWhereValuesContainColon:https://google.com': '1', + 'anArrayValue:foo': '1', + 'anArrayValue:bar': '1', + // booleans are stored as '1' and '0' so we store the string values '1' and '0' as 'n__1' and 'n__0' + someStringNumberValue: 'n__1', + anotherStringNumberValue: 'n__0', + }, + operation: 0, // EntryOperation.Insert + }) + ) const record = await storageService.getById(agentContext, TestRecord, 'some-id') diff --git a/packages/askar/src/wallet/AskarBaseWallet.ts b/packages/askar/src/wallet/AskarBaseWallet.ts index f2ba14a6f1..5d0415387b 100644 --- a/packages/askar/src/wallet/AskarBaseWallet.ts +++ b/packages/askar/src/wallet/AskarBaseWallet.ts @@ -41,8 +41,6 @@ import { didcommV1Pack, didcommV1Unpack } from './didcommV1' const isError = (error: unknown): error is Error => error instanceof Error export abstract class AskarBaseWallet implements Wallet { - protected _session?: Session - protected logger: Logger protected signingKeyProviderRegistry: SigningProviderRegistry @@ -67,12 +65,54 @@ export abstract class AskarBaseWallet implements Wallet { public abstract dispose(): void | Promise public abstract profile: string - public get session() { - if (!this._session) { - throw new CredoError('No Wallet Session is opened') + protected abstract store: Store + + /** + * Run callback with the session provided, the session will + * be closed once the callback resolves or rejects if it is not closed yet. + * + * TODO: update to new `using` syntax so we don't have to use a callback + */ + public async withSession(callback: (session: Session) => Return): Promise> { + let session: Session | undefined = undefined + try { + session = await this.store.session(this.profile).open() + + const result = await callback(session) + + return result + } finally { + if (session?.handle) { + await session.close() + } } + } + + /** + * Run callback with a transaction. If the callback resolves the transaction + * will be committed if the transaction is not closed yet. If the callback rejects + * the transaction will be rolled back if the transaction is not closed yet. + * + * TODO: update to new `using` syntax so we don't have to use a callback + */ + public async withTransaction(callback: (transaction: Session) => Return): Promise> { + let session: Session | undefined = undefined + try { + session = await this.store.transaction(this.profile).open() + + const result = await callback(session) - return this._session + if (session.handle) { + await session.commit() + } + return result + } catch (error) { + if (session?.handle) { + await session?.rollback() + } + + throw error + } } public get supportedKeyTypes() { @@ -105,15 +145,23 @@ export abstract class AskarBaseWallet implements Wallet { // Create key let key: AskarKey | undefined try { - key = privateKey + const _key = privateKey ? AskarKey.fromSecretBytes({ secretKey: privateKey, algorithm }) : seed ? AskarKey.fromSeed({ seed, algorithm }) : AskarKey.generate(algorithm) + // FIXME: we need to create a separate const '_key' so TS definitely knows _key is defined in the session callback. + // This will be fixed once we use the new 'using' syntax + key = _key + const keyPublicBytes = key.publicBytes + // Store key - await this.session.insertKey({ key, name: TypedArrayEncoder.toBase58(keyPublicBytes) }) + await this.withSession((session) => + session.insertKey({ key: _key, name: TypedArrayEncoder.toBase58(keyPublicBytes) }) + ) + key.handle.free() return Key.fromPublicKey(keyPublicBytes, keyType) } catch (error) { @@ -162,7 +210,9 @@ export abstract class AskarBaseWallet implements Wallet { try { if (isKeyTypeSupportedByAskarForPurpose(key.keyType, AskarKeyTypePurpose.KeyManagement)) { - askarKey = (await this.session.fetchKey({ name: key.publicKeyBase58 }))?.key + askarKey = await this.withSession( + async (session) => (await session.fetchKey({ name: key.publicKeyBase58 }))?.key + ) } // FIXME: remove the custom KeyPair record now that we deprecate Indy SDK. @@ -171,19 +221,25 @@ export abstract class AskarBaseWallet implements Wallet { // Fallback to fetching key from the non-askar storage, this is to handle the case // where a key wasn't supported at first by the wallet, but now is if (!askarKey) { + // TODO: we should probably make retrieveKeyPair + insertKey + deleteKeyPair a transaction keyPair = await this.retrieveKeyPair(key.publicKeyBase58) // If we have the key stored in a custom record, but it is now supported by Askar, // we 'import' the key into askar storage and remove the custom key record if (keyPair && isKeyTypeSupportedByAskarForPurpose(keyPair.keyType, AskarKeyTypePurpose.KeyManagement)) { - askarKey = AskarKey.fromSecretBytes({ + const _askarKey = AskarKey.fromSecretBytes({ secretKey: TypedArrayEncoder.fromBase58(keyPair.privateKeyBase58), algorithm: keyAlgFromString(keyPair.keyType), }) - await this.session.insertKey({ - name: key.publicKeyBase58, - key: askarKey, - }) + askarKey = _askarKey + + await this.withSession((session) => + session.insertKey({ + name: key.publicKeyBase58, + key: _askarKey, + }) + ) + // Now we can remove it from the custom record as we have imported it into Askar await this.deleteKeyPair(key.publicKeyBase58) keyPair = undefined @@ -313,7 +369,9 @@ export abstract class AskarBaseWallet implements Wallet { recipientKeys: string[], senderVerkey?: string // in base58 ): Promise { - const senderKey = senderVerkey ? await this.session.fetchKey({ name: senderVerkey }) : undefined + const senderKey = senderVerkey + ? await this.withSession((session) => session.fetchKey({ name: senderVerkey })) + : undefined try { if (senderVerkey && !senderKey) { @@ -339,18 +397,25 @@ export abstract class AskarBaseWallet implements Wallet { // eslint-disable-next-line @typescript-eslint/no-explicit-any const recipientKids: string[] = protectedJson.recipients.map((r: any) => r.header.kid) - for (const recipientKid of recipientKids) { - const recipientKeyEntry = await this.session.fetchKey({ name: recipientKid }) - try { - if (recipientKeyEntry) { - return didcommV1Unpack(messagePackage, recipientKeyEntry.key) + // TODO: how long should sessions last? Just for the duration of the unpack? Or should each item in the recipientKids get a separate session? + const returnValue = await this.withSession(async (session) => { + for (const recipientKid of recipientKids) { + const recipientKeyEntry = await session.fetchKey({ name: recipientKid }) + try { + if (recipientKeyEntry) { + return didcommV1Unpack(messagePackage, recipientKeyEntry.key) + } + } finally { + recipientKeyEntry?.key.handle.free() } - } finally { - recipientKeyEntry?.key.handle.free() } + }) + + if (!returnValue) { + throw new WalletError('No corresponding recipient key found') } - throw new WalletError('No corresponding recipient key found') + return returnValue } public async generateNonce(): Promise { @@ -376,7 +441,9 @@ export abstract class AskarBaseWallet implements Wallet { private async retrieveKeyPair(publicKeyBase58: string): Promise { try { - const entryObject = await this.session.fetch({ category: 'KeyPairRecord', name: `key-${publicKeyBase58}` }) + const entryObject = await this.withSession((session) => + session.fetch({ category: 'KeyPairRecord', name: `key-${publicKeyBase58}` }) + ) if (!entryObject) return null @@ -388,7 +455,7 @@ export abstract class AskarBaseWallet implements Wallet { private async deleteKeyPair(publicKeyBase58: string): Promise { try { - await this.session.remove({ category: 'KeyPairRecord', name: `key-${publicKeyBase58}` }) + await this.withSession((session) => session.remove({ category: 'KeyPairRecord', name: `key-${publicKeyBase58}` })) } catch (error) { throw new WalletError('Error removing KeyPair record', { cause: error }) } @@ -396,14 +463,16 @@ export abstract class AskarBaseWallet implements Wallet { private async storeKeyPair(keyPair: KeyPair): Promise { try { - await this.session.insert({ - category: 'KeyPairRecord', - name: `key-${keyPair.publicKeyBase58}`, - value: JSON.stringify(keyPair), - tags: { - keyType: keyPair.keyType, - }, - }) + await this.withSession((session) => + session.insert({ + category: 'KeyPairRecord', + name: `key-${keyPair.publicKeyBase58}`, + value: JSON.stringify(keyPair), + tags: { + keyType: keyPair.keyType, + }, + }) + ) } catch (error) { if (isAskarError(error, AskarErrorCode.Duplicate)) { throw new WalletKeyExistsError('Key already exists') diff --git a/packages/askar/src/wallet/AskarProfileWallet.ts b/packages/askar/src/wallet/AskarProfileWallet.ts index f5d9c4abc9..8c6e272d70 100644 --- a/packages/askar/src/wallet/AskarProfileWallet.ts +++ b/packages/askar/src/wallet/AskarProfileWallet.ts @@ -19,6 +19,7 @@ import { AskarBaseWallet } from './AskarBaseWallet' export class AskarProfileWallet extends AskarBaseWallet { private walletConfig?: WalletConfig public readonly store: Store + public isInitialized = false public constructor( store: Store, @@ -30,10 +31,6 @@ export class AskarProfileWallet extends AskarBaseWallet { this.store = store } - public get isInitialized() { - return this._session !== undefined - } - public get isProvisioned() { return this.walletConfig !== undefined } @@ -89,19 +86,15 @@ export class AskarProfileWallet extends AskarBaseWallet { try { this.walletConfig = walletConfig - this._session = await this.store.session(walletConfig.id).open() - - // FIXME: opening a session for a profile that does not exist, will not throw an error until - // the session is actually used. We can check if the profile exists by doing something with - // the session, which will throw a not found error if the profile does not exists, - // but that is not very efficient as it needs to be done on every open. - // See: https://github.com/hyperledger/aries-askar/issues/163 - await this._session.fetch({ - category: 'fetch-to-see-if-profile-exists', - name: 'fetch-to-see-if-profile-exists', - forUpdate: false, - isJson: false, + // TODO: what is faster? listProfiles or open and close session? + // I think open/close is more scalable (what if profiles is 10.000.000?) + // We just want to check if the profile exists. Because the wallet initialization logic + // first tries to open, and if it doesn't exist it will create it. So we must check here + // if the profile exists + await this.withSession(() => { + /* no-op */ }) + this.isInitialized = true } catch (error) { // Profile does not exist if (isAskarError(error, AskarErrorCode.NotFound)) { @@ -138,16 +131,15 @@ export class AskarProfileWallet extends AskarBaseWallet { ) } - this.logger.info(`Deleting profile '${this.walletConfig.id}'`) - - if (this._session) { + this.logger.info(`Deleting profile '${this.profile}'`) + if (this.isInitialized) { await this.close() } try { - await this.store.removeProfile(this.walletConfig.id) + await this.store.removeProfile(this.profile) } catch (error) { - const errorMessage = `Error deleting wallet for profile '${this.walletConfig.id}': ${error.message}` + const errorMessage = `Error deleting wallet for profile '${this.profile}': ${error.message}` this.logger.error(errorMessage, { error, errorMessage: error.message, @@ -176,21 +168,10 @@ export class AskarProfileWallet extends AskarBaseWallet { public async close() { this.logger.debug(`Closing wallet for profile ${this.walletConfig?.id}`) - if (!this._session) { - throw new WalletError('Wallet is in invalid state, you are trying to close wallet that has no handle.') + if (!this.isInitialized) { + throw new WalletError('Wallet is in invalid state, you are trying to close wallet that is not initialized.') } - try { - await this.session.close() - this._session = undefined - } catch (error) { - const errorMessage = `Error closing wallet for profile ${this.walletConfig?.id}: ${error.message}` - this.logger.error(errorMessage, { - error, - errorMessage: error.message, - }) - - throw new WalletError(errorMessage, { cause: error }) - } + this.isInitialized = false } } diff --git a/packages/askar/src/wallet/AskarWallet.ts b/packages/askar/src/wallet/AskarWallet.ts index a778103998..34be645d9c 100644 --- a/packages/askar/src/wallet/AskarWallet.ts +++ b/packages/askar/src/wallet/AskarWallet.ts @@ -15,9 +15,7 @@ import { WalletImportPathExistsError, WalletExportUnsupportedError, } from '@credo-ts/core' -// eslint-disable-next-line import/order import { Store } from '@hyperledger/aries-askar-shared' - import { inject, injectable } from 'tsyringe' import { AskarErrorCode, isAskarError, keyDerivationMethodToStoreKeyMethod, uriFromWalletConfig } from '../utils' @@ -117,8 +115,11 @@ export class AskarWallet extends AskarBaseWallet { keyMethod: askarWalletConfig.keyMethod, passKey: askarWalletConfig.passKey, }) + + // TODO: Should we do something to check if it exists? + // Like this.withSession()? + this.walletConfig = walletConfig - this._session = await this._store.openSession() } catch (error) { // FIXME: Askar should throw a Duplicate error code, but is currently returning Encryption // And if we provide the very same wallet key, it will open it without any error @@ -204,7 +205,9 @@ export class AskarWallet extends AskarBaseWallet { keyMethod: keyDerivationMethodToStoreKeyMethod(rekeyDerivation ?? KeyDerivationMethod.Argon2IMod), }) } - this._session = await this._store.openSession() + + // TODO: Should we do something to check if it exists? + // Like this.withSession()? this.walletConfig = walletConfig } catch (error) { @@ -327,6 +330,7 @@ export class AskarWallet extends AskarBaseWallet { throw new WalletError('Import is only supported for SQLite backend') } + let sourceWalletStore: Store | undefined = undefined try { const importWalletConfig = await this.getAskarWalletConfig(walletConfig) @@ -338,12 +342,19 @@ export class AskarWallet extends AskarBaseWallet { // Make sure destination path exists await this.fileSystem.createDirectory(destinationPath) // Open imported wallet and copy to destination - const sourceWalletStore = await Store.open({ + sourceWalletStore = await Store.open({ uri: `sqlite://${sourcePath}`, keyMethod: importWalletConfig.keyMethod, passKey: importKey, }) + const defaultProfile = await sourceWalletStore.getDefaultProfile() + if (defaultProfile !== importWalletConfig.profile) { + throw new WalletError( + `Trying to import wallet with walletConfig.id ${importWalletConfig.profile}, however the wallet contains a default profile with id ${defaultProfile}. The walletConfig.id MUST match with the default profile. In the future this behavior may be changed. See https://github.com/hyperledger/aries-askar/issues/221 for more information.` + ) + } + await sourceWalletStore.copyTo({ recreate: false, uri: importWalletConfig.uri, @@ -353,6 +364,7 @@ export class AskarWallet extends AskarBaseWallet { await sourceWalletStore.close() } catch (error) { + await sourceWalletStore?.close() const errorMessage = `Error importing wallet '${walletConfig.id}': ${error.message}` this.logger.error(errorMessage, { error, @@ -380,9 +392,7 @@ export class AskarWallet extends AskarBaseWallet { } try { - await this.session.close() await this.store.close() - this._session = undefined this._store = undefined } catch (error) { const errorMessage = `Error closing wallet': ${error.message}` diff --git a/packages/askar/tests/askar-sqlite.e2e.test.ts b/packages/askar/tests/askar-sqlite.e2e.test.ts index 7de1f8408b..2f13b84b23 100644 --- a/packages/askar/tests/askar-sqlite.e2e.test.ts +++ b/packages/askar/tests/askar-sqlite.e2e.test.ts @@ -129,9 +129,9 @@ describe('Askar SQLite agents', () => { expect(await bobBasicMessageRepository.findById(bobAgent.context, basicMessageRecord.id)).toBeNull() await bobAgent.wallet.delete() - // Import backup with different wallet id and initialize - await bobAgent.wallet.import({ id: backupWalletName, key: backupWalletName }, { path: backupPath, key: backupKey }) - await bobAgent.wallet.initialize({ id: backupWalletName, key: backupWalletName }) + // Import backup with SAME wallet id and initialize + await bobAgent.wallet.import(bobAgent.config.walletConfig, { path: backupPath, key: backupKey }) + await bobAgent.wallet.initialize(bobAgent.config.walletConfig) // Expect same basic message record to exist in new wallet expect(await bobBasicMessageRepository.getById(bobAgent.context, basicMessageRecord.id)).toMatchObject({ @@ -144,6 +144,29 @@ describe('Askar SQLite agents', () => { }) }) + test('throws error when exporting a wallet and importing it with a different walletConfig.id', async () => { + await bobAgent.initialize() + + if (!bobAgent.config.walletConfig) { + throw new Error('No wallet config on bobAgent') + } + + const backupKey = 'someBackupKey' + const backupWalletName = `backup-${utils.uuid()}` + const backupPath = path.join(tmpdir(), backupWalletName) + + // Create backup and delete wallet + await bobAgent.wallet.export({ path: backupPath, key: backupKey }) + await bobAgent.wallet.delete() + + // Import backup with different wallet id and initialize + await expect( + bobAgent.wallet.import({ id: backupWalletName, key: backupWalletName }, { path: backupPath, key: backupKey }) + ).rejects.toThrow( + `Error importing wallet '${backupWalletName}': Trying to import wallet with walletConfig.id ${backupWalletName}, however the wallet contains a default profile with id ${bobAgent.config.walletConfig.id}. The walletConfig.id MUST match with the default profile. In the future this behavior may be changed. See https://github.com/hyperledger/aries-askar/issues/221 for more information.` + ) + }) + test('throws error when attempting to export and import to existing paths', async () => { await bobAgent.initialize() @@ -157,25 +180,21 @@ describe('Askar SQLite agents', () => { // Create backup and try to export it again to the same path await bobAgent.wallet.export({ path: backupPath, key: backupKey }) - await expect(async () => await bobAgent.wallet.export({ path: backupPath, key: backupKey })).rejects.toThrowError( + await expect(bobAgent.wallet.export({ path: backupPath, key: backupKey })).rejects.toThrow( /Unable to create export/ ) await bobAgent.wallet.delete() // Import backup with different wallet id and initialize - await bobAgent.wallet.import({ id: backupWalletName, key: backupWalletName }, { path: backupPath, key: backupKey }) - await bobAgent.wallet.initialize({ id: backupWalletName, key: backupWalletName }) + await bobAgent.wallet.import(bobAgent.config.walletConfig, { path: backupPath, key: backupKey }) + await bobAgent.wallet.initialize(bobAgent.config.walletConfig) await bobAgent.wallet.close() // Try to import again an existing wallet await expect( - async () => - await bobAgent.wallet.import( - { id: backupWalletName, key: backupWalletName }, - { path: backupPath, key: backupKey } - ) - ).rejects.toThrowError(/Unable to import wallet/) + bobAgent.wallet.import(bobAgent.config.walletConfig, { path: backupPath, key: backupKey }) + ).rejects.toThrow(/Unable to import wallet/) }) test('throws error when attempting to import using wrong key', async () => { @@ -196,16 +215,12 @@ describe('Askar SQLite agents', () => { // Try to import backup with wrong key await expect( - async () => - await bobAgent.wallet.import( - { id: backupWalletName, key: backupWalletName }, - { path: backupPath, key: wrongBackupKey } - ) + bobAgent.wallet.import(bobAgent.config.walletConfig, { path: backupPath, key: wrongBackupKey }) ).rejects.toThrow() // Try to import again using the correct key - await bobAgent.wallet.import({ id: backupWalletName, key: backupWalletName }, { path: backupPath, key: backupKey }) - await bobAgent.wallet.initialize({ id: backupWalletName, key: backupWalletName }) + await bobAgent.wallet.import(bobAgent.config.walletConfig, { path: backupPath, key: backupKey }) + await bobAgent.wallet.initialize(bobAgent.config.walletConfig) await bobAgent.wallet.close() }) diff --git a/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vci-holder.e2e.test.ts b/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vci-holder.e2e.test.ts index a2a7614cf9..7c9a1e64c9 100644 --- a/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vci-holder.e2e.test.ts +++ b/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vci-holder.e2e.test.ts @@ -289,7 +289,7 @@ describe('OpenId4VcHolder', () => { ) // FIXME: credential returned by walt.id has nbf and issuanceDate that do not match // but we know that we at least received the credential if we got to this error - .rejects.toThrowError('JWT nbf and vc.issuanceDate do not match') + .rejects.toThrow('JWT nbf and vc.issuanceDate do not match') }) }) }) diff --git a/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vp-holder.e2e.test.ts b/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vp-holder.e2e.test.ts index 8b511cc99a..fbc348d5a3 100644 --- a/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vp-holder.e2e.test.ts +++ b/packages/openid4vc/src/openid4vc-holder/__tests__/openid4vp-holder.e2e.test.ts @@ -5,11 +5,11 @@ import type { Server } from 'http' import express from 'express' -import { OpenId4VcHolderModule } from '..' import { AskarModule } from '../../../../askar/src' import { askarModuleConfig } from '../../../../askar/tests/helpers' import { createAgentFromModules } from '../../../tests/utils' import { OpenId4VcVerifierModule } from '../../openid4vc-verifier' +import { OpenId4VcHolderModule } from '../OpenId4VcHolderModule' const port = 3121 const verificationEndpointPath = '/proofResponse' diff --git a/tests/InMemoryWallet.ts b/tests/InMemoryWallet.ts index 3d32ff9ae1..effe1fa215 100644 --- a/tests/InMemoryWallet.ts +++ b/tests/InMemoryWallet.ts @@ -27,6 +27,8 @@ import { TypedArrayEncoder, } from '@credo-ts/core' +const inMemoryWallets: InMemoryWallets = {} + const isError = (error: unknown): error is Error => error instanceof Error interface InMemoryKey { @@ -52,7 +54,9 @@ export class InMemoryWallet implements Wallet { // isInitialized to see if the wallet is actually open public activeWalletId?: string - public inMemoryWallets: InMemoryWallets = {} + public get inMemoryWallets() { + return inMemoryWallets + } /** * Abstract methods that need to be implemented by subclasses */ From faa390f2e2bb438596b5d9e3a69e1442f551ff1e Mon Sep 17 00:00:00 2001 From: Ariel Gentile Date: Mon, 12 Feb 2024 22:22:31 -0300 Subject: [PATCH 3/3] feat(mesage-pickup): option for awaiting completion (#1755) --- .../message-pickup/MessagePickupApi.ts | 63 +++++++-- .../message-pickup/MessagePickupApiOptions.ts | 2 + .../message-pickup/MessagePickupEvents.ts | 10 ++ .../message-pickup/__tests__/pickup.test.ts | 127 +++++++++++++++++- .../protocol/v1/V1MessagePickupProtocol.ts | 44 +++++- .../protocol/v1/handlers/V1BatchHandler.ts | 30 ++--- .../protocol/v2/V2MessagePickupProtocol.ts | 16 ++- .../protocol/v2/handlers/V2StatusHandler.ts | 8 +- .../modules/routing/MediationRecipientApi.ts | 15 ++- 9 files changed, 279 insertions(+), 36 deletions(-) diff --git a/packages/core/src/modules/message-pickup/MessagePickupApi.ts b/packages/core/src/modules/message-pickup/MessagePickupApi.ts index e9025d9d29..871fce42f2 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupApi.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupApi.ts @@ -10,12 +10,16 @@ import type { DeliverMessagesReturnType, DeliverMessagesFromQueueReturnType, } from './MessagePickupApiOptions' +import type { MessagePickupCompletedEvent } from './MessagePickupEvents' import type { MessagePickupSession, MessagePickupSessionRole } from './MessagePickupSession' import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' import type { MessagePickupRepository } from './storage/MessagePickupRepository' +import { ReplaySubject, Subject, filter, firstValueFrom, takeUntil, timeout } from 'rxjs' + import { AgentContext } from '../../agent' +import { EventEmitter } from '../../agent/EventEmitter' import { MessageSender } from '../../agent/MessageSender' import { OutboundMessageContext } from '../../agent/models' import { InjectionSymbols } from '../../constants' @@ -24,6 +28,7 @@ import { Logger } from '../../logger/Logger' import { inject, injectable } from '../../plugins' import { ConnectionService } from '../connections/services' +import { MessagePickupEventTypes } from './MessagePickupEvents' import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' import { MessagePickupSessionService } from './services/MessagePickupSessionService' @@ -47,23 +52,29 @@ export class MessagePickupApi public constructor( messageSender: MessageSender, agentContext: AgentContext, connectionService: ConnectionService, + eventEmitter: EventEmitter, messagePickupSessionService: MessagePickupSessionService, config: MessagePickupModuleConfig, + @inject(InjectionSymbols.Stop$) stop$: Subject, @inject(InjectionSymbols.Logger) logger: Logger ) { this.messageSender = messageSender this.connectionService = connectionService this.agentContext = agentContext + this.eventEmitter = eventEmitter this.config = config this.messagePickupSessionService = messagePickupSessionService + this.stop$ = stop$ this.logger = logger } @@ -123,9 +134,9 @@ export class MessagePickupApi): Promise { const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) @@ -199,17 +214,41 @@ export class MessagePickupApi(MessagePickupEventTypes.MessagePickupCompleted) + .pipe( + // Stop when the agent shuts down + takeUntil(this.stop$), + // filter by connection id + filter((e) => e.payload.connection.id === connectionRecord.id), + // If we don't receive all messages within timeoutMs miliseconds (no response, not supported, etc...) error + timeout({ + first: options.awaitCompletionTimeoutMs ?? 10000, + meta: 'MessagePickupApi.pickupMessages', + }) + ) + .subscribe(replaySubject) + } + + await this.messageSender.sendMessage(outboundMessageContext) + + if (options.awaitCompletion) { + await firstValueFrom(replaySubject) + } } /** - * Enable or disable Live Delivery mode as a recipient. If there were previous queued messages, it will pick-up them - * automatically. + * Enable or disable Live Delivery mode as a recipient. Depending on the message pickup protocol used, + * after receiving a response from the mediator the agent might retrieve any pending message. * * @param options connectionId, protocol version to use and boolean to enable/disable Live Mode */ diff --git a/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts b/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts index 7700915017..351753b2fb 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts @@ -29,6 +29,8 @@ export interface PickupMessagesOptions recipientDid?: string batchSize?: number + awaitCompletion?: boolean + awaitCompletionTimeoutMs?: number } export interface SetLiveDeliveryModeOptions { diff --git a/packages/core/src/modules/message-pickup/MessagePickupEvents.ts b/packages/core/src/modules/message-pickup/MessagePickupEvents.ts index ea12ad5131..bc95e70d29 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupEvents.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupEvents.ts @@ -1,9 +1,11 @@ import type { MessagePickupSession } from './MessagePickupSession' import type { BaseEvent } from '../../agent/Events' +import type { ConnectionRecord } from '../connections' export enum MessagePickupEventTypes { LiveSessionSaved = 'LiveSessionSaved', LiveSessionRemoved = 'LiveSessionRemoved', + MessagePickupCompleted = 'MessagePickupCompleted', } export interface MessagePickupLiveSessionSavedEvent extends BaseEvent { @@ -19,3 +21,11 @@ export interface MessagePickupLiveSessionRemovedEvent extends BaseEvent { session: MessagePickupSession } } + +export interface MessagePickupCompletedEvent extends BaseEvent { + type: typeof MessagePickupEventTypes.MessagePickupCompleted + payload: { + connection: ConnectionRecord + threadId?: string + } +} diff --git a/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts b/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts index c0ca0c1271..8a63f67e33 100644 --- a/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts +++ b/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts @@ -41,6 +41,8 @@ describe('E2E Pick Up protocol', () => { let mediatorAgent: Agent afterEach(async () => { + await recipientAgent.mediationRecipient.stopMessagePickup() + await recipientAgent.shutdown() await recipientAgent.wallet.delete() await mediatorAgent.shutdown() @@ -106,6 +108,66 @@ describe('E2E Pick Up protocol', () => { expect(basicMessage.content).toBe(message) }) + test('E2E manual Pick Up V1 loop - waiting for completion', async () => { + const mediatorMessages = new Subject() + + const subjectMap = { + 'wss://mediator': mediatorMessages, + } + + // Initialize mediatorReceived message + mediatorAgent = new Agent(mediatorOptions) + mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages)) + await mediatorAgent.initialize() + + // Create connection to use for recipient + const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({ + label: 'mediator invitation', + handshake: true, + handshakeProtocols: [HandshakeProtocol.DidExchange], + }) + + // Initialize recipient + recipientAgent = new Agent(recipientOptions) + recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + await recipientAgent.initialize() + + // Connect + const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation + + let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl( + mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' }) + ) + + recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected( + recipientMediatorConnection!.id + ) + + let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id) + + mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id) + + // Now they are connected, reinitialize recipient agent in order to lose the session (as with SubjectTransport it remains open) + await recipientAgent.shutdown() + await recipientAgent.initialize() + + const message = 'hello pickup V1' + await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message) + + const basicMessagePromise = waitForBasicMessage(recipientAgent, { + content: message, + }) + await recipientAgent.messagePickup.pickupMessages({ + connectionId: recipientMediatorConnection.id, + protocolVersion: 'v1', + awaitCompletion: true, + }) + + const basicMessage = await basicMessagePromise + expect(basicMessage.content).toBe(message) + }) + test('E2E manual Pick Up V2 loop', async () => { const mediatorMessages = new Subject() @@ -185,7 +247,70 @@ describe('E2E Pick Up protocol', () => { }) expect((secondStatusMessage as V2StatusMessage).messageCount).toBe(0) + }) - await recipientAgent.mediationRecipient.stopMessagePickup() + test('E2E manual Pick Up V2 loop - waiting for completion', async () => { + const mediatorMessages = new Subject() + + // FIXME: we harcoded that pickup of messages MUST be using ws(s) scheme when doing implicit pickup + // For liver delivery we need a duplex transport. however that means we can't test it with the subject transport. Using wss here to 'hack' this. We should + // extend the API to allow custom schemes (or maybe add a `supportsDuplex` transport / `supportMultiReturnMessages`) + // For pickup v2 pickup message (which we're testing here) we could just as well use `http` as it is just request/response. + const subjectMap = { + 'wss://mediator': mediatorMessages, + } + + // Initialize mediatorReceived message + mediatorAgent = new Agent(mediatorOptions) + mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages)) + await mediatorAgent.initialize() + + // Create connection to use for recipient + const mediatorOutOfBandRecord = await mediatorAgent.oob.createInvitation({ + label: 'mediator invitation', + handshake: true, + handshakeProtocols: [HandshakeProtocol.DidExchange], + }) + + // Initialize recipient + recipientAgent = new Agent(recipientOptions) + recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap)) + await recipientAgent.initialize() + + // Connect + const mediatorInvitation = mediatorOutOfBandRecord.outOfBandInvitation + + let { connectionRecord: recipientMediatorConnection } = await recipientAgent.oob.receiveInvitationFromUrl( + mediatorInvitation.toUrl({ domain: 'https://example.com/ssi' }) + ) + + recipientMediatorConnection = await recipientAgent.connections.returnWhenIsConnected( + recipientMediatorConnection!.id + ) + + let [mediatorRecipientConnection] = await mediatorAgent.connections.findAllByOutOfBandId(mediatorOutOfBandRecord.id) + + mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id) + + // Now they are connected, reinitialize recipient agent in order to lose the session (as with SubjectTransport it remains open) + await recipientAgent.shutdown() + await recipientAgent.initialize() + + const message = 'hello pickup V2' + + await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message) + + const basicMessagePromise = waitForBasicMessage(recipientAgent, { + content: message, + }) + await recipientAgent.messagePickup.pickupMessages({ + connectionId: recipientMediatorConnection.id, + protocolVersion: 'v2', + awaitCompletion: true, + }) + + const basicMessage = await basicMessagePromise + expect(basicMessage.content).toBe(message) }) }) diff --git a/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts index cfb7eb02e1..e893a5e2ea 100644 --- a/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts @@ -1,8 +1,10 @@ import type { AgentContext } from '../../../../agent' import type { AgentMessage } from '../../../../agent/AgentMessage' +import type { AgentMessageReceivedEvent } from '../../../../agent/Events' import type { FeatureRegistry } from '../../../../agent/FeatureRegistry' import type { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import type { DependencyManager } from '../../../../plugins' +import type { MessagePickupCompletedEvent } from '../../MessagePickupEvents' import type { MessagePickupRepository } from '../../storage/MessagePickupRepository' import type { DeliverMessagesProtocolOptions, @@ -12,10 +14,13 @@ import type { SetLiveDeliveryModeProtocolReturnType, } from '../MessagePickupProtocolOptions' +import { EventEmitter } from '../../../../agent/EventEmitter' +import { AgentEventTypes } from '../../../../agent/Events' import { OutboundMessageContext, Protocol } from '../../../../agent/models' import { InjectionSymbols } from '../../../../constants' import { CredoError } from '../../../../error' import { injectable } from '../../../../plugins' +import { MessagePickupEventTypes } from '../../MessagePickupEvents' import { MessagePickupModuleConfig } from '../../MessagePickupModuleConfig' import { BaseMessagePickupProtocol } from '../BaseMessagePickupProtocol' @@ -33,7 +38,7 @@ export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { * Registers the protocol implementation (handlers, feature registry) on the agent. */ public register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void { - dependencyManager.registerMessageHandlers([new V1BatchPickupHandler(this), new V1BatchHandler()]) + dependencyManager.registerMessageHandlers([new V1BatchPickupHandler(this), new V1BatchHandler(this)]) featureRegistry.register( new Protocol({ @@ -74,6 +79,7 @@ export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { (await pickupMessageQueue.takeFromQueue({ connectionId: connectionRecord.id, limit: batchSize, // TODO: Define as config parameter for message holder side + deleteMessages: true, })) const batchMessages = messagesToDeliver.map( @@ -127,4 +133,40 @@ export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { return new OutboundMessageContext(batchMessage, { agentContext: messageContext.agentContext, connection }) } + + public async processBatch(messageContext: InboundMessageContext) { + const { message: batchMessage, agentContext } = messageContext + const { messages } = batchMessage + + const connection = messageContext.assertReadyConnection() + + const eventEmitter = messageContext.agentContext.dependencyManager.resolve(EventEmitter) + + messages.forEach((message) => { + eventEmitter.emit(messageContext.agentContext, { + type: AgentEventTypes.AgentMessageReceived, + payload: { + message: message.message, + contextCorrelationId: messageContext.agentContext.contextCorrelationId, + }, + }) + }) + + // A Batch message without messages at all means that we are done with the + // message pickup process (Note: this is not optimal since we'll always doing an extra + // Batch Pickup. However, it is safer and should be faster than waiting an entire loop + // interval to retrieve more messages) + if (messages.length === 0) { + eventEmitter.emit(messageContext.agentContext, { + type: MessagePickupEventTypes.MessagePickupCompleted, + payload: { + connection, + threadId: batchMessage.threadId, + }, + }) + return null + } + + return (await this.createPickupMessage(agentContext, { connectionRecord: connection })).message + } } diff --git a/packages/core/src/modules/message-pickup/protocol/v1/handlers/V1BatchHandler.ts b/packages/core/src/modules/message-pickup/protocol/v1/handlers/V1BatchHandler.ts index 071711f9e3..f49e8130d1 100644 --- a/packages/core/src/modules/message-pickup/protocol/v1/handlers/V1BatchHandler.ts +++ b/packages/core/src/modules/message-pickup/protocol/v1/handlers/V1BatchHandler.ts @@ -1,28 +1,26 @@ -import type { AgentMessageReceivedEvent } from '../../../../../agent/Events' import type { MessageHandler, MessageHandlerInboundMessage } from '../../../../../agent/MessageHandler' +import type { V1MessagePickupProtocol } from '../V1MessagePickupProtocol' -import { EventEmitter } from '../../../../../agent/EventEmitter' -import { AgentEventTypes } from '../../../../../agent/Events' +import { OutboundMessageContext } from '../../../../../agent/models' import { V1BatchMessage } from '../messages' export class V1BatchHandler implements MessageHandler { public supportedMessages = [V1BatchMessage] + private messagePickupProtocol: V1MessagePickupProtocol - public async handle(messageContext: MessageHandlerInboundMessage) { - const { message } = messageContext - const eventEmitter = messageContext.agentContext.dependencyManager.resolve(EventEmitter) + public constructor(messagePickupProtocol: V1MessagePickupProtocol) { + this.messagePickupProtocol = messagePickupProtocol + } - messageContext.assertReadyConnection() + public async handle(messageContext: MessageHandlerInboundMessage) { + const connection = messageContext.assertReadyConnection() + const batchRequestMessage = await this.messagePickupProtocol.processBatch(messageContext) - const forwardedMessages = message.messages - forwardedMessages.forEach((message) => { - eventEmitter.emit(messageContext.agentContext, { - type: AgentEventTypes.AgentMessageReceived, - payload: { - message: message.message, - contextCorrelationId: messageContext.agentContext.contextCorrelationId, - }, + if (batchRequestMessage) { + return new OutboundMessageContext(batchRequestMessage, { + agentContext: messageContext.agentContext, + connection, }) - }) + } } } diff --git a/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts index c0b927b039..f42fd5d126 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts @@ -5,6 +5,7 @@ import type { FeatureRegistry } from '../../../../agent/FeatureRegistry' import type { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import type { DependencyManager } from '../../../../plugins' import type { EncryptedMessage } from '../../../../types' +import type { MessagePickupCompletedEvent } from '../../MessagePickupEvents' import type { MessagePickupRepository } from '../../storage/MessagePickupRepository' import type { DeliverMessagesProtocolOptions, @@ -24,6 +25,7 @@ import { injectable } from '../../../../plugins' import { verkeyToDidKey } from '../../../dids/helpers' import { ProblemReportError } from '../../../problem-reports' import { RoutingProblemReportReason } from '../../../routing/error' +import { MessagePickupEventTypes } from '../../MessagePickupEvents' import { MessagePickupModuleConfig } from '../../MessagePickupModuleConfig' import { MessagePickupSessionRole } from '../../MessagePickupSession' import { MessagePickupSessionService } from '../../services' @@ -242,12 +244,24 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { const { message: statusMessage } = messageContext const { messageCount, recipientKey } = statusMessage + const connection = messageContext.assertReadyConnection() + const messagePickupModuleConfig = messageContext.agentContext.dependencyManager.resolve(MessagePickupModuleConfig) - //No messages to be retrieved + const eventEmitter = messageContext.agentContext.dependencyManager.resolve(EventEmitter) + + //No messages to be retrieved: message pick-up is completed if (messageCount === 0) { + eventEmitter.emit(messageContext.agentContext, { + type: MessagePickupEventTypes.MessagePickupCompleted, + payload: { + connection, + threadId: statusMessage.threadId, + }, + }) return null } + const { maximumBatchSize: maximumMessagePickup } = messagePickupModuleConfig const limit = messageCount < maximumMessagePickup ? messageCount : maximumMessagePickup diff --git a/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2StatusHandler.ts b/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2StatusHandler.ts index 0e4d1467f2..598c4a447f 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2StatusHandler.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2StatusHandler.ts @@ -7,15 +7,15 @@ import { V2StatusMessage } from '../messages' export class V2StatusHandler implements MessageHandler { public supportedMessages = [V2StatusMessage] - private messagePickupService: V2MessagePickupProtocol + private messagePickupProtocol: V2MessagePickupProtocol - public constructor(messagePickupService: V2MessagePickupProtocol) { - this.messagePickupService = messagePickupService + public constructor(messagePickupProtocol: V2MessagePickupProtocol) { + this.messagePickupProtocol = messagePickupProtocol } public async handle(messageContext: InboundMessageContext) { const connection = messageContext.assertReadyConnection() - const deliveryRequestMessage = await this.messagePickupService.processStatus(messageContext) + const deliveryRequestMessage = await this.messagePickupProtocol.processStatus(messageContext) if (deliveryRequestMessage) { return new OutboundMessageContext(deliveryRequestMessage, { diff --git a/packages/core/src/modules/routing/MediationRecipientApi.ts b/packages/core/src/modules/routing/MediationRecipientApi.ts index 6b19e80b67..43609fa60b 100644 --- a/packages/core/src/modules/routing/MediationRecipientApi.ts +++ b/packages/core/src/modules/routing/MediationRecipientApi.ts @@ -206,6 +206,12 @@ export class MediationRecipientApi { try { if (pickupStrategy === MediatorPickupStrategy.PickUpV2LiveMode) { // Start Pickup v2 protocol in live mode (retrieve any queued message before) + await this.messagePickupApi.pickupMessages({ + connectionId: mediator.connectionId, + protocolVersion: 'v2', + awaitCompletion: true, + }) + await this.messagePickupApi.setLiveDeliveryMode({ connectionId: mediator.connectionId, liveDelivery: true, @@ -263,8 +269,15 @@ export class MediationRecipientApi { } case MediatorPickupStrategy.PickUpV2LiveMode: // PickUp V2 in Live Mode will retrieve queued messages and then set up live delivery mode - this.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`) + this.logger.info(`Starting Live Mode pickup of messages from mediator '${mediatorRecord.id}'`) await this.monitorMediatorWebSocketEvents(mediatorRecord, mediatorPickupStrategy) + + await this.messagePickupApi.pickupMessages({ + connectionId: mediatorConnection.id, + protocolVersion: 'v2', + awaitCompletion: true, + }) + await this.messagePickupApi.setLiveDeliveryMode({ connectionId: mediatorConnection.id, liveDelivery: true,