diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 61fac9838..233e1b316 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -13,9 +13,9 @@ import type { } from '../nodes/types'; import type { ClaimEncoded } from '../claims/types'; import type { Timer } from '../types'; +import type { PromiseType } from '../utils/utils'; import Logger from '@matrixai/logger'; import { StartStop, ready } from '@matrixai/async-init/dist/StartStop'; -import { IdInternal } from '@matrixai/id'; import * as nodesErrors from './errors'; import * as nodesUtils from './utils'; import * as networkUtils from '../network/utils'; @@ -24,7 +24,7 @@ import * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb'; import * as claimsErrors from '../claims/errors'; import * as sigchainUtils from '../sigchain/utils'; import * as claimsUtils from '../claims/utils'; -import { timerStart } from '../utils/utils'; +import { promise, timerStart } from '../utils/utils'; interface NodeManager extends StartStop {} @StartStop() @@ -47,6 +47,16 @@ class NodeManager { protected setNodeQueueRunner: Promise; protected setNodeQueueEmpty: Promise; protected setNodeQueueDrained: () => void; + // Refresh bucket timer + protected refreshBucketDeadlineMap: Map = new Map(); + protected refreshBucketTimer: NodeJS.Timer; + protected refreshBucketNext: NodeBucketIndex; + public readonly refreshBucketTimerDefault; + protected refreshBucketQueue: Set = new Set(); + protected refreshBucketQueueRunning: boolean = false; + protected refreshBucketQueueRunner: Promise; + protected refreshBucketQueuePlug_: PromiseType; + protected refreshBucketQueueDrained_: PromiseType; constructor({ db, @@ -54,6 +64,7 @@ class NodeManager { sigchain, nodeConnectionManager, nodeGraph, + refreshBucketTimerDefault = 3600000, // 1 hour in milliseconds logger, }: { db: DB; @@ -61,6 +72,7 @@ class NodeManager { sigchain: Sigchain; nodeConnectionManager: NodeConnectionManager; nodeGraph: NodeGraph; + refreshBucketTimerDefault?: number; logger?: Logger; }) { this.logger = logger ?? new Logger(this.constructor.name); @@ -69,17 +81,22 @@ class NodeManager { this.sigchain = sigchain; this.nodeConnectionManager = nodeConnectionManager; this.nodeGraph = nodeGraph; + this.refreshBucketTimerDefault = refreshBucketTimerDefault; } public async start() { this.logger.info(`Starting ${this.constructor.name}`); this.setNodeQueueRunner = this.startSetNodeQueue(); + this.startRefreshBucketTimers(); + this.refreshBucketQueueRunner = this.startRefreshBucketQueue(); this.logger.info(`Started ${this.constructor.name}`); } public async stop() { this.logger.info(`Stopping ${this.constructor.name}`); await this.stopSetNodeQueue(); + await this.stopRefreshBucketTimers(); + await this.stopRefreshBucketQueue(); this.logger.info(`Stopped ${this.constructor.name}`); } @@ -419,6 +436,8 @@ class NodeManager { // Either already exists or has room in the bucket // We want to add or update the node await this.nodeGraph.setNode(nodeId, nodeAddress, tran); + // Updating the refreshBucket timer + this.refreshBucketUpdateDeadline(bucketIndex); } else { // We want to add a node but the bucket is full // We need to ping the oldest node @@ -434,6 +453,8 @@ class NodeManager { ); await this.nodeGraph.unsetNode(oldNodeId, tran); await this.nodeGraph.setNode(nodeId, nodeAddress, tran); + // Updating the refreshBucket timer + this.refreshBucketUpdateDeadline(bucketIndex); return; } if (blocking) { @@ -491,6 +512,8 @@ class NodeManager { ); const node = (await this.nodeGraph.getNode(nodeId))!; await this.nodeGraph.setNode(nodeId, node.address); + // Updating the refreshBucket timer + this.refreshBucketUpdateDeadline(bucketIndex); } else { this.logger.debug(`Ping failed for ${nodesUtils.encodeNodeId(nodeId)}`); // Otherwise we remove the node @@ -502,6 +525,8 @@ class NodeManager { if (count < this.nodeGraph.nodeBucketLimit) { this.logger.debug(`Bucket ${bucketIndex} now has room, adding new node`); await this.nodeGraph.setNode(nodeId, nodeAddress); + // Updating the refreshBucket timer + this.refreshBucketUpdateDeadline(bucketIndex); } } @@ -623,7 +648,7 @@ class NodeManager { * nodes. * @param bucketIndex */ - private async refreshBucket(bucketIndex: NodeBucketIndex) { + public async refreshBucket(bucketIndex: NodeBucketIndex) { // We need to generate a random nodeId for this bucket const nodeId = this.keyManager.getNodeId(); const bucketRandomNodeId = nodesUtils.generateRandomNodeIdForBucket( @@ -633,6 +658,143 @@ class NodeManager { // We then need to start a findNode procedure await this.nodeConnectionManager.findNode(bucketRandomNodeId); } + + // Refresh bucket activity timer methods + + private startRefreshBucketTimers() { + // Setting initial bucket to refresh + this.refreshBucketNext = 0; + // Setting initial deadline + this.refreshBucketTimerReset(this.refreshBucketTimerDefault); + + for ( + let bucketIndex = 0; + bucketIndex < this.nodeGraph.nodeIdBits; + bucketIndex++ + ) { + const deadline = Date.now() + this.refreshBucketTimerDefault; + this.refreshBucketDeadlineMap.set(bucketIndex, deadline); + } + } + + private async stopRefreshBucketTimers() { + clearTimeout(this.refreshBucketTimer); + } + + private refreshBucketTimerReset(timeout: number) { + clearTimeout(this.refreshBucketTimer); + this.refreshBucketTimer = setTimeout(() => { + this.refreshBucketRefreshTimer(); + }, timeout); + } + + public refreshBucketUpdateDeadline(bucketIndex: NodeBucketIndex) { + // Update the map deadline + this.refreshBucketDeadlineMap.set( + bucketIndex, + Date.now() + this.refreshBucketTimerDefault, + ); + // If the bucket was pending a refresh we remove it + this.refreshBucketQueueRemove(bucketIndex); + if (bucketIndex === this.refreshBucketNext) { + // Bucket is same as next bucket, this affects the timer + this.refreshBucketRefreshTimer(); + } + } + + private refreshBucketRefreshTimer() { + // Getting new closest deadline + let closestBucket = this.refreshBucketNext; + let closestDeadline = Date.now() + this.refreshBucketTimerDefault; + const now = Date.now(); + for (const [bucketIndex, deadline] of this.refreshBucketDeadlineMap) { + // Skip any queued buckets marked by 0 deadline + if (deadline === 0) continue; + if (deadline <= now) { + // Deadline for this has already passed, we add it to the queue + this.refreshBucketQueueAdd(bucketIndex); + continue; + } + if (deadline < closestDeadline) { + closestBucket = bucketIndex; + closestDeadline = deadline; + } + } + // Working out time left + const timeout = closestDeadline - Date.now(); + this.logger.debug( + `Refreshing refreshBucket timer with new timeout ${timeout}`, + ); + // Updating timer and next + this.refreshBucketNext = closestBucket; + this.refreshBucketTimerReset(timeout); + } + + // Refresh bucket async queue methods + + public refreshBucketQueueAdd(bucketIndex: NodeBucketIndex) { + this.logger.debug(`Adding bucket ${bucketIndex} to queue`); + this.refreshBucketDeadlineMap.set(bucketIndex, 0); + this.refreshBucketQueue.add(bucketIndex); + this.refreshBucketQueueUnplug(); + } + + public refreshBucketQueueRemove(bucketIndex: NodeBucketIndex) { + this.logger.debug(`Removing bucket ${bucketIndex} from queue`); + this.refreshBucketQueue.delete(bucketIndex); + } + + public async refreshBucketQueueDrained() { + await this.refreshBucketQueueDrained_.p; + } + + private async startRefreshBucketQueue(): Promise { + this.refreshBucketQueueRunning = true; + this.refreshBucketQueuePlug(); + let iterator: IterableIterator | undefined; + const pace = async () => { + // Wait for plug + await this.refreshBucketQueuePlug_.p; + if (iterator == null) { + iterator = this.refreshBucketQueue[Symbol.iterator](); + } + return this.refreshBucketQueueRunning; + }; + while (await pace()) { + const bucketIndex: NodeBucketIndex = iterator?.next().value; + if (bucketIndex == null) { + // Iterator is empty, plug and continue + iterator = undefined; + this.refreshBucketQueuePlug(); + continue; + } + // Do the job + this.logger.debug( + `processing refreshBucket for bucket ${bucketIndex}, ${this.refreshBucketQueue.size} left in queue`, + ); + await this.refreshBucket(bucketIndex); + // Remove from queue and update bucket deadline + this.refreshBucketQueue.delete(bucketIndex); + this.refreshBucketUpdateDeadline(bucketIndex); + } + this.logger.debug('startRefreshBucketQueue has ended'); + } + + private async stopRefreshBucketQueue(): Promise { + // Flag end and await queue finish + this.refreshBucketQueueRunning = false; + this.refreshBucketQueueUnplug(); + } + + private refreshBucketQueuePlug() { + this.refreshBucketQueuePlug_ = promise(); + this.refreshBucketQueueDrained_?.resolveP(); + } + + private refreshBucketQueueUnplug() { + this.refreshBucketQueueDrained_ = promise(); + this.refreshBucketQueuePlug_?.resolveP(); + } } export default NodeManager; diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 7c623e8dd..0b99a8a43 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -170,14 +170,16 @@ function promisify< }; } -/** - * Deconstructed promise - */ -function promise(): { +export type PromiseType = { p: Promise; resolveP: (value: T | PromiseLike) => void; rejectP: (reason?: any) => void; -} { +}; + +/** + * Deconstructed promise + */ +function promise(): PromiseType { let resolveP, rejectP; const p = new Promise((resolve, reject) => { resolveP = resolve; diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 605490dbf..04880a0ff 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -24,7 +24,7 @@ import { generateNodeIdForBucket } from './utils'; describe(`${NodeManager.name} test`, () => { const password = 'password'; - const logger = new Logger(`${NodeManager.name} test`, LogLevel.DEBUG, [ + const logger = new Logger(`${NodeManager.name} test`, LogLevel.WARN, [ new StreamHandler(), ]); let dataDir: string; @@ -864,4 +864,109 @@ describe(`${NodeManager.name} test`, () => { await nodeManager.stop(); } }); + test('should update deadline when updating a bucket', async () => { + const refreshBucketTimeout = 100000; + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: dummyNodeConnectionManager, + refreshBucketTimerDefault: refreshBucketTimeout, + logger, + }); + const mockRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + try { + mockRefreshBucket.mockImplementation(async () => {}); + await nodeManager.start(); + await nodeConnectionManager.start({ nodeManager }); + // @ts-ignore: kidnap map + const deadlineMap = nodeManager.refreshBucketDeadlineMap; + // Getting starting value + const bucket = 0; + const startingDeadline = deadlineMap.get(bucket); + const nodeId = nodesTestUtils.generateNodeIdForBucket( + keyManager.getNodeId(), + bucket, + ); + await sleep(1000); + await nodeManager.setNode(nodeId, {} as NodeAddress); + // Deadline should be updated + const newDeadline = deadlineMap.get(bucket); + expect(newDeadline).not.toEqual(startingDeadline); + } finally { + mockRefreshBucket.mockRestore(); + await nodeManager.stop(); + } + }); + test('should add buckets to the queue when exceeding deadline', async () => { + const refreshBucketTimeout = 100; + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: dummyNodeConnectionManager, + refreshBucketTimerDefault: refreshBucketTimeout, + logger, + }); + const mockRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + const mockRefreshBucketQueueAdd = jest.spyOn( + NodeManager.prototype, + 'refreshBucketQueueAdd', + ); + try { + mockRefreshBucket.mockImplementation(async () => {}); + await nodeManager.start(); + await nodeConnectionManager.start({ nodeManager }); + // Getting starting value + expect(mockRefreshBucketQueueAdd).toHaveBeenCalledTimes(0); + await sleep(200); + expect(mockRefreshBucketQueueAdd).toHaveBeenCalledTimes(256); + } finally { + mockRefreshBucketQueueAdd.mockRestore(); + mockRefreshBucket.mockRestore(); + await nodeManager.stop(); + } + }); + test('should digest queue to refresh buckets', async () => { + const refreshBucketTimeout = 1000000; + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: dummyNodeConnectionManager, + refreshBucketTimerDefault: refreshBucketTimeout, + logger, + }); + const mockRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + try { + await nodeManager.start(); + await nodeConnectionManager.start({ nodeManager }); + mockRefreshBucket.mockImplementation(async () => {}); + nodeManager.refreshBucketQueueAdd(1); + nodeManager.refreshBucketQueueAdd(2); + nodeManager.refreshBucketQueueAdd(3); + nodeManager.refreshBucketQueueAdd(4); + nodeManager.refreshBucketQueueAdd(5); + await nodeManager.refreshBucketQueueDrained(); + expect(mockRefreshBucket).toHaveBeenCalledTimes(5); + + // Add buckets to queue + // check if refresh buckets was called + } finally { + mockRefreshBucket.mockRestore(); + await nodeManager.stop(); + } + }); });