Skip to content

Commit

Permalink
add head-catcher finalizedBlocks test
Browse files Browse the repository at this point in the history
  • Loading branch information
XY-Wang committed Oct 25, 2023
1 parent 763e04b commit 5474bee
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 14 deletions.
137 changes: 133 additions & 4 deletions src/services/monitoring/head-catcher.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
// import { jest } from '@jest/globals';
import { jest } from '@jest/globals';

import { MemoryLevel } from 'memory-level';
import { from, of } from 'rxjs';
import { ApiRx } from '@polkadot/api';
import { ApiRx, ApiPromise } from '@polkadot/api';
// import type { SignedBlockExtended } from '@polkadot/api-derive/types';
import { ApiDecoration } from '@polkadot/api/types';
import type { SignedBlockExtended } from '@polkadot/api-derive/types';
import * as P from '@polkadot/api-derive';

import { HeadCatcher } from './head-catcher.js';
import { _services } from '../../test/services.js';
import Connector from '../networking/connector.js';
import { mockConfigMixed } from '../../test/configs.js';
import { interlayBlocks, polkadotBlocks } from '../../test/blocks.js';
import { DB } from '../types.js';
import { Janitor } from '../persistence/janitor.js';
import { HeadCatcher as HC } from './head-catcher.js';

jest.unstable_mockModule('@polkadot/api-derive', () => {
// const originalModule = await import('@polkadot/api-derive');
return {
__esModule: true,
...P,
createSignedBlockExtended: () => {
return {
block: {
header: {
hash: {
toHex: () => '0xFEEDC0DE'
}
}
},
events: {}
} as unknown as SignedBlockExtended;
}
};
});

const HeadCatcher = (await import('./head-catcher.js')).HeadCatcher;

describe('head catcher', () => {
let catcher: HeadCatcher;
let catcher: HC;
let db: DB;

function sl(chainId: string) {
Expand Down Expand Up @@ -142,4 +167,108 @@ describe('head catcher', () => {
catcher.stop();
});
});

describe('finalizedBlocks', () => {
it('should get block from cache and delete gotten entries if using smoldot', (done) => {
const janitor = {
schedule: () => {}
} as unknown as Janitor;

const headersSource = from(polkadotBlocks.map(tb => tb.block.header));
const blocksSource = from(polkadotBlocks);

catcher = new HeadCatcher({
..._services,
config: mockConfigMixed,
connector: {
connect: () => ({
rx: {
'0': of({
derive: {
chain: {
subscribeNewBlocks: () => blocksSource,
getBlock: (hash) => of(
polkadotBlocks.find(
b => b.block.hash.toHex() === hash.toHex()
)
)
},
},
rpc: {
chain: {
subscribeFinalizedHeads: () => from(headersSource)
},
},
} as unknown as ApiRx),
'1000': of({} as unknown as ApiRx),
'2032': of({} as unknown as ApiRx)
},
promise: {
'0': {
derive: {
chain: {
getBlock: (hash: Uint8Array | string) => of(
polkadotBlocks.find(
b => b.block.hash.eq(hash)
)
)
},
},
registry: {
createType: () => ({})
}
} as unknown as ApiPromise
}
})
} as unknown as Connector,
storage: {
..._services.storage,
root: db
},
janitor
});

const janitorSpy = jest.spyOn(janitor, 'schedule');
const expectedBlocks = [
'0xaf1a3580d45b40b2fc5efd1aa0104e4caa1a20364e9cda17e6cd26032b088b5f',
'0x787a7e572d6a549162fb29495bab1512b8441cedbab2f48113fba9de273501bb',
'0x356f7d037f0ff737b13b1871cbd7a1b9b15b1a75e1e36f8cf27b84943454d875'
];

catcher.start();

blocksSource.subscribe({
complete: async () => {
// Blocks should be put in cache
const blockCache = await sl('0').keys().all();
expect(expectedBlocks.every(k => blockCache.includes(k))).toBe(true);

catcher.finalizedBlocks('0').subscribe({
next: _ => {
expect(janitorSpy).toBeCalledWith({
sublevel: '0:blocks',
key: 'hrmp-messages:0xFEEDC0DE'
},
{
sublevel: '0:blocks',
key: 'ump-messages:0xFEEDC0DE'
},
{
sublevel: '0:blocks',
key: '0xFEEDC0DE'
});
},
complete: async () => {
// Blocks should be deleted from cache
const blockCacheAfter = await sl('0').keys().all();
expect(blockCacheAfter.length).toBe(0);
expect(janitorSpy).toBeCalledTimes(3);
catcher.stop();
done();
}
});
}
});
});
});
});
17 changes: 7 additions & 10 deletions src/services/monitoring/head-catcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ export class HeadCatcher extends EventEmitter {
const db = this.#blockCache(chainId);

this.#log.info('[%s] Register head catcher', chainId);
console.log('register chain', chainId);

const blockPipe = api.pipe(
blocks(),
Expand All @@ -92,11 +91,9 @@ export class HeadCatcher extends EventEmitter {
retryWithTruncatedExpBackoff()
);
const paraPipe = blockPipe.pipe(
tap(b => console.log('block', b.block.header.toHuman())),
mergeMap(block => {
return api.pipe(
switchMap(_api => _api.at(block.block.header.hash)),
tap(at => console.log('ATATATAT', at)),
mergeMap(at =>
zip([
from(
Expand Down Expand Up @@ -139,11 +136,9 @@ export class HeadCatcher extends EventEmitter {

const hash = block.block.header.hash.toHex();
if (hrmpMessages.length > 0) {
console.log('putting hrmp messages');
ops.push(from(db.put('hrmp-messages:' + hash, hrmpMessages.toU8a())));
}
if (umpMessages.length > 0) {
console.log('putting ump messages');
ops.push(from(db.put('ump-messages:' + hash, umpMessages.toU8a())));
}
return ops;
Expand Down Expand Up @@ -352,7 +347,9 @@ export class HeadCatcher extends EventEmitter {
return (source: Observable<Header>)
: Observable<Header> => {
return source.pipe(
mergeMap(head => defer(() => this.#doCatchUp(chainId, api, head))),
mergeMap(head => defer(
() => this.#doCatchUp(chainId, api, head)
)),
retryWithTruncatedExpBackoff(),
mergeMap(head => head)
);
Expand Down Expand Up @@ -435,7 +432,6 @@ export class HeadCatcher extends EventEmitter {
}

async #putBlock(chainId: string, block: SignedBlockExtended) {
console.log('putting block', this.#blockCache(chainId).put);
const hash = block.block.header.hash.toHex();

// TODO: review to use SCALE instead of CBOR
Expand All @@ -448,18 +444,19 @@ export class HeadCatcher extends EventEmitter {

#updateJanitorTasks(chainId: string) {
return ({ block: { header } }: SignedBlockExtended) => {
const blockHash = header.hash.toHex();
this.#janitor.schedule(
{
sublevel: chainId + ':blocks',
key: 'hrmp-messages:' + header.hash.toHex()
key: 'hrmp-messages:' + blockHash
},
{
sublevel: chainId + ':blocks',
key: 'ump-messages:' + header.hash.toHex()
key: 'ump-messages:' + blockHash
},
{
sublevel: chainId + ':blocks',
key: header.hash.toHex()
key: blockHash
}
);
};
Expand Down

0 comments on commit 5474bee

Please sign in to comment.