Skip to content

Commit

Permalink
refactor and notifier metrics #1
Browse files Browse the repository at this point in the history
  • Loading branch information
mfornos committed Jan 31, 2024
1 parent 627870f commit ce0fee6
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 61 deletions.
22 changes: 12 additions & 10 deletions src/services/monitoring/matching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export type NotificationReceiver = (message: XcmMessageNotify) => Promise<void>
type SubLevel<TV> = AbstractSublevel<DB, Buffer | Uint8Array | string, string, TV>;

export type ChainBlock = {
chainId: string | number,
chainId: string,
blockHash: string,
blockNumber: string
}
Expand Down Expand Up @@ -64,6 +64,8 @@ export class MatchingEngine extends EventEmitter {
async onOutboundMessage(outMsg: XcmMessageSent) {
const log = this.#log;

this.emit(telemetry.Inbound, outMsg);

// Confirmation key at destination
await this.#mutex.runExclusive(async () => {
const hashKey = `${outMsg.messageHash}:${outMsg.recipient}`;
Expand Down Expand Up @@ -91,7 +93,7 @@ export class MatchingEngine extends EventEmitter {
.del(idKey)
.del(hashKey)
.write();
await this.#notify(outMsg, inMsg);
await this.#notifyMatched(outMsg, inMsg);
} catch {
log.info(
'[%s:o] STORED hash=%s id=%s (subId=%s, block=%s #%s)',
Expand Down Expand Up @@ -119,7 +121,7 @@ export class MatchingEngine extends EventEmitter {
outMsg.blockNumber
);
await this.#inbound.del(hashKey);
await this.#notify(outMsg, inMsg);
await this.#notifyMatched(outMsg, inMsg);
} catch {
log.info(
'[%s:o] STORED hash=%s (subId=%s, block=%s #%s)',
Expand All @@ -138,6 +140,8 @@ export class MatchingEngine extends EventEmitter {
async onInboundMessage(inMsg: XcmMessageReceived) {
const log = this.#log;

this.emit(telemetry.Inbound, inMsg);

await this.#mutex.runExclusive(async () => {
const hashKey = `${inMsg.messageHash}:${inMsg.chainId}`;
const idKey = `${inMsg.messageId}:${inMsg.chainId}`;
Expand All @@ -154,7 +158,7 @@ export class MatchingEngine extends EventEmitter {
inMsg.blockNumber
);
await this.#outbound.del(hashKey);
await this.#notify(outMsg, inMsg);
await this.#notifyMatched(outMsg, inMsg);
} catch {
log.info(
'[%s:i] STORED hash=%s (subId=%s, block=%s #%s)',
Expand Down Expand Up @@ -189,7 +193,7 @@ export class MatchingEngine extends EventEmitter {
.del(idKey)
.del(hashKey)
.write();
await this.#notify(outMsg, inMsg);
await this.#notifyMatched(outMsg, inMsg);
} catch {
log.info(
'[%s:i] STORED hash=%s id=%s (subId=%s, block=%s #%s)',
Expand Down Expand Up @@ -227,19 +231,17 @@ export class MatchingEngine extends EventEmitter {
await this.#mutex.waitForUnlock();
}

async #notify(
async #notifyMatched(
outMsg: XcmMessageSent,
inMsg: XcmMessageReceived
) {
this.emit(telemetry.Matched, inMsg, outMsg);

try {
const message: XcmMessageNotify = new XcmMessageNotify(outMsg, inMsg);
await this.#noticationReceiver(message);

this.emit(telemetry.Notify, message);
} catch (e) {
this.#log.error(e, 'Error on notification');

this.emit(telemetry.NotifyError, e);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/services/monitoring/ops/dmp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function createXcmMessageSent(
{
paraId, data, tx: {extrinsic}
} : {
paraId: number,
paraId: string,
data: Bytes,
tx: types.TxWithIdAndEvent
}) : GenericXcmMessageSentWithContext {
Expand Down Expand Up @@ -114,7 +114,7 @@ function findDmpMessages(api: ApiPromise) {
if (paraIdStr) {
return {
tx,
paraId: parseInt(paraIdStr.replaceAll(',', '')),
paraId: paraIdStr.replaceAll(',', ''),
beneficiary,
assets
};
Expand Down
2 changes: 1 addition & 1 deletion src/services/monitoring/ops/ump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ function findOutboundUmpMessage(
return new GenericXcmMessageSentWithContext({
...sentMsg,
messageData: data.toU8a(),
recipient: 0, // always relay
recipient: '0', // always relay
messageHash: xcmProgram.hash.toHex(),
messageId: getMessageId(xcmProgram),
instructions: xcmProgram.toHuman()
Expand Down
4 changes: 2 additions & 2 deletions src/services/monitoring/ops/xcmp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ function findOutboundHrmpMessage(
return (source: Observable<XcmMessageSentWithContext>)
: Observable<GenericXcmMessageSentWithContext> => {
return source.pipe(
mergeMap(sentMsg => {
mergeMap((sentMsg): Observable<GenericXcmMessageSentWithContext> => {
const { blockHash, messageHash } = sentMsg;
return getOutboundHrmpMessages(blockHash).pipe(
map(messages => {
Expand All @@ -39,7 +39,7 @@ function findOutboundHrmpMessage(
new GenericXcmMessageSentWithContext({
...sentMsg,
messageData: xcmProgram.toU8a(),
recipient: recipient.toNumber(),
recipient: recipient.toNumber().toString(),
messageHash: xcmProgram.hash.toHex(),
instructions: xcmProgram.toHuman(),
messageId: getMessageId(xcmProgram)
Expand Down
1 change: 1 addition & 0 deletions src/services/monitoring/switchboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ export class Switchboard {
collectTelemetry(collect: (observer: TelemetryObserver) => void) {
collect({ id: TelemetrySources.engine, source: this.#engine});
collect({ id: TelemetrySources.catcher, source: this.#catcher});
collect({ id: TelemetrySources.notifier, source: this.#notifier});
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/services/monitoring/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export type XcmMessageWithContext = {

export interface XcmMessageSentWithContext extends XcmMessageWithContext {
messageData: Uint8Array,
recipient: number,
recipient: string,
sender: AnyJson,
instructions: AnyJson,
}
Expand Down Expand Up @@ -121,7 +121,7 @@ export class XcmMessageReceived {

export class GenericXcmMessageSentWithContext implements XcmMessageSentWithContext {
messageData: Uint8Array;
recipient: number;
recipient: string;
instructions: AnyJson;
messageHash: HexString;
event: AnyJson;
Expand Down Expand Up @@ -164,7 +164,7 @@ export class XcmMessageSent {
subscriptionId: string;
chainId: string;
messageData: string;
recipient: number;
recipient: string;
instructions: AnyJson;
messageHash: HexString;
event: AnyJson;
Expand Down
17 changes: 15 additions & 2 deletions src/services/notification/hub.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Logger, Services } from '../types.js';
import EventEmitter from 'node:events';

import { Logger, Services, TelementryNotifierEvents } from '../types.js';
import { QuerySubscription, XcmMessageNotify } from '../monitoring/types.js';
import { LogNotifier } from './log.js';
import { Notifier } from './types.js';
Expand All @@ -9,18 +11,29 @@ import { WebhookNotifier } from './webhook.js';
*
* Provides resolution of the supported notifiers.
*/
export class NotifierHub implements Notifier {
export class NotifierHub extends EventEmitter implements Notifier {
#log: Logger;
#notifiers: {
[property: string]: Notifier
};

constructor(services: Services) {
super();

this.#log = services.log;
this.#notifiers = {
log: new LogNotifier(services),
webhook: new WebhookNotifier(services)
};

// delegate telemetry events
for (const n of Object.values(this.#notifiers)) {
for (const t of Object.values(TelementryNotifierEvents)) {
n.on(t, (...args: any[]) => {
this.emit(t, ...args);
});
}
}
}

async notify(sub: QuerySubscription, msg: XcmMessageNotify) {
Expand Down
6 changes: 5 additions & 1 deletion src/services/notification/log.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import EventEmitter from 'node:events';

import { Logger, Services } from '../../services/types.js';
import { QuerySubscription, XcmMessageNotify } from '../monitoring/types.js';
import { Notifier } from './types.js';

export class LogNotifier implements Notifier {
export class LogNotifier extends EventEmitter implements Notifier {
#log: Logger;

constructor({ log }: Services) {
super();

this.#log = log;
}

Expand Down
4 changes: 3 additions & 1 deletion src/services/notification/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import EventEmitter from 'node:events';

import { QuerySubscription, XcmMessageNotify } from '../monitoring/types.js';

export interface Notifier {
export interface Notifier extends EventEmitter {
notify(sub: QuerySubscription, msg: XcmMessageNotify) : Promise<void>
}
32 changes: 26 additions & 6 deletions src/services/notification/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import Stream from 'node:stream';
import { EventEmitter } from 'node:events';

import got from 'got';
import { ulid } from 'ulidx';

import version from '../../version.js';
import { QuerySubscription, WebhookNotification, XcmMessageNotify } from '../monitoring/types.js';
import { Logger, Services } from 'services/types.js';
import { Logger, Services, TelementryNotifierEvents as telemetry } from '../types.js';

import { Notifier } from './types.js';
import { Scheduled, Scheduler } from 'services/persistence/scheduler.js';
import { Scheduled, Scheduler } from '../persistence/scheduler.js';

const DEFAULT_DELAY = 300000; // 5 minutes

Expand All @@ -22,7 +22,7 @@ const WebhookTaskType = 'task:webhook';

export const Delivered = Symbol('delivered');

export class WebhookNotifier extends Stream.EventEmitter implements Notifier {
export class WebhookNotifier extends EventEmitter implements Notifier {
#log: Logger;
#scheduler: Scheduler;

Expand Down Expand Up @@ -106,17 +106,27 @@ export class WebhookNotifier extends Stream.EventEmitter implements Notifier {
msg.origin.blockNumber,
msg.destination.blockNumber
);

this.emit(Delivered, {
id,
msg
});

this.emit(telemetry.Notify, {
type: config.type,
subscription: msg.subscriptionId,
origin: msg.origin.chainId,
destination: msg.destination.chainId,
outcome: msg.outcome,
sink: config.url
});
} else {
// Should not enter here, since the non success status codes
// are retryable and will throw an exception when the limit
// is of retries is reached.
// of retries is reached.
this.#log.error(
'Not deliverable webhook %s %s',
config.url,
url,
id
);
}
Expand All @@ -138,6 +148,16 @@ export class WebhookNotifier extends Stream.EventEmitter implements Notifier {
'Scheduled webhook delivery %s',
key
);

this.emit(telemetry.NotifyError, {
type: config.type,
subscription: msg.subscriptionId,
origin: msg.origin.chainId,
destination: msg.destination.chainId,
outcome: msg.outcome,
sink: config.url,
error: 'max_retries'
});
}
}
}
6 changes: 2 additions & 4 deletions src/services/telemetry/exporters/catcher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Counter, Histogram } from 'prom-client';

import {
TelementryCatcherEvents, TelemetryObserver
TelementryCatcherEvents as events, TelemetryObserver
} from '../../types.js';

export function catcherExports(
export function catcherMetrics(
{ source }: TelemetryObserver
) {
const timers : Record<string, () => void> = {};
Expand Down Expand Up @@ -36,8 +36,6 @@ export function catcherExports(
labelNames: ['origin']
});

const events = TelementryCatcherEvents;

source.on(events.BlockCacheHit, ({ chainId }) => {
blockCacheHitsCount.labels(chainId).inc();
});
Expand Down
Loading

0 comments on commit ce0fee6

Please sign in to comment.