Skip to content

Commit

Permalink
feat: basic message pthid/thid support (#1381)
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <[email protected]>
  • Loading branch information
genaris authored Mar 15, 2023
1 parent d59366a commit f27fb99
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 10 deletions.
11 changes: 8 additions & 3 deletions packages/askar/tests/askar-sqlite.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,14 @@ describeRunInNodeVersion([18], 'Askar SQLite agents', () => {
await bobAgent.wallet.initialize({ id: backupWalletName, key: backupWalletName })

// Expect same basic message record to exist in new wallet
expect(await bobBasicMessageRepository.getById(bobAgent.context, basicMessageRecord.id)).toMatchObject(
basicMessageRecord
)
expect(await bobBasicMessageRepository.getById(bobAgent.context, basicMessageRecord.id)).toMatchObject({
id: basicMessageRecord.id,
connectionId: basicMessageRecord.connectionId,
content: basicMessageRecord.content,
createdAt: basicMessageRecord.createdAt,
updatedAt: basicMessageRecord.updatedAt,
type: basicMessageRecord.type,
})
})

test('changing wallet key', async () => {
Expand Down
17 changes: 15 additions & 2 deletions packages/core/src/modules/basic-messages/BasicMessagesApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ export class BasicMessagesApi {
* @throws {MessageSendingError} If message is undeliverable
* @returns the created record
*/
public async sendMessage(connectionId: string, message: string) {
public async sendMessage(connectionId: string, message: string, parentThreadId?: string) {
const connection = await this.connectionService.getById(this.agentContext, connectionId)

const { message: basicMessage, record: basicMessageRecord } = await this.basicMessageService.createMessage(
this.agentContext,
message,
connection
connection,
parentThreadId
)
const outboundMessageContext = new OutboundMessageContext(basicMessage, {
agentContext: this.agentContext,
Expand Down Expand Up @@ -81,6 +82,18 @@ export class BasicMessagesApi {
return this.basicMessageService.getById(this.agentContext, basicMessageRecordId)
}

/**
* Retrieve a basic message record by thread id
*
* @param threadId The thread id
* @throws {RecordNotFoundError} If no record is found
* @throws {RecordDuplicateError} If multiple records are found
* @returns The connection record
*/
public async getByThreadId(basicMessageRecordId: string) {
return this.basicMessageService.getByThreadId(this.agentContext, basicMessageRecordId)
}

/**
* Delete a basic message record by id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,55 @@ describe('Basic Messages E2E', () => {
})
})

test('Alice and Faber exchange messages using threadId', async () => {
testLogger.test('Alice sends message to Faber')
const helloRecord = await aliceAgent.basicMessages.sendMessage(aliceConnection.id, 'Hello')

expect(helloRecord.content).toBe('Hello')

testLogger.test('Faber waits for message from Alice')
const helloMessage = await waitForBasicMessage(faberAgent, {
content: 'Hello',
})

testLogger.test('Faber sends message to Alice')
const replyRecord = await faberAgent.basicMessages.sendMessage(faberConnection.id, 'How are you?', helloMessage.id)
expect(replyRecord.content).toBe('How are you?')
expect(replyRecord.parentThreadId).toBe(helloMessage.id)

testLogger.test('Alice waits until she receives message from faber')
const replyMessage = await waitForBasicMessage(aliceAgent, {
content: 'How are you?',
})
expect(replyMessage.content).toBe('How are you?')
expect(replyMessage.thread?.parentThreadId).toBe(helloMessage.id)

// Both sender and recipient shall be able to find the threaded messages
// Hello message
const aliceHelloMessage = await aliceAgent.basicMessages.getByThreadId(helloMessage.id)
const faberHelloMessage = await faberAgent.basicMessages.getByThreadId(helloMessage.id)
expect(aliceHelloMessage).toMatchObject({
content: helloRecord.content,
threadId: helloRecord.threadId,
})
expect(faberHelloMessage).toMatchObject({
content: helloRecord.content,
threadId: helloRecord.threadId,
})

// Reply message
const aliceReplyMessages = await aliceAgent.basicMessages.findAllByQuery({ parentThreadId: helloMessage.id })
const faberReplyMessages = await faberAgent.basicMessages.findAllByQuery({ parentThreadId: helloMessage.id })
expect(aliceReplyMessages.length).toBe(1)
expect(aliceReplyMessages[0]).toMatchObject({
content: replyRecord.content,
parentThreadId: replyRecord.parentThreadId,
threadId: replyRecord.threadId,
})
expect(faberReplyMessages.length).toBe(1)
expect(faberReplyMessages[0]).toMatchObject(replyRecord)
})

test('Alice is unable to send a message', async () => {
testLogger.test('Alice sends message to Faber that is undeliverable')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export type CustomBasicMessageTags = TagsBase
export type DefaultBasicMessageTags = {
connectionId: string
role: BasicMessageRole
threadId?: string
parentThreadId?: string
}

export type BasicMessageTags = RecordTags<BasicMessageRecord>
Expand All @@ -18,7 +20,8 @@ export interface BasicMessageStorageProps {
connectionId: string
role: BasicMessageRole
tags?: CustomBasicMessageTags

threadId?: string
parentThreadId?: string
content: string
sentTime: string
}
Expand All @@ -28,6 +31,8 @@ export class BasicMessageRecord extends BaseRecord<DefaultBasicMessageTags, Cust
public sentTime!: string
public connectionId!: string
public role!: BasicMessageRole
public threadId?: string
public parentThreadId?: string

public static readonly type = 'BasicMessageRecord'
public readonly type = BasicMessageRecord.type
Expand All @@ -43,6 +48,8 @@ export class BasicMessageRecord extends BaseRecord<DefaultBasicMessageTags, Cust
this.connectionId = props.connectionId
this._tags = props.tags ?? {}
this.role = props.role
this.threadId = props.threadId
this.parentThreadId = props.parentThreadId
}
}

Expand All @@ -51,6 +58,8 @@ export class BasicMessageRecord extends BaseRecord<DefaultBasicMessageTags, Cust
...this._tags,
connectionId: this.connectionId,
role: this.role,
threadId: this.threadId,
parentThreadId: this.parentThreadId,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,26 @@ export class BasicMessageService {
this.eventEmitter = eventEmitter
}

public async createMessage(agentContext: AgentContext, message: string, connectionRecord: ConnectionRecord) {
public async createMessage(
agentContext: AgentContext,
message: string,
connectionRecord: ConnectionRecord,
parentThreadId?: string
) {
const basicMessage = new BasicMessage({ content: message })

// If no parentThreadid is defined, there is no need to explicitly send a thread decorator
if (parentThreadId) {
basicMessage.setThread({ parentThreadId })
}

const basicMessageRecord = new BasicMessageRecord({
sentTime: basicMessage.sentTime.toISOString(),
content: basicMessage.content,
connectionId: connectionRecord.id,
role: BasicMessageRole.Sender,
threadId: basicMessage.threadId,
parentThreadId,
})

await this.basicMessageRepository.save(agentContext, basicMessageRecord)
Expand All @@ -47,6 +59,8 @@ export class BasicMessageService {
content: message.content,
connectionId: connection.id,
role: BasicMessageRole.Receiver,
threadId: message.threadId,
parentThreadId: message.thread?.parentThreadId,
})

await this.basicMessageRepository.save(agentContext, basicMessageRecord)
Expand All @@ -73,6 +87,14 @@ export class BasicMessageService {
return this.basicMessageRepository.getById(agentContext, basicMessageRecordId)
}

public async getByThreadId(agentContext: AgentContext, threadId: string) {
return this.basicMessageRepository.getSingleByQuery(agentContext, { threadId })
}

public async findAllByParentThreadId(agentContext: AgentContext, parentThreadId: string) {
return this.basicMessageRepository.findByQuery(agentContext, { parentThreadId })
}

public async deleteById(agentContext: AgentContext, basicMessageRecordId: string) {
const basicMessageRecord = await this.getById(agentContext, basicMessageRecordId)
return this.basicMessageRepository.delete(agentContext, basicMessageRecord)
Expand Down
11 changes: 8 additions & 3 deletions packages/core/tests/wallet.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,14 @@ describe('wallet', () => {
await bobAgent.wallet.initialize({ id: backupWalletName, key: backupWalletName })

// Expect same basic message record to exist in new wallet
expect(await bobBasicMessageRepository.getById(bobAgent.context, basicMessageRecord.id)).toMatchObject(
basicMessageRecord
)
expect(await bobBasicMessageRepository.getById(bobAgent.context, basicMessageRecord.id)).toMatchObject({
id: basicMessageRecord.id,
connectionId: basicMessageRecord.connectionId,
content: basicMessageRecord.content,
createdAt: basicMessageRecord.createdAt,
updatedAt: basicMessageRecord.updatedAt,
type: basicMessageRecord.type,
})
})

test('changing wallet key', async () => {
Expand Down

0 comments on commit f27fb99

Please sign in to comment.