Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: typedef generation & type checking #261

Merged
merged 15 commits into from
Mar 9, 2021
27 changes: 27 additions & 0 deletions .github/workflows/typecheck.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
on:
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved
push:
branches:
- master
- main
- default
pull_request:
branches:
- '**'

name: Typecheck
jobs:
check:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [12.x]
steps:
- uses: actions/checkout@v1
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Install dependencies
run: npm install
- name: Typecheck
uses: gozala/[email protected]
26 changes: 21 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@
"./test/utils/create-libp2p-node": false,
"./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js"
},
"types": "dist/src/index.d.ts",
"typesVersions": {
Gozala marked this conversation as resolved.
Show resolved Hide resolved
"*": {
"src/*": [
"dist/src/*",
"dist/src/*/index"
]
}
},
"eslintConfig": {
"extends": "ipfs"
},
"files": [
"dist",
"src"
Expand All @@ -17,6 +29,7 @@
"test:browser": "aegir test -t browser -t webworker",
"test:node": "aegir test -t node",
"lint": "aegir lint",
"check": "aegir ts -p check",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
Expand All @@ -43,7 +56,7 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^28.1.0",
"aegir": "^29.1.0",
"benchmark": "^2.1.4",
"delay": "^4.3.0",
"ipfs-repo": "^7.0.0",
Expand All @@ -69,19 +82,21 @@
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^8.0.0"
"uuid": "^8.0.0",
Gozala marked this conversation as resolved.
Show resolved Hide resolved
"@types/debug": "^4.1.5",
"typescript": "^4.0.5"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"typescript": "^4.0.5"

},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^2.1.1",
"bignumber.js": "^9.0.0",
"cids": "^1.0.0",
"debug": "^4.1.0",
"debug": "^4.2.0",
"ipld-block": "^0.11.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"libp2p-interfaces": "^0.7.1",
"libp2p-interfaces": "git://github.com/libp2p/js-libp2p-interfaces.git#fix/types-of-multicodec-topology",
"moving-average": "^1.0.0",
"multicodec": "^2.0.0",
"multihashing-async": "^2.0.1",
Expand Down Expand Up @@ -115,6 +130,7 @@
"dmitriy ryajov <[email protected]>",
"Dmitriy Ryajov <[email protected]>",
"Bryan Stenson <[email protected]>",
"Richard Schneider <[email protected]>"
"Richard Schneider <[email protected]>",
"Irakli Gozalishvili <[email protected]>"
]
}
120 changes: 104 additions & 16 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ const TARGET_MESSAGE_SIZE = 16 * 1024
const MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024

class DecisionEngine {
/**
*
* @param {PeerId} peerId
* @param {BlockStore} blockstore
* @param {import('../network')} network
* @param {Stats} stats
* @param {Object} [opts]
* @param {number} [opts.targetMessageSize]
* @param {number} [opts.maxSizeReplaceHasWithBlock]
*/
constructor (peerId, blockstore, network, stats, opts) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
Expand All @@ -34,6 +44,7 @@ class DecisionEngine {
this._opts = this._processOpts(opts)

// A list of of ledgers by their partner id
/** @type {Map<string, Ledger>} */
this.ledgerMap = new Map()
this._running = false

Expand Down Expand Up @@ -112,7 +123,7 @@ class DecisionEngine {

// If there's nothing in the message, bail out
if (msg.empty) {
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peerId is obtained on line 76 via following code:

 const { peerId, tasks, pendingSize } = this._requestQueue.popTasks(this._opts.targetMessageSize)

However popTasks returns an object without peerId in 2 code paths out of 3. At the same time tasksDone and messageSent below do not expect undefined.

It was not exactly clear what made most sense here, so I just conditioned those calls on peerIds presence, but it would be good to make sure all this makes sense.


// Trigger the next round of task processing
this._scheduleProcessTasks()
Expand All @@ -122,32 +133,36 @@ class DecisionEngine {

try {
// Send the message
await this.network.sendMessage(peerId, msg)
peerId && await this.network.sendMessage(peerId, msg)

// Peform sent message accounting
for (const block of blocks.values()) {
this.messageSent(peerId, block)
peerId && this.messageSent(peerId, block)
}
} catch (err) {
this._log.error(err)
}

// Free the tasks up from the request queue
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)

// Trigger the next round of task processing
this._scheduleProcessTasks()
}

/**
* @param {PeerId} peerId
* @returns {Map<string, WantListEntry>}
*/
wantlistForPeer (peerId) {
const peerIdStr = peerId.toB58String()
if (!this.ledgerMap.has(peerIdStr)) {
return new Map()
}

return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries()
const ledger = this.ledgerMap.get(peerIdStr)
return ledger ? ledger.wantlist.sortedEntries() : new Map()
}

/**
* @param {PeerId} peerId
*/
ledgerForPeer (peerId) {
const peerIdStr = peerId.toB58String()

Expand All @@ -164,12 +179,20 @@ class DecisionEngine {
}
}

/**
* @returns {PeerId[]}
*/
peers () {
return Array.from(this.ledgerMap.values()).map((l) => l.partner)
}

// Receive blocks either from an incoming message from the network, or from
// blocks being added by the client on the localhost (eg IPFS add)
/**
* Receive blocks either from an incoming message from the network, or from
* blocks being added by the client on the localhost (eg IPFS add)
*
* @param {Block[]} blocks
* @returns {void}
*/
receivedBlocks (blocks) {
if (!blocks.length) {
return
Expand Down Expand Up @@ -211,7 +234,13 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

// Handle incoming messages
/**
* Handle incoming messages
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<void>}
*/
async messageReceived (peerId, msg) {
const ledger = this._findOrCreate(peerId)

Expand Down Expand Up @@ -251,12 +280,24 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

/**
* @private
* @param {PeerId} peerId
* @param {CID[]} cids
* @returns {void}
*/
_cancelWants (peerId, cids) {
for (const c of cids) {
this._requestQueue.remove(c.toString(), peerId)
}
}

/**
* @private
* @param {PeerId} peerId
* @param {BitswapMessageEntry[]} wants
* @returns {Promise<void>}
*/
async _addWants (peerId, wants) {
// Get the size of each wanted block
const blockSizes = await this._getBlockSizes(wants.map(w => w.cid))
Expand Down Expand Up @@ -320,11 +361,21 @@ class DecisionEngine {
blockSize <= this._opts.maxSizeReplaceHasWithBlock
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, number>>}
*/
async _getBlockSizes (cids) {
const blocks = await this._getBlocks(cids)
return new Map([...blocks].map(([k, v]) => [k, v.data.length]))
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, Block>>}
*/
async _getBlocks (cids) {
const res = new Map()
await Promise.all(cids.map(async (cid) => {
Expand All @@ -347,7 +398,14 @@ class DecisionEngine {
})
}

// Clear up all accounting things after message was sent
/**
* Clear up all accounting things after message was sent
*
* @param {PeerId} peerId
* @param {Object} [block]
* @param {Uint8Array} block.data
* @param {CID} [block.cid]
*/
messageSent (peerId, block) {
const ledger = this._findOrCreate(peerId)
ledger.sentBytes(block ? block.data.length : 0)
Expand All @@ -356,15 +414,29 @@ class DecisionEngine {
}
}

/**
* @param {PeerId} peerId
* @returns {number}
*/
numBytesSentTo (peerId) {
return this._findOrCreate(peerId).accounting.bytesSent
}

/**
* @param {PeerId} peerId
* @returns {number}
*/

numBytesReceivedFrom (peerId) {
return this._findOrCreate(peerId).accounting.bytesRecv
}

peerDisconnected (peerId) {
/**
*
* @param {PeerId} _peerId
* @returns {void}
*/
peerDisconnected (_peerId) {
// if (this.ledgerMap.has(peerId.toB58String())) {
// this.ledgerMap.delete(peerId.toB58String())
// }
Expand All @@ -373,10 +445,16 @@ class DecisionEngine {
// in the peer request queue
}

/**
* @private
* @param {PeerId} peerId
* @returns {Ledger}
*/
_findOrCreate (peerId) {
const peerIdStr = peerId.toB58String()
if (this.ledgerMap.has(peerIdStr)) {
return this.ledgerMap.get(peerIdStr)
const ledger = this.ledgerMap.get(peerIdStr)
if (ledger) {
return ledger
}

const l = new Ledger(peerId)
Expand All @@ -399,3 +477,13 @@ class DecisionEngine {
}

module.exports = DecisionEngine

/**
* @typedef {import('../types').PeerId} PeerId
* @typedef {import('../stats')} Stats
* @typedef {import('../types').BlockData} BlockData
* @typedef {import('../types').Block} Block
* @typedef {import('../types/message/entry')} BitswapMessageEntry
* @typedef {import('../types/wantlist/entry')} WantListEntry
* @typedef {import('../types').BlockStore} BlockStore
*/
51 changes: 51 additions & 0 deletions src/decision-engine/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
export interface TaskMerger {
/**
* Given the existing tasks with the same topic, does the task add some new
* information? Used to decide whether to merge the task or ignore it.
*/
hasNewInfo (task:Task, tasksWithTopic:Task[]): boolean

/**
* Merge the information from the task into the existing pending task.
*/
merge (newTask, existingTask): void
}

export interface Task {
/**
* A name for the Task (like an id but not necessarily unique)
*/
topic: string
/**
* Priority for the Task (tasks are ordered by priority per peer).
*/
priority: number
/**
* The size of the task, e.g. the number of bytes in a block.
*/
size: number

data: TaskData
}

export interface TaskData {
/**
* The size of the block, if known (if we don't have the block this is zero)
*/
blockSize: number
/**
* Indicates if the request is for a block or for a HAVE.
*/
isWantBlock: boolean
/**
* Indicates if we have the block.
*/
haveBlock: boolean
/**
* Indicates whether to send a DONT_HAVE response if we don't have the block.
* If this is `false` and we don't have the block, we just ignore the
* want-block request (useful for discovery where we query lots of peers but
* don't want a response unless the peer has the block).
*/
sendDontHave: boolean
}
Loading