Skip to content

Commit

Permalink
UMP generic messages #10
Browse files Browse the repository at this point in the history
  • Loading branch information
XY-Wang committed Feb 5, 2024
1 parent 57842bd commit 1595797
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 64 deletions.
8 changes: 4 additions & 4 deletions src/services/monitoring/ops/ump.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ describe('ump operator', () => {

describe('extractUmpReceive', () => {
it('should extract failed UMP received message', done => {
const { successBlocks } = umpReceive;
const { successBlocks, api } = umpReceive;

const calls = jest.fn();

const test$ = extractUmpReceive('1000')(successBlocks);
const test$ = extractUmpReceive(api, '1000')(successBlocks);

test$.subscribe({
next: msg => {
Expand All @@ -70,11 +70,11 @@ describe('ump operator', () => {
});

it('should extract UMP receive with outcome fail', done => {
const { failBlocks } = umpReceive;
const { failBlocks, api } = umpReceive;

const calls = jest.fn();

const test$ = extractUmpReceive('1000')(failBlocks);
const test$ = extractUmpReceive(api, '1000')(failBlocks);

test$.subscribe({
next: msg => {
Expand Down
127 changes: 69 additions & 58 deletions src/services/monitoring/ops/ump.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { mergeMap, map, Observable } from 'rxjs';

import type { SignedBlockExtended } from '@polkadot/api-derive/types';
import type { EventRecord } from '@polkadot/types/interfaces';
import type { U8aFixed, bool } from '@polkadot/types-codec';
import type {
PolkadotRuntimeParachainsInclusionAggregateMessageOrigin,
FrameSupportMessagesProcessMessageError
} from '@polkadot/types/lookup';
import type { ApiPromise } from '@polkadot/api';

import {
ControlQuery,
extractEvents,
filterEvents, filterNonNull,
mongoFilter, types
} from '@sodazone/ocelloids';
Expand All @@ -13,53 +19,48 @@ import {
GenericXcmReceivedWithContext,
GenericXcmSentWithContext,
GetOutboundUmpMessages,
HexString,
XcmCriteria, XcmReceivedWithContext,
XcmSentWithContext
} from '../types.js';
import { getMessageId } from './util.js';
import { getMessageId, getParaIdFromOrigin } from './util.js';
import { asVersionedXcm } from './xcm-format.js';

type EventRecordWithContext = {
record: EventRecord,
method: string,
section: string,
blockNumber: string,
blockHash: HexString
}
type UmpReceivedContext = {
id: U8aFixed,
origin: PolkadotRuntimeParachainsInclusionAggregateMessageOrigin,
success?: bool,
error?: FrameSupportMessagesProcessMessageError
};

function mapUmpQueueMessage(origin: string) {
return (source: Observable<EventRecordWithContext>):
Observable<XcmReceivedWithContext> => {
return (source.pipe(
mongoFilter({
'section': 'messageQueue',
'method': 'Processed'
}),
map(({ record: { event }, blockHash, blockNumber }) => {
const xcmMessage = event.data as any;
const messageId = xcmMessage.id.toHex();
const messageHash = messageId;
const messageOrigin = xcmMessage.origin.toHuman();
const originId = messageOrigin?.Ump?.Para?.replaceAll(',', '');
// If we can get origin ID, only return message if origin matches with subscription origin
// If no origin ID, we will return the message without matching with subscription origin
if (originId === undefined || originId === origin) {
return new GenericXcmReceivedWithContext({
event: event.toHuman(),
blockHash,
blockNumber,
messageHash,
messageId,
outcome: xcmMessage.success.toPrimitive() ? 'Success' : 'Fail',
error: null
});
}
return null;
}),
filterNonNull()
));
};
function createUmpReceivedWithContext(
event: types.BlockEvent,
subOrigin: string,
{
id,
origin,
success,
error
}: UmpReceivedContext
): XcmReceivedWithContext | null {
// Received event only emits field `message_id`,
// which is actually the message hash in the current runtime.
const messageId = id.toHex();
const messageHash = messageId;
const messageOrigin = getParaIdFromOrigin(origin);
// If we can get message origin, only return message if origin matches with subscription origin
// If no origin, we will return the message without matching with subscription origin
if (messageOrigin === undefined || messageOrigin === subOrigin) {
return new GenericXcmReceivedWithContext({
event: event.toHuman(),
blockHash: event.blockHash.toHex(),
blockNumber: event.blockNumber.toPrimitive(),
messageHash,
messageId,
outcome: success?.isTrue ? 'Success' : 'Fail',
error: error ? error.toHuman() : null
});
}
return null;
}

function umpMessagesSent() {
Expand All @@ -73,7 +74,8 @@ function umpMessagesSent() {
blockHash: event.blockHash.toHex(),
blockNumber: event.blockNumber.toPrimitive(),
extrinsicId: event.extrinsicId,
messageHash: xcmMessage.messageHash.toHex(),
messageHash: xcmMessage.messageHash?.toHex(),
messageId: xcmMessage.messageId?.toHex(),
sender: event.extrinsic?.signer.toHuman()
} as XcmSentWithContext;
})
Expand All @@ -89,7 +91,7 @@ function findOutboundUmpMessage(
: Observable<XcmSentWithContext> => {
return source.pipe(
mergeMap(sentMsg => {
const { blockHash, messageHash } = sentMsg;
const { blockHash, messageHash, messageId } = sentMsg;
return getOutboundUmpMessages(blockHash).pipe(
map(messages => {
return messages
Expand All @@ -104,7 +106,7 @@ function findOutboundUmpMessage(
instructions: xcmProgram.toHuman()
});
}).find(msg => {
return msg.messageHash === messageHash;
return messageId ? msg.messageId === messageId : msg.messageHash === messageHash;
});
}),
filterNonNull(),
Expand All @@ -125,8 +127,16 @@ export function extractUmpSend(
: Observable<XcmSentWithContext> => {
return source.pipe(
filterEvents({
'section': 'parachainSystem',
'method': 'UpwardMessageSent'
$or: [
{
'section': 'parachainSystem',
'method': 'UpwardMessageSent'
},
{
'section': 'polkadotXcm',
'method': 'Sent'
}
]
}),
mongoFilter(sendersControl),
umpMessagesSent(),
Expand All @@ -138,20 +148,21 @@ export function extractUmpSend(
};
}

export function extractUmpReceive(origin: string) {
export function extractUmpReceive(api: ApiPromise, originId: string) {
return (source: Observable<SignedBlockExtended>)
: Observable<XcmReceivedWithContext> => {
return (source.pipe(
mergeMap(({ block: { header }, events}) =>
events.map(record => ({
record,
method: record.event.method,
section: record.event.section,
blockNumber: header.number.toPrimitive(),
blockHash: header.hash.toHex()
} as EventRecordWithContext)
)),
mapUmpQueueMessage(origin)
extractEvents(),
map(event => {
if (
api.events.messageQueue.Processed.is(event) ||
api.events.messageQueue.ProcessingFailed.is(event)
) {
return createUmpReceivedWithContext(event, originId, event.data)
}
return null;
}),
filterNonNull()
));
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/services/monitoring/switchboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ export class Switchboard {
chainId,
sub: this.#catcher.finalizedBlocks(chainId)
.pipe(
extractUmpReceive(origin),
extractUmpReceive(this.#apis.promise[chainId], origin),
retryWithTruncatedExpBackoff(),
inbound$()
).subscribe(inboundHandler)
Expand Down
16 changes: 15 additions & 1 deletion src/testing/xcm.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-disable max-len */
import '@polkadot/api-augment/polkadot'

import { jest } from '@jest/globals';

import { from } from 'rxjs';
Expand Down Expand Up @@ -60,7 +62,19 @@ export const umpSend = {

export const umpReceive = {
successBlocks: from(testBlocksFrom('ump-in-success.cbor.bin', 'polkadot.json')),
failBlocks: from(testBlocksFrom('ump-in-fail.cbor.bin', 'polkadot.json'))
failBlocks: from(testBlocksFrom('ump-in-fail.cbor.bin', 'polkadot.json')),
api: {
events: {
messageQueue: {
Processed: {
is: (event) => event.method === 'Processed' && event.section === 'messageQueue'
},
ProcessingFailed: {
is: (event) => event.method === 'ProcessingFailed' && event.section === 'messageQueue'
}
}
}
} as unknown as ApiPromise
};

// DMP testing mocks
Expand Down

0 comments on commit 1595797

Please sign in to comment.