Skip to content

Commit

Permalink
allow multiple origins #133
Browse files Browse the repository at this point in the history
  • Loading branch information
mfornos committed Jan 15, 2025
1 parent a3b44f9 commit 4a72fbb
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 78 deletions.
9 changes: 4 additions & 5 deletions packages/server/src/server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const testSubContent = {
agent: 'xcm',
owner: 'unknown',
args: {
origin: 'urn:ocn:local:1000',
origins: ['urn:ocn:local:1000'],
senders: ['ALICE'],
events: '*',
destinations: ['urn:ocn:local:2000'],
Expand Down Expand Up @@ -102,7 +102,7 @@ describe('Ocelloids Server HTTP API', () => {
"id": "poison",
"agent": "xcm",
"args": {
"origin": "urn:ocn:local:1000",
"origins": ["urn:ocn:local:1000"],
"senders": "*",
"events": "*",
"destinations": ["urn:ocn:local:2000"]
Expand Down Expand Up @@ -159,7 +159,7 @@ describe('Ocelloids Server HTTP API', () => {
id: 'wild',
agent: 'xcm',
args: {
origin: 'urn:ocn:local:1000',
origins: ['urn:ocn:local:1000'],
senders: '*',
events: '*',
destinations: ['urn:ocn:local:2000'],
Expand Down Expand Up @@ -500,12 +500,11 @@ describe('Ocelloids Server HTTP API', () => {
const schema = response?.json()
expect(response?.statusCode).toStrictEqual(200)
expect(schema.type).toBe('object')
expect(schema.properties.origin).toBeDefined()
expect(schema.properties.origins).toBeDefined()
expect(schema.properties.senders).toBeDefined()
expect(schema.properties.destinations).toBeDefined()
expect(schema.properties.bridges).toBeDefined()
expect(schema.properties.events).toBeDefined()
expect(schema.properties.outboundTTL).toBeDefined()
resolve()
},
)
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/services/admin/routes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('admin api', () => {
)
})

expect(db.get('a')).rejects.toThrow()
await expect(db.get('a')).rejects.toThrow()
})

it('should schedule a task', async () => {
Expand Down
10 changes: 5 additions & 5 deletions packages/server/src/services/agents/xcm/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const testSub: Subscription<XcmInputs> = {
agent: 'xcm',
owner: 'unknown',
args: {
origin: 'urn:ocn:local:1000',
origins: ['urn:ocn:local:1000'],
senders: ['14DqgdKU6Zfh1UjdU4PYwpoHi2QTp37R6djehfbhXe9zoyQT'],
events: '*',
destinations: ['urn:ocn:local:2000'],
Expand Down Expand Up @@ -50,15 +50,15 @@ describe('xcm agent', () => {
const xcmAgent = agentService.getAgentById<XcmAgent>('xcm')
const chainId = 'urn:ocn:local:6001'

expect(() => {
await expect(() => {
xcmAgent.subscribe({
...testSub,
args: {
...testSub.args,
origin: chainId,
origins: [chainId],
},
})
}).toThrow(new ValidationError('Invalid chain id:' + chainId))
}).toThrow(new ValidationError('Invalid origin chain id:' + chainId))
})

it('should handle relay subscriptions', async () => {
Expand All @@ -70,7 +70,7 @@ describe('xcm agent', () => {
...testSub,
args: {
...testSub.args,
origin: 'urn:ocn:local:0',
origins: ['urn:ocn:local:0'],
},
})

Expand Down
50 changes: 33 additions & 17 deletions packages/server/src/services/agents/xcm/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ import { Agent, AgentMetadata, AgentRuntimeContext, Subscribable, getAgentCapabi
import { IngressConsumer } from '@/services/ingress/index.js'
import { filter } from 'rxjs'
import { XcmSubscriptionManager } from './handlers.js'
import { matchMessage, matchSenders, messageCriteria, sendersCriteria } from './ops/criteria.js'
import {
matchMessage,
matchNotificationType,
matchSenders,
messageCriteria,
notificationTypeCriteria,
sendersCriteria,
} from './ops/criteria.js'
import { XcmTracker } from './tracking.js'
import { $XcmInputs, XcmInputs, XcmMessagePayload, XcmSubscriptionHandler } from './types.js'

Expand Down Expand Up @@ -64,10 +71,9 @@ export class XcmAgent implements Agent, Subscribable {

subscribe(subscription: Subscription<XcmInputs>): void {
const { id, args } = subscription
const origin = args.origin as NetworkURN
const dests = args.destinations as NetworkURN[]

this.#validateChainIds([origin, ...dests])
this.#validateChainIds(args)

const handler = this.#monitor(subscription)
this.#subs.set(id, handler)
}
Expand Down Expand Up @@ -126,21 +132,20 @@ export class XcmAgent implements Agent, Subscribable {
#monitor(subscription: Subscription<XcmInputs>): XcmSubscriptionHandler {
const {
id,
args: { origin, destinations, senders },
args: { origins, destinations, senders, events },
} = subscription

const sendersControl = ControlQuery.from(sendersCriteria(senders))
const originsControl = ControlQuery.from(messageCriteria([origin] as NetworkURN[]))
const destinationsControl = ControlQuery.from(messageCriteria(destinations as NetworkURN[]))
const originsControl = ControlQuery.from(messageCriteria(origins))
const destinationsControl = ControlQuery.from(messageCriteria(destinations))
const notificationTypeControl = ControlQuery.from(notificationTypeCriteria(events))

// (args.events === undefined || args.events === '*' || args.events.includes(payload.type))
// TODO
// here with subscription id
const stream = this.#tracker.xcm$
.pipe(
filter((payload) => {
return (
/*matchEvents &&*/ matchMessage(originsControl, payload.origin) &&
matchNotificationType(notificationTypeControl, payload.type) &&
matchMessage(originsControl, payload.origin) &&
matchMessage(destinationsControl, payload.destination) &&
matchSenders(sendersControl, payload.sender)
)
Expand Down Expand Up @@ -172,14 +177,25 @@ export class XcmAgent implements Agent, Subscribable {
sendersControl,
originsControl,
destinationsControl,
notificationTypeControl,
}
}

#validateChainIds(chainIds: NetworkURN[]) {
chainIds.forEach((chainId) => {
if (!this.#ingress.isNetworkDefined(chainId)) {
throw new ValidationError('Invalid chain id:' + chainId)
}
})
#validateChainIds({ destinations, origins }: XcmInputs) {
if (destinations !== '*') {
destinations.forEach((chainId) => {
if (!this.#ingress.isNetworkDefined(chainId as NetworkURN)) {
throw new ValidationError('Invalid destination chain id:' + chainId)
}
})
}

if (origins !== '*') {
origins.forEach((chainId) => {
if (!this.#ingress.isNetworkDefined(chainId as NetworkURN)) {
throw new ValidationError('Invalid origin chain id:' + chainId)
}
})
}
}
}
60 changes: 51 additions & 9 deletions packages/server/src/services/agents/xcm/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import { Operation } from 'rfc6902'

import { Logger, NetworkURN } from '@/services/index.js'
import { Logger } from '@/services/index.js'
import { Subscription } from '@/services/subscriptions/types.js'
import { IngressConsumer } from 'services/ingress/index.js'
import { SubscriptionUpdater, hasOp } from '../base/updater.js'
import type { XcmAgent } from './agent.js'
import { messageCriteria, sendersCriteria } from './ops/criteria.js'
import { messageCriteria, notificationTypeCriteria, sendersCriteria } from './ops/criteria.js'
import { $XcmInputs, XcmInputs, XcmSubscriptionHandler } from './types.js'

const allowedPaths = ['/args/senders', '/args/destinations', '/channels', '/public', '/args/events']
const ALLOWED_PATHS = [
'/args/senders',
'/args/destinations',
'/args/origins',
'/args/events',
'/channels',
'/public',
]

/**
* Manages the XCM agent subscription handlers.
Expand All @@ -24,7 +31,7 @@ export class XcmSubscriptionManager {
constructor(log: Logger, ingress: IngressConsumer, agent: XcmAgent) {
this.#log = log
this.#agent = agent
this.#updater = new SubscriptionUpdater(ingress, allowedPaths)
this.#updater = new SubscriptionUpdater(ingress, ALLOWED_PATHS)
this.#handlers = {}
}

Expand Down Expand Up @@ -92,18 +99,27 @@ export class XcmSubscriptionManager {
argsSchema: $XcmInputs,
})

this.#updater.validateNetworks(toUpdate.args.destinations)
if (toUpdate.args.destinations !== '*') {
this.#updater.validateNetworks(toUpdate.args.destinations)
}
if (toUpdate.args.origins !== '*') {
this.#updater.validateNetworks(toUpdate.args.origins)
}

if (hasOp(patch, '/args/senders')) {
this.#updateSenders(toUpdate)
}

if (hasOp(patch, '/args/destinations')) {
this.#updateDestinationMessageControl(toUpdate)
this.#updateDestinations(toUpdate)
}

if (hasOp(patch, '/args/origins')) {
this.#updateOrigins(toUpdate)
}

if (hasOp(patch, '/args/events')) {
// this.#updateEvents(toUpdate)
this.#updateEvents(toUpdate)
}

this.#updateDescriptor(toUpdate)
Expand All @@ -120,6 +136,19 @@ export class XcmSubscriptionManager {
})
}

/**
* Updates the notification events control handler.
*/
#updateEvents(toUpdate: Subscription<XcmInputs>) {
const {
id,
args: { events },
} = toUpdate
const { notificationTypeControl } = this.#handlers[id]

notificationTypeControl.change(notificationTypeCriteria(events))
}

/**
* Updates the senders control handler.
*
Expand All @@ -135,17 +164,30 @@ export class XcmSubscriptionManager {
sendersControl.change(sendersCriteria(senders))
}

/**
* Updates the oriigins control handler.
*/
#updateOrigins(toUpdate: Subscription<XcmInputs>) {
const {
id,
args: { origins },
} = toUpdate
const { originsControl } = this.#handlers[id]

originsControl.change(messageCriteria(origins))
}

/**
* Updates the message control handler.
*/
#updateDestinationMessageControl(toUpdate: Subscription<XcmInputs>) {
#updateDestinations(toUpdate: Subscription<XcmInputs>) {
const {
id,
args: { destinations },
} = toUpdate
const { destinationsControl } = this.#handlers[id]

destinationsControl.change(messageCriteria(destinations as NetworkURN[]))
destinationsControl.change(messageCriteria(destinations))
}

#updateDescriptor(toUpdate: Subscription<XcmInputs>) {
Expand Down
50 changes: 35 additions & 15 deletions packages/server/src/services/agents/xcm/ops/criteria.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,45 @@
import { ControlQuery, Criteria } from '@/common/index.js'

import { SignerData } from '../../../subscriptions/types.js'
import { NetworkURN } from '../../../types.js'
import { XcmTerminus } from '../types.js'
import { XcmNotificationType, XcmTerminus } from '../types.js'

const MATCH_ANY = {}

export function sendersCriteria(senders?: string[] | '*'): Criteria {
if (senders === undefined || senders === '*') {
// match any
return {}
} else {
return {
$or: [
{ 'sender.signer.id': { $in: senders } },
{ 'sender.signer.publicKey': { $in: senders } },
{ 'sender.extraSigners.id': { $in: senders } },
{ 'sender.extraSigners.publicKey': { $in: senders } },
],
}
return MATCH_ANY
}

return {
$or: [
{ 'sender.signer.id': { $in: senders } },
{ 'sender.signer.publicKey': { $in: senders } },
{ 'sender.extraSigners.id': { $in: senders } },
{ 'sender.extraSigners.publicKey': { $in: senders } },
],
}
}

// Assuming we are in the same consensus
export function messageCriteria(chainIds: NetworkURN[]): Criteria {
export function messageCriteria(chainIds: string[] | '*'): Criteria {
if (chainIds === '*') {
return MATCH_ANY
}

return {
chainId: { $in: chainIds },
}
}

export function notificationTypeCriteria(types?: string[] | '*'): Criteria {
if (types === undefined || types === '*') {
return MATCH_ANY
}

return {
notificationType: { $in: types },
}
}

/**
* Matches sender account address and public keys, including extra senders.
*/
Expand All @@ -48,3 +61,10 @@ export function matchSenders(query: ControlQuery, sender?: SignerData): boolean
export function matchMessage(query: ControlQuery, xcm: XcmTerminus): boolean {
return query.value.test({ chainId: xcm.chainId })
}

/**
* Matches XCM notification types
*/
export function matchNotificationType(query: ControlQuery, notificationType: XcmNotificationType) {
return query.value.test({ notificationType })
}
Loading

0 comments on commit 4a72fbb

Please sign in to comment.