From aac64d2288c0ea55e75c528f1f6be7f678ff16f2 Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 29 May 2020 10:41:27 -0500 Subject: [PATCH 1/9] refactor: convert to typescript --- .eslintrc.js | 34 +++ package.json | 16 +- src/constants.js | 24 --- src/index.d.ts | 44 ---- src/message/index.js | 7 - src/utils/index.js | 6 - ts/constants.ts | 24 +++ src/getGossipPeers.js => ts/getGossipPeers.ts | 12 +- src/heartbeat.js => ts/heartbeat.ts | 42 ++-- src/index.js => ts/index.ts | 196 ++++++++++-------- ts/message/index.ts | 67 ++++++ .../rpc.proto.js => ts/message/rpc.proto.ts | 3 +- src/messageCache.js => ts/messageCache.ts | 46 ++-- ts/peer.ts | 29 +++ {src => ts}/pubsub.js | 12 +- .../utils/createGossipRpc.ts | 4 +- ts/utils/index.ts | 2 + src/utils/shuffle.js => ts/utils/shuffle.ts | 2 +- tsconfig.json | 20 +- 19 files changed, 348 insertions(+), 242 deletions(-) create mode 100644 .eslintrc.js delete mode 100644 src/constants.js delete mode 100644 src/index.d.ts delete mode 100644 src/message/index.js delete mode 100644 src/utils/index.js create mode 100644 ts/constants.ts rename src/getGossipPeers.js => ts/getGossipPeers.ts (70%) rename src/heartbeat.js => ts/heartbeat.ts (82%) rename src/index.js => ts/index.ts (75%) create mode 100644 ts/message/index.ts rename src/message/rpc.proto.js => ts/message/rpc.proto.ts (96%) rename src/messageCache.js => ts/messageCache.ts (73%) create mode 100644 ts/peer.ts rename {src => ts}/pubsub.js (97%) rename src/utils/createGossipRpc.js => ts/utils/createGossipRpc.ts (66%) create mode 100644 ts/utils/index.ts rename src/utils/shuffle.js => ts/utils/shuffle.ts (90%) diff --git a/.eslintrc.js b/.eslintrc.js new file mode 100644 index 00000000..d2a1a04e --- /dev/null +++ b/.eslintrc.js @@ -0,0 +1,34 @@ +// from https://dev.to/itmayziii/typescript-eslint-and-standardjs-5hmd +module.exports = { + 'parser': '@typescript-eslint/parser', + 'parserOptions': { + 'project': './tsconfig.json', // Required to have rules that rely on Types. + 'tsconfigRootDir': './' + }, + 'extends': [ + 'plugin:@typescript-eslint/recommended', // Out of the box Typescript rules + 'standard' // Out of the box StandardJS rules + ], + 'plugins': [ + '@typescript-eslint' // Let's us override rules below. + ], + 'rules': { +// '@typescript-eslint/no-use-before-define': 'off', // Allows us to hoist variables and functions which I am a fan of, functions not variables that is. +// '@typescript-eslint/no-explicit-any': 'off', // Too strict for my case, sometimes I need an any type + '@typescript-eslint/member-delimiter-style': ['error', { // Prevents us from using any delimiter for interface properties. + 'multiline': { + 'delimiter': 'none', + 'requireLast': false + }, + 'singleline': { + 'delimiter': 'comma', + 'requireLast': false + } + }], + '@typescript-eslint/indent': 'off', // This is the job of StandardJS, they are competing rules so we turn off the Typescript one. + 'no-unused-vars': 'off', // On the fence about using this one, sometimes we import a package that is never used directly. + 'node/no-unsupported-features/es-syntax': 'off', // Allows us to use Import and Export keywords. + '@typescript-eslint/no-non-null-assertion': 'off', + 'no-mixed-operators': 'off' + } +} diff --git a/package.json b/package.json index eda14ef6..f8b86cdb 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "libp2p-gossipsub", "version": "0.4.2", - "description": "A javascript implementation of gossipsub", + "description": "A typescript implementation of gossipsub", "leadMaintainer": "Cayman Nava ", "main": "src/index.js", "files": [ @@ -10,8 +10,9 @@ ], "types": "src/index.d.ts", "scripts": { - "lint": "aegir lint", + "lint": "eslint --ext .ts ts", "release": "aegir release", + "prebuild": "tsc", "build": "aegir build", "test": "aegir test", "test:node": "aegir test --target node", @@ -48,12 +49,20 @@ "devDependencies": { "@types/chai": "^4.2.3", "@types/mocha": "^7.0.2", + "@typescript-eslint/eslint-plugin": "^3.0.2", + "@typescript-eslint/parser": "^3.0.2", "aegir": "^21.10.2", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", + "eslint": "^7.1.0", + "eslint-config-standard": "^14.1.1", + "eslint-plugin-import": "^2.20.2", + "eslint-plugin-node": "^11.1.0", + "eslint-plugin-promise": "^4.2.1", + "eslint-plugin-standard": "^4.0.1", "it-pair": "^1.0.0", "libp2p-floodsub": "^0.21.0", "lodash": "^4.17.15", @@ -61,7 +70,8 @@ "p-times": "^2.1.0", "p-wait-for": "^3.1.0", "promisify-es6": "^1.0.3", - "sinon": "^9.0.2" + "sinon": "^9.0.2", + "typescript": "^3.9.3" }, "contributors": [ "Cayman ", diff --git a/src/constants.js b/src/constants.js deleted file mode 100644 index b636e134..00000000 --- a/src/constants.js +++ /dev/null @@ -1,24 +0,0 @@ -'use strict' - -const second = exports.second = 1000 -const minute = exports.minute = 60 * second - -// Protocol identifiers -exports.FloodSubID = '/floodsub/1.0.0' -exports.GossipSubID = '/meshsub/1.0.0' - -// Overlay parameters -exports.GossipSubD = 6 -exports.GossipSubDlo = 4 -exports.GossipSubDhi = 12 - -// Gossip parameters -exports.GossipSubHistoryLength = 5 -exports.GossipSubHistoryGossip = 3 - -// Heartbeat interval -exports.GossipSubHeartbeatInitialDelay = 100 / second -exports.GossipSubHeartbeatInterval = second - -// Fanout ttl -exports.GossipSubFanoutTTL = minute diff --git a/src/index.d.ts b/src/index.d.ts deleted file mode 100644 index 8737c767..00000000 --- a/src/index.d.ts +++ /dev/null @@ -1,44 +0,0 @@ -// Type definitions for libp2p-gossipsub v0.2.3 -// Project https://github.com/ChainSafe/gossipsub-js - -/// - -import PeerId = require('peer-id'); - -export interface Registrar { - handle: Function; - register(topology: Object): string; - unregister(id: string): boolean; -} - -export interface IGossipMessage { - from: Buffer | string; - data: Buffer; - seqno: Buffer; - topicIDs: string[]; -} - -export interface Options { - emitSelf?: boolean, - gossipIncoming?: boolean, - fallbackToFloodsub?: boolean, -} - -import * as Events from "events"; - -interface GossipSub extends Events.EventEmitter {} - -declare class GossipSub { - constructor(peerId: PeerId, registrar: Registrar, options: Options); - publish(topic: string, data: Buffer): Promise; - start(): Promise; - stop(): Promise; - subscribe(topic: string): void; - unsubscribe(topic: string): void; - validate(message: IGossipMessage): Promise; - _emitMessage(topics: string[], message: IGossipMessage): void; - getTopics(): string[]; - _publish(messages: IGossipMessage[]): void; -} - -export default GossipSub; \ No newline at end of file diff --git a/src/message/index.js b/src/message/index.js deleted file mode 100644 index 0519b339..00000000 --- a/src/message/index.js +++ /dev/null @@ -1,7 +0,0 @@ -'use strict' - -const protons = require('protons') - -const rpcProto = protons(require('./rpc.proto.js')) - -exports.RPC = rpcProto.RPC diff --git a/src/utils/index.js b/src/utils/index.js deleted file mode 100644 index 88ea3d7f..00000000 --- a/src/utils/index.js +++ /dev/null @@ -1,6 +0,0 @@ -'use strict' - -module.exports = { - ...require('./createGossipRpc'), - ...require('./shuffle') -} diff --git a/ts/constants.ts b/ts/constants.ts new file mode 100644 index 00000000..93c12e11 --- /dev/null +++ b/ts/constants.ts @@ -0,0 +1,24 @@ +'use strict' + +const second = exports.second = 1000 +const minute = exports.minute = 60 * second + +// Protocol identifiers +export const FloodSubID = '/floodsub/1.0.0' +export const GossipSubID = '/meshsub/1.0.0' + +// Overlay parameters +export const GossipSubD = 6 +export const GossipSubDlo = 4 +export const GossipSubDhi = 12 + +// Gossip parameters +export const GossipSubHistoryLength = 5 +export const GossipSubHistoryGossip = 3 + +// Heartbeat interval +export const GossipSubHeartbeatInitialDelay = 100 / second +export const GossipSubHeartbeatInterval = second + +// Fanout ttl +export const GossipSubFanoutTTL = minute diff --git a/src/getGossipPeers.js b/ts/getGossipPeers.ts similarity index 70% rename from src/getGossipPeers.js rename to ts/getGossipPeers.ts index 1fc48736..a40686bc 100644 --- a/src/getGossipPeers.js +++ b/ts/getGossipPeers.ts @@ -1,7 +1,7 @@ -'use strict' - -const constants = require('./constants') -const { shuffle } = require('./utils') +import * as constants from './constants' +import { shuffle } from './utils' +import { GossipSub } from './index' +import { Peer } from './peer' /** * Given a topic, returns up to count peers subscribed to that topic @@ -12,14 +12,14 @@ const { shuffle } = require('./utils') * @returns {Set} * */ -module.exports = function getGossipPeers (router, topic, count) { +export function getGossipPeers (router: GossipSub, topic: string, count: number): Set { const peersInTopic = router.topics.get(topic) if (!peersInTopic) { return new Set() } // Adds all peers using our protocol - let peers = [] + let peers: Peer[] = [] peersInTopic.forEach((peer) => { if (peer.protocols.includes(constants.GossipSubID)) { peers.push(peer) diff --git a/src/heartbeat.js b/ts/heartbeat.ts similarity index 82% rename from src/heartbeat.js rename to ts/heartbeat.ts index 99c04849..c97fcbd6 100644 --- a/src/heartbeat.js +++ b/ts/heartbeat.ts @@ -1,20 +1,29 @@ 'use strict' -const errcode = require('err-code') -const constants = require('./constants') -const getGossipPeers = require('./getGossipPeers') -const { shuffle } = require('./utils') +import * as constants from './constants' +import { getGossipPeers } from './getGossipPeers' +import { shuffle } from './utils' +import { GossipSub } from './index' +import { Peer } from './peer' +import errcode = require('err-code') + +export class Heartbeat { + gossipsub: GossipSub + _heartbeatTimer: { + _intervalId: NodeJS.Timeout | undefined + runPeriodically (fn: () => void, period: number): void + cancel (): void + } | null -class Heartbeat { /** * @param {Object} gossipsub * @constructor */ - constructor (gossipsub) { + constructor (gossipsub: GossipSub) { this.gossipsub = gossipsub } - start () { + start (): void { if (this._heartbeatTimer) { const errMsg = 'Heartbeat timer is already running' this.gossipsub.log(errMsg) @@ -25,18 +34,17 @@ class Heartbeat { const timeout = setTimeout(() => { heartbeat() - this._heartbeatTimer.runPeriodically(heartbeat, constants.GossipSubHeartbeatInterval) + this._heartbeatTimer!.runPeriodically(heartbeat, constants.GossipSubHeartbeatInterval) }, constants.GossipSubHeartbeatInitialDelay) this._heartbeatTimer = { - _onCancel: null, - _intervalId: null, + _intervalId: undefined, runPeriodically: (fn, period) => { - this._heartbeatTimer._intervalId = setInterval(fn, period) + this._heartbeatTimer!._intervalId = setInterval(fn, period) }, cancel: () => { clearTimeout(timeout) - clearInterval(this._heartbeatTimer._intervalId) + clearInterval(this._heartbeatTimer!._intervalId as NodeJS.Timeout) } } } @@ -46,7 +54,7 @@ class Heartbeat { * @override * @returns {void} */ - stop () { + stop (): void { if (!this._heartbeatTimer) { const errMsg = 'Heartbeat timer is not running' this.gossipsub.log(errMsg) @@ -62,7 +70,7 @@ class Heartbeat { * * @returns {void} */ - _heartbeat () { + _heartbeat (): void { // flush pending control message from retries and gossip // that hasn't been piggybacked since the last heartbeat this.gossipsub._flush() @@ -70,8 +78,8 @@ class Heartbeat { /** * @type {Map>} */ - const tograft = new Map() - const toprune = new Map() + const tograft = new Map() + const toprune = new Map() // maintain the mesh for topics we have joined this.gossipsub.mesh.forEach((peers, topic) => { @@ -160,5 +168,3 @@ class Heartbeat { this.gossipsub.emit('gossipsub:heartbeat') } } - -module.exports = Heartbeat diff --git a/src/index.js b/ts/index.ts similarity index 75% rename from src/index.js rename to ts/index.ts index 33425a68..26c5b0a6 100644 --- a/src/index.js +++ b/ts/index.ts @@ -1,18 +1,35 @@ -'use strict' - -const { utils } = require('libp2p-pubsub') -const TimeCache = require('time-cache') - -const BasicPubsub = require('./pubsub') -const { MessageCache } = require('./messageCache') +import { utils } from 'libp2p-pubsub' +import { MessageCache } from './messageCache' +import { + RPCCodec, + RPC, Message, InMessage, + ControlMessage, ControlIHave, ControlGraft, ControlIWant, ControlPrune +} from './message' +import * as constants from './constants' +import { Heartbeat } from './heartbeat' +import { getGossipPeers } from './getGossipPeers' +import { createGossipRpc } from './utils' +import { Peer, Registrar } from './peer' +import TimeCache = require('time-cache') +import PeerId = require('peer-id') +import BasicPubsub = require('./pubsub') + +interface GossipOptions { + emitSelf: boolean + gossipIncoming: boolean + fallbackToFloodsub: boolean + msgIdFn: (msg: Message) => string + messageCache: MessageCache +} -const { RPC } = require('./message') -const constants = require('./constants') -const Heartbeat = require('./heartbeat') -const getGossipPeers = require('./getGossipPeers') -const { createGossipRpc } = require('./utils') +export class GossipSub extends BasicPubsub { + mesh: Map> + fanout: Map> + lastPub: Map + gossip: Map + control: Map + _options: GossipOptions -class GossipSub extends BasicPubsub { /** * @param {PeerId} peerId instance of the peer's PeerId * @param {Object} registrar @@ -27,7 +44,7 @@ class GossipSub extends BasicPubsub { * @param {Object} [options.messageCache] override the default MessageCache * @constructor */ - constructor (peerId, registrar, options = {}) { + constructor (peerId: PeerId, registrar: Registrar, options: Partial = {}) { const multicodecs = [constants.GossipSubID] const _options = { gossipIncoming: true, @@ -45,7 +62,7 @@ class GossipSub extends BasicPubsub { multicodecs, peerId, registrar, - options: _options + options: _options as GossipOptions }) /** @@ -79,14 +96,14 @@ class GossipSub extends BasicPubsub { /** * Map of pending messages to gossip * - * @type {Map> } + * @type {Map> } */ this.gossip = new Map() /** * Map of control messages * - * @type {Map} + * @type {Map} */ this.control = new Map() @@ -113,18 +130,18 @@ class GossipSub extends BasicPubsub { * @param {Peer} peer * @returns {Peer} */ - _removePeer (peer) { + _removePeer (peer: Peer): Peer { super._removePeer(peer) // Remove this peer from the mesh // eslint-disable-next-line no-unused-vars - for (const [_, peers] of this.mesh.entries()) { + for (const peers of this.mesh.values()) { peers.delete(peer) } // Remove this peer from the fanout // eslint-disable-next-line no-unused-vars - for (const [_, peers] of this.fanout.entries()) { + for (const peers of this.fanout.values()) { peers.delete(peer) } @@ -145,18 +162,20 @@ class GossipSub extends BasicPubsub { * @param {RPC} rpc * @returns {void} */ - _processRpc (idB58Str, peer, rpc) { + _processRpc (idB58Str: string, peer: Peer, rpc: RPC): void { super._processRpc(idB58Str, peer, rpc) - this._processRpcControlMessage(peer, rpc.control) + if (rpc.control) { + this._processRpcControlMessage(peer, rpc.control) + } } /** * Handles an rpc control message from a peer * @param {Peer} peer - * @param {RPC.ControlMessage} controlMsg + * @param {ControlMessage} controlMsg * @returns {void} */ - _processRpcControlMessage (peer, controlMsg) { + _processRpcControlMessage (peer: Peer, controlMsg: ControlMessage): void { if (!controlMsg) { return } @@ -170,7 +189,7 @@ class GossipSub extends BasicPubsub { return } - const outRpc = createGossipRpc([], { ihave, iwant, prune }) + const outRpc = createGossipRpc(ihave, { iwant: [iwant], prune }) this._sendRpc(peer, outRpc) } @@ -179,9 +198,9 @@ class GossipSub extends BasicPubsub { * emitting locally and forwarding on to relevant floodsub and gossipsub peers * @override * @param {Peer} peer - * @param {RPC.Message} msg + * @param {Message} msg */ - _processRpcMessage (peer, msg) { + _processRpcMessage (peer: Peer, msg: InMessage): void { const msgID = this.getMsgId(msg) // Ignore if we've already seen the message @@ -212,10 +231,11 @@ class GossipSub extends BasicPubsub { // Emit to peers in the mesh topics.forEach((topic) => { - if (!this.mesh.has(topic)) { + const meshPeers = this.mesh.get(topic) + if (!meshPeers) { return } - this.mesh.get(topic).forEach((peer) => { + meshPeers.forEach((peer) => { if (!peer.isWritable || peer.id.toB58String() === msg.from) { return } @@ -228,14 +248,14 @@ class GossipSub extends BasicPubsub { /** * Handles IHAVE messages * @param {Peer} peer - * @param {Array} ihave - * @returns {RPC.ControlIWant} + * @param {Array} ihave + * @returns {ControlIWant} */ - _handleIHave (peer, ihave) { - const iwant = new Set() + _handleIHave (peer: Peer, ihave: ControlIHave[]): ControlIWant | undefined { + const iwant = new Set() ihave.forEach(({ topicID, messageIDs }) => { - if (!this.mesh.has(topicID)) { + if (!topicID || !this.mesh.has(topicID)) { return } @@ -262,12 +282,12 @@ class GossipSub extends BasicPubsub { * Handles IWANT messages * Returns messages to send back to peer * @param {Peer} peer - * @param {Array} iwant - * @returns {Array} + * @param {Array} iwant + * @returns {Array} */ - _handleIWant (peer, iwant) { - // @type {Map} - const ihave = new Map() + _handleIWant (peer: Peer, iwant: ControlIWant[]): Message[] | undefined { + // @type {Map} + const ihave = new Map() iwant.forEach(({ messageIDs }) => { messageIDs.forEach((msgID) => { @@ -290,13 +310,16 @@ class GossipSub extends BasicPubsub { /** * Handles Graft messages * @param {Peer} peer - * @param {Array} graft - * @return {Array} + * @param {Array} graft + * @return {Array} */ - _handleGraft (peer, graft) { - const prune = [] + _handleGraft (peer: Peer, graft: ControlGraft[]): ControlPrune[] | undefined { + const prune: string[] = [] graft.forEach(({ topicID }) => { + if (!topicID) { + return + } const peers = this.mesh.get(topicID) if (!peers) { prune.push(topicID) @@ -324,11 +347,14 @@ class GossipSub extends BasicPubsub { /** * Handles Prune messages * @param {Peer} peer - * @param {Array} prune + * @param {Array} prune * @returns {void} */ - _handlePrune (peer, prune) { + _handlePrune (peer: Peer, prune: ControlPrune[]): void { prune.forEach(({ topicID }) => { + if (!topicID) { + return + } const peers = this.mesh.get(topicID) if (peers) { this.log('PRUNE: Remove mesh link to %s in %s', peer.id.toB58String(), topicID) @@ -344,7 +370,7 @@ class GossipSub extends BasicPubsub { * @override * @returns {Promise} */ - async start () { + async start (): Promise { await super.start() this.heartbeat.start() } @@ -354,7 +380,7 @@ class GossipSub extends BasicPubsub { * @override * @returns {Promise} */ - async stop () { + async stop (): Promise { await super.stop() this.heartbeat.stop() @@ -372,7 +398,7 @@ class GossipSub extends BasicPubsub { * @param {Array} topics * @returns {void} */ - _subscribe (topics) { + _subscribe (topics: string[]): void { super._subscribe(topics) this.join(topics) } @@ -384,7 +410,7 @@ class GossipSub extends BasicPubsub { * @param {Array} topics * @returns {void} */ - _unsubscribe (topics) { + _unsubscribe (topics: string[]): void { super._unsubscribe(topics) this.leave(topics) } @@ -394,7 +420,7 @@ class GossipSub extends BasicPubsub { * @param {Array|string} topics * @returns {void} */ - join (topics) { + join (topics: string[] | string): void { if (!this.started) { throw new Error('GossipSub has not started') } @@ -402,7 +428,7 @@ class GossipSub extends BasicPubsub { this.log('JOIN %s', topics) - topics.forEach((topic) => { + ;(topics as string[]).forEach((topic) => { // Send GRAFT to mesh peers const fanoutPeers = this.fanout.get(topic) if (fanoutPeers) { @@ -413,7 +439,7 @@ class GossipSub extends BasicPubsub { const peers = getGossipPeers(this, topic, constants.GossipSubD) this.mesh.set(topic, peers) } - this.mesh.get(topic).forEach((peer) => { + this.mesh.get(topic)!.forEach((peer) => { this.log('JOIN: Add mesh link to %s in %s', peer.id.toB58String(), topic) this._sendGraft(peer, topic) }) @@ -425,12 +451,12 @@ class GossipSub extends BasicPubsub { * @param {Array|string} topics * @returns {void} */ - leave (topics) { + leave (topics: string[] | string): void { topics = utils.ensureArray(topics) this.log('LEAVE %s', topics) - topics.forEach((topic) => { + ;(topics as string[]).forEach((topic) => { // Send PRUNE to mesh peers const meshPeers = this.mesh.get(topic) if (meshPeers) { @@ -447,10 +473,10 @@ class GossipSub extends BasicPubsub { * Override the default implementation in BasicPubSub. * If we don't provide msgIdFn in constructor option, it's the same. * @override - * @param {RPC.Message} msg the message object + * @param {Message} msg the message object * @returns {string} message id as string */ - getMsgId (msg) { + getMsgId (msg: InMessage): string { return this._msgIdFn(msg) } @@ -459,18 +485,17 @@ class GossipSub extends BasicPubsub { * * Note: this function assumes all messages are well-formed RPC objects * @override - * @param {Array} rpcs + * @param {Array} msgs * @returns {void} */ - _publish (rpcs) { - rpcs.forEach((msgObj) => { + _publish (msgs: InMessage[]): void { + msgs.forEach((msgObj) => { const msgID = this.getMsgId(msgObj) // put in seen cache this.seenCache.put(msgID) this.messageCache.put(msgObj) - // @type Set - const tosend = new Set() + const tosend = new Set() msgObj.topicIDs.forEach((topic) => { const peersInTopic = this.topics.get(topic) if (!peersInTopic) { @@ -497,14 +522,14 @@ class GossipSub extends BasicPubsub { meshPeers = peers this.fanout.set(topic, peers) } else { - meshPeers = [] + meshPeers = new Set() } } // Store the latest publishing time this.lastpub.set(topic, this._now()) } - meshPeers.forEach((peer) => { + meshPeers!.forEach((peer) => { tosend.add(peer) }) }) @@ -513,7 +538,7 @@ class GossipSub extends BasicPubsub { if (peer.id.toB58String() === msgObj.from) { return } - this._sendRpc(peer, { msgs: [msgObj] }) + this._sendRpc(peer, createGossipRpc([utils.normalizeOutRpcMessage(msgObj)])) }) }) } @@ -524,7 +549,7 @@ class GossipSub extends BasicPubsub { * @param {String} topic * @returns {void} */ - _sendGraft (peer, topic) { + _sendGraft (peer: Peer, topic: string): void { const graft = [{ topicID: topic }] @@ -539,7 +564,7 @@ class GossipSub extends BasicPubsub { * @param {String} topic * @returns {void} */ - _sendPrune (peer, topic) { + _sendPrune (peer: Peer, topic: string): void { const prune = [{ topicID: topic }] @@ -548,7 +573,7 @@ class GossipSub extends BasicPubsub { this._sendRpc(peer, out) } - _sendRpc (peer, outRpc) { + _sendRpc (peer: Peer, outRpc: RPC): void { if (!peer || !peer.isWritable) { return } @@ -567,26 +592,31 @@ class GossipSub extends BasicPubsub { this.gossip.delete(peer) } - peer.write(RPC.encode(outRpc)) + peer.write(RPCCodec.encode(outRpc)) } - _piggybackControl (peer, outRpc, ctrl) { + _piggybackControl (peer: Peer, outRpc: RPC, ctrl: ControlMessage): void { const tograft = (ctrl.graft || []) - .filter(({ topicID }) => (this.mesh.get(topicID) || new Set()).has(peer)) + .filter(({ topicID }) => (topicID && this.mesh.get(topicID) || new Set()).has(peer)) const toprune = (ctrl.prune || []) - .filter(({ topicID }) => !(this.mesh.get(topicID) || new Set()).has(peer)) + .filter(({ topicID }) => !(topicID && this.mesh.get(topicID) || new Set()).has(peer)) if (!tograft.length && !toprune.length) { return } - outRpc.control = outRpc.control || {} - outRpc.control.graft = (outRpc.control.graft || []).concat(tograft) - outRpc.control.prune = (outRpc.control.prune || []).concat(toprune) + if (outRpc.control) { + outRpc.control.graft = outRpc.control.graft.concat(tograft) + outRpc.control.prune = outRpc.control.prune.concat(toprune) + } else { + outRpc.control = { ihave: [], iwant: [], graft: tograft, prune: toprune } + } } - _piggybackGossip (peer, outRpc, ihave) { - outRpc.control = outRpc.control || {} + _piggybackGossip (peer: Peer, outRpc: RPC, ihave: ControlIHave[]): void { + if (!outRpc.control) { + outRpc.control = { ihave: [], iwant: [], graft: [], prune: [] } + } outRpc.control.ihave = ihave } @@ -595,10 +625,10 @@ class GossipSub extends BasicPubsub { * @param {Map>} tograft * @param {Map>} toprune */ - _sendGraftPrune (tograft, toprune) { + _sendGraftPrune (tograft: Map, toprune: Map): void { for (const [p, topics] of tograft) { const graft = topics.map((topicID) => ({ topicID })) - let prune = null + let prune: ControlPrune[] = [] // If a peer also has prunes, process them now const pruneMsg = toprune.get(p) if (pruneMsg) { @@ -622,7 +652,7 @@ class GossipSub extends BasicPubsub { * @param {Set} peers - peers to exclude * @returns {void} */ - _emitGossip (topic, peers) { + _emitGossip (topic: string, peers: Set): void { const messageIDs = this.messageCache.getGossipIDs(topic) if (!messageIDs.length) { return @@ -643,7 +673,7 @@ class GossipSub extends BasicPubsub { /** * Flush gossip and control messages */ - _flush () { + _flush (): void { // send gossip first, which will also piggyback control for (const [peer, ihave] of this.gossip.entries()) { this.gossip.delete(peer) @@ -661,10 +691,10 @@ class GossipSub extends BasicPubsub { /** * Adds new IHAVE messages to pending gossip * @param {Peer} peer - * @param {Array} controlIHaveMsgs + * @param {Array} controlIHaveMsgs * @returns {void} */ - _pushGossip (peer, controlIHaveMsgs) { + _pushGossip (peer: Peer, controlIHaveMsgs: ControlIHave): void { this.log('Add gossip to %s', peer.id.toB58String()) const gossip = this.gossip.get(peer) || [] this.gossip.set(peer, gossip.concat(controlIHaveMsgs)) @@ -674,7 +704,7 @@ class GossipSub extends BasicPubsub { * Returns the current time in milliseconds * @returns {number} */ - _now () { + _now (): number { return Date.now() } } diff --git a/ts/message/index.ts b/ts/message/index.ts new file mode 100644 index 00000000..d29bd3de --- /dev/null +++ b/ts/message/index.ts @@ -0,0 +1,67 @@ +import rpcProtoStr from './rpc.proto' +import protons = require('protons') + +const rpcProto = protons(rpcProtoStr) + +export interface SubOpts { + subscribe?: boolean + topicID?: string +} + +export interface Message { + from?: Buffer + data?: Buffer + seqno?: Buffer + topicIDs: string[] + signature?: Buffer + key?: Buffer +} + +/** + * Same as Message, but `from` is an optional string + */ +export interface InMessage { + from?: string + data?: Buffer + seqno?: Buffer + topicIDs: string[] + signature?: Buffer + key?: Buffer +} + +export interface ControlIHave { + topicID?: string + messageIDs: string[] +} + +export interface ControlIWant { + messageIDs: string[] +} + +export interface ControlGraft { + topicID?: string +} + +export interface ControlPrune { + topicID?: string +} + +export interface ControlMessage { + ihave: ControlIHave[] + iwant: ControlIWant[] + graft: ControlGraft[] + prune: ControlPrune[] +} + +export interface RPC { + subscriptions: SubOpts[] + msgs: Message[] + control?: ControlMessage +} + +interface ProtoCodec { + encode(obj: T): Buffer + decode(buf: Buffer): T +} + +export const RPCCodec = rpcProto.RPC as ProtoCodec diff --git a/src/message/rpc.proto.js b/ts/message/rpc.proto.ts similarity index 96% rename from src/message/rpc.proto.js rename to ts/message/rpc.proto.ts index 636ff1a7..89fecf6c 100644 --- a/src/message/rpc.proto.js +++ b/ts/message/rpc.proto.ts @@ -1,5 +1,4 @@ -'use strict' -module.exports = ` +export default ` message RPC { repeated SubOpts subscriptions = 1; repeated Message msgs = 2; diff --git a/src/messageCache.js b/ts/messageCache.ts similarity index 73% rename from src/messageCache.js rename to ts/messageCache.ts index 27d16265..debd4218 100644 --- a/src/messageCache.js +++ b/ts/messageCache.ts @@ -1,19 +1,16 @@ -'use strict' +import { InMessage } from './message' -class CacheEntry { - /** - * @param {String} msgID - * @param {Array} topics - * - * @constructor - */ - constructor (msgID, topics) { - this.msgID = msgID - this.topics = topics - } +export interface CacheEntry { + msgID: string + topics: string[] } -class MessageCache { +export class MessageCache { + msgs: Map + history: CacheEntry[][] + gossip: number + msgIdFn: (msg: InMessage) => string + /** * @param {Number} gossip * @param {Number} history @@ -21,7 +18,7 @@ class MessageCache { * * @constructor */ - constructor (gossip, history, msgIdFn) { + constructor (gossip: number, history: number, msgIdFn: (msg: InMessage) => string) { /** * @type {Map} */ @@ -52,10 +49,10 @@ class MessageCache { * @param {RPC.Message} msg * @returns {void} */ - put (msg) { + put (msg: InMessage): void { const msgID = this.getMsgId(msg) this.msgs.set(msgID, msg) - this.history[0].push(new CacheEntry(msgID, msg.topicIDs)) + this.history[0].push({ msgID, topics: msg.topicIDs }) } /** @@ -63,7 +60,7 @@ class MessageCache { * @param {RPC.Message} msg * @returns {string} */ - getMsgId (msg) { + getMsgId (msg: InMessage): string { return this.msgIdFn(msg) } @@ -71,9 +68,9 @@ class MessageCache { * Retrieves a message from the cache by its ID, if it is still present * * @param {String} msgID - * @returns {RPC.Message} + * @returns {Message} */ - get (msgID) { + get (msgID: string): InMessage | undefined { return this.msgs.get(msgID) } @@ -84,8 +81,8 @@ class MessageCache { * * @returns {Array} */ - getGossipIDs (topic) { - const msgIDs = [] + getGossipIDs (topic: string): string[] { + const msgIDs: string[] = [] for (let i = 0; i < this.gossip; i++) { this.history[i].forEach((entry) => { for (const t of entry.topics) { @@ -105,7 +102,7 @@ class MessageCache { * * @returns {void} */ - shift () { + shift (): void { const last = this.history[this.history.length - 1] last.forEach((entry) => { this.msgs.delete(entry.msgID) @@ -115,8 +112,3 @@ class MessageCache { this.history.unshift([]) } } - -module.exports = { - CacheEntry, - MessageCache -} diff --git a/ts/peer.ts b/ts/peer.ts new file mode 100644 index 00000000..e925e009 --- /dev/null +++ b/ts/peer.ts @@ -0,0 +1,29 @@ +import PeerId from 'peer-id' +import { Pushable } from 'it-pushable' +import { Message, SubOpts } from './message' + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface Connection {} + +export interface Peer { + id: PeerId + protocols: string[] + conn: Connection + topics: Set + stream: Pushable + readonly isConnected: boolean + readonly isWritable: boolean + write (buf: Buffer): void + attachConnection (conn: Connection) + sendSubscriptions (topics: string[]): void + sendUnsubscriptions (topics: string[]): void + sendMessages (msgs: Message[]): void + updateSubscriptions (subOpts: SubOpts[]): void + close (): void +} + +export interface Registrar { + handle (): void + register (): void + unregister (): void +} diff --git a/src/pubsub.js b/ts/pubsub.js similarity index 97% rename from src/pubsub.js rename to ts/pubsub.js index 29de5803..6fb0d518 100644 --- a/src/pubsub.js +++ b/ts/pubsub.js @@ -11,13 +11,13 @@ const pMap = require('p-map') const Pubsub = require('libp2p-pubsub') const { utils } = require('libp2p-pubsub') -const { RPC } = require('./message') +const { RPCCodec } = require('./message') class BasicPubSub extends Pubsub { /** * @param {Object} props * @param {String} props.debugName log namespace - * @param {string} props.multicodec protocol identificer to connect + * @param {string[]} props.multicodecs protocol identifiers to connect * @param {PeerId} props.peerId peer's peerId * @param {Object} props.registrar registrar for libp2p protocols * @param {function} props.registrar.handle @@ -64,7 +64,7 @@ class BasicPubSub extends Pubsub { /** * Topic validator function - * @typedef {function(topic: string, peer: Peer, message: RPC): boolean} validator + * @typedef {function(string, Peer, RPC): boolean} validator */ /** @@ -133,7 +133,7 @@ class BasicPubSub extends Pubsub { * @returns {RPC} */ _decodeRpc (buf) { - return RPC.decode(buf) + return RPCCodec.decode(buf) } /** @@ -450,10 +450,10 @@ class BasicPubSub extends Pubsub { * Publish messages * * Note: this function assumes all messages are well-formed RPC objects - * @param {Array} rpcs + * @param {Array} msgs * @returns {void} */ - _publish (rpcs) { + _publish (msgs) { throw errcode(new Error('_publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } } diff --git a/src/utils/createGossipRpc.js b/ts/utils/createGossipRpc.ts similarity index 66% rename from src/utils/createGossipRpc.js rename to ts/utils/createGossipRpc.ts index 9324d003..b628fbca 100644 --- a/src/utils/createGossipRpc.js +++ b/ts/utils/createGossipRpc.ts @@ -1,12 +1,14 @@ 'use strict' +import { RPC, Message, ControlMessage } from '../message' + /** * Create a gossipsub RPC object * @param {Array} msgs * @param {Partial} control * @returns {RPC} */ -exports.createGossipRpc = (msgs = [], control = {}) => { +export function createGossipRpc (msgs: Message[] = [], control: Partial = {}): RPC { return { subscriptions: [], msgs: msgs, diff --git a/ts/utils/index.ts b/ts/utils/index.ts new file mode 100644 index 00000000..3b1d6d32 --- /dev/null +++ b/ts/utils/index.ts @@ -0,0 +1,2 @@ +export * from './createGossipRpc' +export * from './shuffle' diff --git a/src/utils/shuffle.js b/ts/utils/shuffle.ts similarity index 90% rename from src/utils/shuffle.js rename to ts/utils/shuffle.ts index 055eac27..ed323050 100644 --- a/src/utils/shuffle.js +++ b/ts/utils/shuffle.ts @@ -8,7 +8,7 @@ * @param {Array} arr * @returns {Array} */ -exports.shuffle = (arr) => { +export function shuffle (arr: T[]): T[] { if (arr.length <= 1) { return arr } diff --git a/tsconfig.json b/tsconfig.json index b8d04043..b1daefcb 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,30 +1,22 @@ - { "compilerOptions": { "module": "commonjs", "lib": [ "es6" ], - "target": "ES5", + "target": "es6", "noImplicitAny": false, "noImplicitThis": true, "strictFunctionTypes": true, "strictNullChecks": true, "esModuleInterop": true, - "resolveJsonModule": true, "allowJs": true, - "checkJs": true, - "baseUrl": ".", - }, + "outDir": "./src", "types": [ - "node", - "mocha", - "chai" + "node" ], - "noEmit": true, - "forceConsistentCasingInFileNames": true }, - "files": [ - "./src/index.d.ts" + "include": [ + "./ts" ] -} \ No newline at end of file +} From 414ed5170e60f82d97a6efc2d116f2ecac99cb1e Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 29 May 2020 11:01:03 -0500 Subject: [PATCH 2/9] chore: GossipSub => Gossipsub --- test/2-nodes.spec.js | 2 +- test/gossip-incoming.spec.js | 2 +- test/gossip.js | 4 ++-- test/heartbeat.spec.js | 4 ++-- test/mesh.spec.js | 6 +++--- test/multiple-nodes.spec.js | 2 +- test/utils/index.js | 4 ++-- ts/constants.ts | 18 +++++++++--------- ts/getGossipPeers.ts | 6 +++--- ts/heartbeat.ts | 24 ++++++++++++------------ ts/index.ts | 18 +++++++++--------- 11 files changed, 45 insertions(+), 45 deletions(-) diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index 5625864f..666b1d87 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -7,7 +7,7 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect -const { GossipSubID: multicodec } = require('../src/constants') +const { GossipsubID: multicodec } = require('../src/constants') const { createGossipsub, diff --git a/test/gossip-incoming.spec.js b/test/gossip-incoming.spec.js index 209e4093..8d4cf357 100644 --- a/test/gossip-incoming.spec.js +++ b/test/gossip-incoming.spec.js @@ -7,7 +7,7 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect -const { GossipSubID: multicodec } = require('../src/constants') +const { GossipsubID: multicodec } = require('../src/constants') const { createGossipsubConnectedNodes } = require('./utils') const shouldNotHappen = (msg) => expect.fail() diff --git a/test/gossip.js b/test/gossip.js index 3c1f8ca2..3a6f0aec 100644 --- a/test/gossip.js +++ b/test/gossip.js @@ -5,7 +5,7 @@ const { Buffer } = require('buffer') const { expect } = require('chai') const sinon = require('sinon') -const { GossipSubID: multicodec, GossipSubDhi } = require('../src/constants') +const { GossipsubID: multicodec, GossipsubDhi } = require('../src/constants') const { first, createGossipsubNodes, @@ -20,7 +20,7 @@ describe('gossip', () => { ({ nodes, registrarRecords - } = await createGossipsubNodes(GossipSubDhi + 2, true)) + } = await createGossipsubNodes(GossipsubDhi + 2, true)) }) afterEach(() => Promise.all(nodes.map((n) => n.stop()))) diff --git a/test/heartbeat.spec.js b/test/heartbeat.spec.js index f89ffb51..e590b917 100644 --- a/test/heartbeat.spec.js +++ b/test/heartbeat.spec.js @@ -4,7 +4,7 @@ const { expect } = require('chai') const Gossipsub = require('../src') -const { GossipSubHeartbeatInterval } = require('../src/constants') +const { GossipsubHeartbeatInterval } = require('../src/constants') const { createPeerId, mockRegistrar } = require('./utils') describe('heartbeat', () => { @@ -25,6 +25,6 @@ describe('heartbeat', () => { await new Promise((resolve) => gossipsub.once('gossipsub:heartbeat', resolve)) const t2 = Date.now() const safeDelta = 100 // ms - expect(t2 - t1).to.be.lt(GossipSubHeartbeatInterval + safeDelta) + expect(t2 - t1).to.be.lt(GossipsubHeartbeatInterval + safeDelta) }) }) diff --git a/test/mesh.spec.js b/test/mesh.spec.js index 4ac66da3..b3de12fe 100644 --- a/test/mesh.spec.js +++ b/test/mesh.spec.js @@ -3,7 +3,7 @@ const { expect } = require('chai') -const { GossipSubDhi, GossipSubID: multicodec } = require('../src/constants') +const { GossipsubDhi, GossipsubID: multicodec } = require('../src/constants') const { createGossipsubNodes, ConnectionPair @@ -17,7 +17,7 @@ describe('mesh overlay', () => { ({ nodes, registrarRecords - } = await createGossipsubNodes(GossipSubDhi + 2, true)) + } = await createGossipsubNodes(GossipsubDhi + 2, true)) }) afterEach(() => Promise.all(nodes.map((n) => n.stop()))) @@ -76,6 +76,6 @@ describe('mesh overlay', () => { await new Promise((resolve) => setTimeout(resolve, 500)) // await mesh rebalancing await new Promise((resolve) => node0.once('gossipsub:heartbeat', resolve)) - expect(node0.mesh.get(topic).size).to.be.lte(GossipSubDhi) + expect(node0.mesh.get(topic).size).to.be.lte(GossipsubDhi) }) }) diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index bdbdc342..7159eb09 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -7,7 +7,7 @@ chai.use(require('dirty-chai')) const expect = chai.expect const promisify = require('promisify-es6') -const { GossipSubID: multicodec } = require('../src/constants') +const { GossipsubID: multicodec } = require('../src/constants') const { createGossipsubNodes, expectSet, diff --git a/test/utils/index.js b/test/utils/index.js index 018ce970..f6881941 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -8,7 +8,7 @@ const pTimes = require('p-times') const FloodSub = require('libp2p-floodsub') const PeerId = require('peer-id') -const GossipSub = require('../../src') +const Gossipsub = require('../../src') exports.first = (map) => map.values().next().value @@ -26,7 +26,7 @@ exports.createPeerId = createPeerId const createGossipsub = async (registrar, shouldStart = false, options) => { const peerId = await createPeerId() - const gs = new GossipSub(peerId, registrar, options) + const gs = new Gossipsub(peerId, registrar, options) if (shouldStart) { await gs.start() diff --git a/ts/constants.ts b/ts/constants.ts index 93c12e11..7d68de34 100644 --- a/ts/constants.ts +++ b/ts/constants.ts @@ -5,20 +5,20 @@ const minute = exports.minute = 60 * second // Protocol identifiers export const FloodSubID = '/floodsub/1.0.0' -export const GossipSubID = '/meshsub/1.0.0' +export const GossipsubID = '/meshsub/1.0.0' // Overlay parameters -export const GossipSubD = 6 -export const GossipSubDlo = 4 -export const GossipSubDhi = 12 +export const GossipsubD = 6 +export const GossipsubDlo = 4 +export const GossipsubDhi = 12 // Gossip parameters -export const GossipSubHistoryLength = 5 -export const GossipSubHistoryGossip = 3 +export const GossipsubHistoryLength = 5 +export const GossipsubHistoryGossip = 3 // Heartbeat interval -export const GossipSubHeartbeatInitialDelay = 100 / second -export const GossipSubHeartbeatInterval = second +export const GossipsubHeartbeatInitialDelay = 100 / second +export const GossipsubHeartbeatInterval = second // Fanout ttl -export const GossipSubFanoutTTL = minute +export const GossipsubFanoutTTL = minute diff --git a/ts/getGossipPeers.ts b/ts/getGossipPeers.ts index a40686bc..262d17ef 100644 --- a/ts/getGossipPeers.ts +++ b/ts/getGossipPeers.ts @@ -1,6 +1,6 @@ import * as constants from './constants' import { shuffle } from './utils' -import { GossipSub } from './index' +import { Gossipsub } from './index' import { Peer } from './peer' /** @@ -12,7 +12,7 @@ import { Peer } from './peer' * @returns {Set} * */ -export function getGossipPeers (router: GossipSub, topic: string, count: number): Set { +export function getGossipPeers (router: Gossipsub, topic: string, count: number): Set { const peersInTopic = router.topics.get(topic) if (!peersInTopic) { return new Set() @@ -21,7 +21,7 @@ export function getGossipPeers (router: GossipSub, topic: string, count: number) // Adds all peers using our protocol let peers: Peer[] = [] peersInTopic.forEach((peer) => { - if (peer.protocols.includes(constants.GossipSubID)) { + if (peer.protocols.includes(constants.GossipsubID)) { peers.push(peer) } }) diff --git a/ts/heartbeat.ts b/ts/heartbeat.ts index c97fcbd6..cb4c35e0 100644 --- a/ts/heartbeat.ts +++ b/ts/heartbeat.ts @@ -3,12 +3,12 @@ import * as constants from './constants' import { getGossipPeers } from './getGossipPeers' import { shuffle } from './utils' -import { GossipSub } from './index' +import { Gossipsub } from './index' import { Peer } from './peer' import errcode = require('err-code') export class Heartbeat { - gossipsub: GossipSub + gossipsub: Gossipsub _heartbeatTimer: { _intervalId: NodeJS.Timeout | undefined runPeriodically (fn: () => void, period: number): void @@ -19,7 +19,7 @@ export class Heartbeat { * @param {Object} gossipsub * @constructor */ - constructor (gossipsub: GossipSub) { + constructor (gossipsub: Gossipsub) { this.gossipsub = gossipsub } @@ -34,8 +34,8 @@ export class Heartbeat { const timeout = setTimeout(() => { heartbeat() - this._heartbeatTimer!.runPeriodically(heartbeat, constants.GossipSubHeartbeatInterval) - }, constants.GossipSubHeartbeatInitialDelay) + this._heartbeatTimer!.runPeriodically(heartbeat, constants.GossipsubHeartbeatInterval) + }, constants.GossipsubHeartbeatInitialDelay) this._heartbeatTimer = { _intervalId: undefined, @@ -84,8 +84,8 @@ export class Heartbeat { // maintain the mesh for topics we have joined this.gossipsub.mesh.forEach((peers, topic) => { // do we have enough peers? - if (peers.size < constants.GossipSubDlo) { - const ineed = constants.GossipSubD - peers.size + if (peers.size < constants.GossipsubDlo) { + const ineed = constants.GossipsubD - peers.size const peersSet = getGossipPeers(this.gossipsub, topic, ineed) peersSet.forEach((peer) => { // add topic peers not already in mesh @@ -105,8 +105,8 @@ export class Heartbeat { } // do we have to many peers? - if (peers.size > constants.GossipSubDhi) { - const idontneed = peers.size - constants.GossipSubD + if (peers.size > constants.GossipsubDhi) { + const idontneed = peers.size - constants.GossipsubD let peersArray = Array.from(peers) peersArray = shuffle(peersArray) peersArray = peersArray.slice(0, idontneed) @@ -129,7 +129,7 @@ export class Heartbeat { // expire fanout for topics we haven't published to in a while const now = this.gossipsub._now() this.gossipsub.lastpub.forEach((lastpb, topic) => { - if ((lastpb + constants.GossipSubFanoutTTL) < now) { + if ((lastpb + constants.GossipsubFanoutTTL) < now) { this.gossipsub.fanout.delete(topic) this.gossipsub.lastpub.delete(topic) } @@ -145,8 +145,8 @@ export class Heartbeat { }) // do we need more peers? - if (peers.size < constants.GossipSubD) { - const ineed = constants.GossipSubD - peers.size + if (peers.size < constants.GossipsubD) { + const ineed = constants.GossipsubD - peers.size const peersSet = getGossipPeers(this.gossipsub, topic, ineed) peersSet.forEach((peer) => { if (!peers.has(peer)) { diff --git a/ts/index.ts b/ts/index.ts index 26c5b0a6..f8626557 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -22,7 +22,7 @@ interface GossipOptions { messageCache: MessageCache } -export class GossipSub extends BasicPubsub { +export class Gossipsub extends BasicPubsub { mesh: Map> fanout: Map> lastPub: Map @@ -45,7 +45,7 @@ export class GossipSub extends BasicPubsub { * @constructor */ constructor (peerId: PeerId, registrar: Registrar, options: Partial = {}) { - const multicodecs = [constants.GossipSubID] + const multicodecs = [constants.GossipsubID] const _options = { gossipIncoming: true, fallbackToFloodsub: true, @@ -116,7 +116,7 @@ export class GossipSub extends BasicPubsub { * A message cache that contains the messages for last few hearbeat ticks * */ - this.messageCache = options.messageCache || new MessageCache(constants.GossipSubHistoryGossip, constants.GossipSubHistoryLength, this._msgIdFn) + this.messageCache = options.messageCache || new MessageCache(constants.GossipsubHistoryGossip, constants.GossipsubHistoryLength, this._msgIdFn) /** * A heartbeat timer that maintains the mesh @@ -422,7 +422,7 @@ export class GossipSub extends BasicPubsub { */ join (topics: string[] | string): void { if (!this.started) { - throw new Error('GossipSub has not started') + throw new Error('Gossipsub has not started') } topics = utils.ensureArray(topics) @@ -436,7 +436,7 @@ export class GossipSub extends BasicPubsub { this.fanout.delete(topic) this.lastpub.delete(topic) } else { - const peers = getGossipPeers(this, topic, constants.GossipSubD) + const peers = getGossipPeers(this, topic, constants.GossipsubD) this.mesh.set(topic, peers) } this.mesh.get(topic)!.forEach((peer) => { @@ -516,7 +516,7 @@ export class GossipSub extends BasicPubsub { meshPeers = this.fanout.get(topic) if (!meshPeers) { // If we are not in the fanout, then pick any peers in topic - const peers = getGossipPeers(this, topic, constants.GossipSubD) + const peers = getGossipPeers(this, topic, constants.GossipsubD) if (peers.size > 0) { meshPeers = peers @@ -658,7 +658,7 @@ export class GossipSub extends BasicPubsub { return } - const gossipSubPeers = getGossipPeers(this, topic, constants.GossipSubD) + const gossipSubPeers = getGossipPeers(this, topic, constants.GossipsubD) gossipSubPeers.forEach((peer) => { // skip mesh peers if (!peers.has(peer)) { @@ -709,5 +709,5 @@ export class GossipSub extends BasicPubsub { } } -module.exports = GossipSub -module.exports.multicodec = constants.GossipSubID +module.exports = Gossipsub +module.exports.multicodec = constants.GossipsubID From a1a78cbf41a69825f1d520fa033a15af4bd2bc0e Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 29 May 2020 12:04:26 -0500 Subject: [PATCH 3/9] chore: add build step to travis --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 9550f446..44668840 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: node_js cache: npm stages: + - build - check - test - cov @@ -20,6 +21,10 @@ jobs: - os: windows cache: false + - stage: build + script: + - npm run prebuild + - stage: check script: - npx aegir dep-check From 7a1076941db9d2eebe4d478cb1a1ce52548d6569 Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 29 May 2020 12:14:39 -0500 Subject: [PATCH 4/9] chore: add prebuild to each travis step --- .travis.yml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 44668840..b8019ffc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: node_js cache: npm stages: - - build - check - test - cov @@ -21,12 +20,9 @@ jobs: - os: windows cache: false - - stage: build - script: - - npm run prebuild - - stage: check script: + - npm run prebuild - npx aegir dep-check - npm run lint @@ -35,6 +31,7 @@ jobs: addons: chrome: stable script: + - npm run prebuild - npx aegir test -t browser - npx aegir test -t webworker @@ -43,6 +40,7 @@ jobs: addons: firefox: latest script: + - npm run prebuild - npx aegir test -t browser -- --browsers FirefoxHeadless - npx aegir test -t webworker -- --browsers FirefoxHeadless From ba645759dbc96d2d0c9d870f87e78ddc0c6212fa Mon Sep 17 00:00:00 2001 From: Cayman Date: Fri, 29 May 2020 12:31:23 -0500 Subject: [PATCH 5/9] chore: add pre_script to travis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index b8019ffc..bf0bf48a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ os: - linux - osx +before_script: npm run prebuild script: npx nyc -s npm run test:node -- --bail after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov From 8610696c908097987b8197e097b9f3d198388b26 Mon Sep 17 00:00:00 2001 From: Cayman Date: Sun, 31 May 2020 21:57:03 -0500 Subject: [PATCH 6/9] chore: address PR comments --- .eslintrc.js | 3 --- package.json | 2 +- ts/getGossipPeers.ts | 2 +- ts/heartbeat.ts | 9 +++++---- ts/index.ts | 13 +++++++++---- ts/message/index.ts | 2 ++ ts/peer.ts | 2 +- tsconfig.json | 3 ++- 8 files changed, 21 insertions(+), 15 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index d2a1a04e..daf79db8 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -13,8 +13,6 @@ module.exports = { '@typescript-eslint' // Let's us override rules below. ], 'rules': { -// '@typescript-eslint/no-use-before-define': 'off', // Allows us to hoist variables and functions which I am a fan of, functions not variables that is. -// '@typescript-eslint/no-explicit-any': 'off', // Too strict for my case, sometimes I need an any type '@typescript-eslint/member-delimiter-style': ['error', { // Prevents us from using any delimiter for interface properties. 'multiline': { 'delimiter': 'none', @@ -26,7 +24,6 @@ module.exports = { } }], '@typescript-eslint/indent': 'off', // This is the job of StandardJS, they are competing rules so we turn off the Typescript one. - 'no-unused-vars': 'off', // On the fence about using this one, sometimes we import a package that is never used directly. 'node/no-unsupported-features/es-syntax': 'off', // Allows us to use Import and Export keywords. '@typescript-eslint/no-non-null-assertion': 'off', 'no-mixed-operators': 'off' diff --git a/package.json b/package.json index f8b86cdb..42857c54 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "it-pipe": "^1.0.1", "libp2p-pubsub": "^0.5.0", "p-map": "^4.0.0", - "peer-id": "~0.13.3", + "peer-id": "~0.13.12", "protons": "^1.0.1", "time-cache": "^0.3.0" }, diff --git a/ts/getGossipPeers.ts b/ts/getGossipPeers.ts index 262d17ef..fccba96d 100644 --- a/ts/getGossipPeers.ts +++ b/ts/getGossipPeers.ts @@ -1,7 +1,7 @@ import * as constants from './constants' import { shuffle } from './utils' -import { Gossipsub } from './index' import { Peer } from './peer' +import Gossipsub = require('./index') /** * Given a topic, returns up to count peers subscribed to that topic diff --git a/ts/heartbeat.ts b/ts/heartbeat.ts index cb4c35e0..38acbaf8 100644 --- a/ts/heartbeat.ts +++ b/ts/heartbeat.ts @@ -1,10 +1,10 @@ -'use strict' - import * as constants from './constants' import { getGossipPeers } from './getGossipPeers' import { shuffle } from './utils' -import { Gossipsub } from './index' import { Peer } from './peer' +import Gossipsub = require('./index') +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore import errcode = require('err-code') export class Heartbeat { @@ -138,8 +138,9 @@ export class Heartbeat { // maintain our fanout for topics we are publishing but we have not joined this.gossipsub.fanout.forEach((peers, topic) => { // checks whether our peers are still in the topic + const topicGossip = this.gossipsub.topics.get(topic) peers.forEach((peer) => { - if (this.gossipsub.topics.has(peer)) { + if (topicGossip!.has(peer)) { peers.delete(peer) } }) diff --git a/ts/index.ts b/ts/index.ts index f8626557..901d5ae2 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,3 +1,5 @@ +/* eslint-disable @typescript-eslint/ban-ts-comment */ +// @ts-ignore import { utils } from 'libp2p-pubsub' import { MessageCache } from './messageCache' import { @@ -10,6 +12,7 @@ import { Heartbeat } from './heartbeat' import { getGossipPeers } from './getGossipPeers' import { createGossipRpc } from './utils' import { Peer, Registrar } from './peer' +// @ts-ignore import TimeCache = require('time-cache') import PeerId = require('peer-id') import BasicPubsub = require('./pubsub') @@ -22,10 +25,12 @@ interface GossipOptions { messageCache: MessageCache } -export class Gossipsub extends BasicPubsub { +class Gossipsub extends BasicPubsub { + peers: Map + topics: Map> mesh: Map> fanout: Map> - lastPub: Map + lastpub: Map gossip: Map control: Map _options: GossipOptions @@ -335,7 +340,7 @@ export class Gossipsub extends BasicPubsub { return } - const buildCtrlPruneMsg = (topic) => { + const buildCtrlPruneMsg = (topic: string) => { return { topicID: topic } @@ -709,5 +714,5 @@ export class Gossipsub extends BasicPubsub { } } -module.exports = Gossipsub +export = Gossipsub module.exports.multicodec = constants.GossipsubID diff --git a/ts/message/index.ts b/ts/message/index.ts index d29bd3de..975078ba 100644 --- a/ts/message/index.ts +++ b/ts/message/index.ts @@ -1,4 +1,6 @@ import rpcProtoStr from './rpc.proto' +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore import protons = require('protons') const rpcProto = protons(rpcProtoStr) diff --git a/ts/peer.ts b/ts/peer.ts index e925e009..85791aef 100644 --- a/ts/peer.ts +++ b/ts/peer.ts @@ -14,7 +14,7 @@ export interface Peer { readonly isConnected: boolean readonly isWritable: boolean write (buf: Buffer): void - attachConnection (conn: Connection) + attachConnection (conn: Connection): void sendSubscriptions (topics: string[]): void sendUnsubscriptions (topics: string[]): void sendMessages (msgs: Message[]): void diff --git a/tsconfig.json b/tsconfig.json index b1daefcb..10bb4be5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -5,13 +5,14 @@ "es6" ], "target": "es6", - "noImplicitAny": false, + "noImplicitAny": true, "noImplicitThis": true, "strictFunctionTypes": true, "strictNullChecks": true, "esModuleInterop": true, "allowJs": true, "outDir": "./src", + "declaration": true, "types": [ "node" ], From 12c40d1edef0ad3fa44fedee04cad31aca5777e6 Mon Sep 17 00:00:00 2001 From: Cayman Date: Mon, 1 Jun 2020 15:32:34 -0500 Subject: [PATCH 7/9] chore: add more comments to RPC interfaces --- ts/message/index.ts | 62 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/ts/message/index.ts b/ts/message/index.ts index 975078ba..8f08dd4c 100644 --- a/ts/message/index.ts +++ b/ts/message/index.ts @@ -5,17 +5,57 @@ import protons = require('protons') const rpcProto = protons(rpcProtoStr) +// These interfaces follow the protobuf definitions in ./rpc.proto.js + +/** + * Subscription Options + */ export interface SubOpts { + /** + * Whether to subscribe of unsubscribe + * true for subscribe, false for unsubscribe + */ subscribe?: boolean + /** + * Topic ID + */ topicID?: string } +/** + * Pubsub message + */ export interface Message { + /** + * Author of the message + * + * Note: This is not necessarily the peer who sent the RPC this message is contained in + */ from?: Buffer + /** + * Opaque blob of data + */ data?: Buffer + /** + * 64-bit big-endian uint + * + * No two messages on a topic from the same peer should have the same seqno value + */ seqno?: Buffer + /** + * Set of topics being published to + */ topicIDs: string[] + /** + * Signature of the message + * + * The signature is computed over the marshalled message protobuf excluding the key field + * The protobuf bloc is prefixed by the string `libp2p-pubsub:` before signing + */ signature?: Buffer + /** + * Signing key + */ key?: Buffer } @@ -31,23 +71,42 @@ export interface InMessage { key?: Buffer } +/** + * IHAVE control message + * Sent to notify a peer with a list of messages that were recently seen by the local router in the included topic id + */ export interface ControlIHave { topicID?: string messageIDs: string[] } +/** + * IWANT control message + * Sent to request the full content of messages whose IDs were announced by a remote peer in an IHAVE message + */ export interface ControlIWant { messageIDs: string[] } +/** + * GRAFT control message + * Sent to notify a peer that it has been added to the local router's mesh for the included topic id + */ export interface ControlGraft { topicID?: string } +/** + * PRUNE control message + * Sent to notify a peer that it has been removed from the local router's mesh for the included topic id + */ export interface ControlPrune { topicID?: string } +/** + * Gossip control message container + */ export interface ControlMessage { ihave: ControlIHave[] iwant: ControlIWant[] @@ -55,6 +114,9 @@ export interface ControlMessage { prune: ControlPrune[] } +/** + * Gossipsub RPC message + */ export interface RPC { subscriptions: SubOpts[] msgs: Message[] From 99bb48fcfe5b268d80140d81a90a65b25dda4493 Mon Sep 17 00:00:00 2001 From: Cayman Date: Mon, 1 Jun 2020 15:43:55 -0500 Subject: [PATCH 8/9] chore: remove duplication in RPC interfaces --- ts/message/index.ts | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/ts/message/index.ts b/ts/message/index.ts index 8f08dd4c..fb984c12 100644 --- a/ts/message/index.ts +++ b/ts/message/index.ts @@ -27,7 +27,7 @@ export interface SubOpts { */ export interface Message { /** - * Author of the message + * Peer id of the author of the message * * Note: This is not necessarily the peer who sent the RPC this message is contained in */ @@ -59,17 +59,19 @@ export interface Message { key?: Buffer } +type Overwrite = { + [P in Exclude]: T1[P] +} & T2; + /** - * Same as Message, but `from` is an optional string + * Pubsub message, with `from` as a base58-encoded string */ -export interface InMessage { +export type InMessage = Overwrite /** * IHAVE control message From 666099e8452c3179850db53bf6bbf182adb5fcd3 Mon Sep 17 00:00:00 2001 From: Cayman Date: Tue, 2 Jun 2020 10:10:55 -0500 Subject: [PATCH 9/9] chore: add pretest script --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 42857c54..cb9505f6 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "release": "aegir release", "prebuild": "tsc", "build": "aegir build", + "pretest": "tsc", "test": "aegir test", "test:node": "aegir test --target node", "test:browser": "aegir test --target browser"