Skip to content

Commit

Permalink
Reduce bitfield footprint (#184)
Browse files Browse the repository at this point in the history
* streaming load instead of full read

* bump random-access-memory, undo skip very sparse test before merge

* change reset code

* use the memory deduplicator

* add bitfield dedup test

* reenable test

* fix index page size

* bump travis

* implement want regions
  • Loading branch information
mafintosh authored Dec 11, 2018
1 parent d31aa10 commit 547a68c
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: node_js
node_js:
- '4'
- '6'
- '8'
- '10'
12 changes: 10 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ Feed.prototype._open = function (cb) {
}

if (self._overwrite) {
state.bitfield.fill(0)
state.bitfield = []
state.key = state.secretKey = null
}

self.bitfield = bitfield(state.bitfield)
self.bitfield = bitfield(state.bitfieldPageSize, state.bitfield)
self.tree = treeIndex(self.bitfield.tree)
self.length = self.tree.blocks()
self._seq = self.length
Expand Down Expand Up @@ -350,10 +350,13 @@ Feed.prototype.download = function (range, cb) {
iterator: null,
start: range.start || 0,
end: range.end || -1,
want: 0,
linear: !!range.linear,
callback: cb
}

sel.want = toWantRange(sel.start)

this._selections.push(sel)
this._updatePeers()

Expand Down Expand Up @@ -589,6 +592,7 @@ Feed.prototype.seek = function (bytes, opts, cb) {
index: -1,
start: start,
end: end,
want: toWantRange(start),
callback: cb || noop
})

Expand Down Expand Up @@ -1392,3 +1396,7 @@ function safeBufferEquals (a, b) {
if (!b) return !a
return equals(a, b)
}

// Rounds `i` down to the nearest multiple of 1024 * 1024, i.e. the start of
// the fixed-size "want" region that contains it.
function toWantRange (i) {
  var regionSize = 1024 * 1024
  var region = Math.floor(i / regionSize)
  return region * regionSize
}
39 changes: 29 additions & 10 deletions lib/bitfield.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var flat = require('flat-tree')
var rle = require('bitfield-rle')
var pager = require('memory-pager')
var bitfield = require('sparse-bitfield')
var allocUnsafe = require('buffer-alloc-unsafe')

var INDEX_UPDATE_MASK = [63, 207, 243, 252]
var INDEX_ITERATE_MASK = [0, 192, 240, 252]
Expand All @@ -26,14 +27,19 @@ for (var i = 0; i < 256; i++) {

module.exports = Bitfield

function Bitfield (buffer) {
if (!(this instanceof Bitfield)) return new Bitfield(buffer)
function Bitfield (pageSize, pages) {
if (!(this instanceof Bitfield)) return new Bitfield(pageSize, pages)
if (!pageSize) pageSize = 2048 + 1024 + 512

this.pages = pager(3328)
var deduplicate = allocUnsafe(pageSize)
deduplicate.fill(255)

if (buffer) {
for (var i = 0; i < buffer.length; i += 3328) {
this.pages.set(i / 3328, buffer.slice(i, i + 3328))
this.indexSize = pageSize - 2048 - 1024
this.pages = pager(pageSize, { deduplicate })

if (pages) {
for (var i = 0; i < pages.length; i++) {
this.pages.set(i, pages[i])
}
}

Expand All @@ -52,7 +58,7 @@ function Bitfield (buffer) {
})

this.index = bitfield({
pageSize: 256,
pageSize: this.indexSize,
pageOffset: 1024 + 2048,
pages: this.pages,
trackUpdates: true
Expand Down Expand Up @@ -103,8 +109,21 @@ Bitfield.prototype.total = function (start, end) {
}

// TODO: use the index to speed this up *a lot*
Bitfield.prototype.compress = function () {
return rle.encode(this.data.toBuffer())
Bitfield.prototype.compress = function (start, length) {
if (!start && !length) return rle.encode(this.data.toBuffer())

var buf = Buffer.alloc(length)
var p = start / this.data.pageSize / 8
var end = p + length / this.data.pageSize / 8
var offset = p * this.data.pageSize

for (; p < end; p++) {
var page = this.data.pages.pages[p]
if (!page || !page.buffer) continue
page.buffer.copy(buf, p * this.data.pageSize - offset, this.data.pageOffset, this.data.pageOffset + this.data.pageSize)
}

return rle.encode(buf)
}

Bitfield.prototype._setIndex = function (i, value) {
Expand All @@ -120,7 +139,7 @@ Bitfield.prototype._setIndex = function (i, value) {
var start = 2 * i
var byte = (bitfield.getByte(start) & INDEX_UPDATE_MASK[o]) | (getIndexValue(value) >> (2 * o))
var len = bitfield.length
var maxLength = this.pages.length * 256
var maxLength = this.pages.length * this.indexSize

ite.seek(start)

Expand Down
81 changes: 69 additions & 12 deletions lib/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ function Peer (feed, opts) {
this.feed = feed
this.stream = null // set by replicate just after creation
this.remoteId = null
this.wants = bitfield()
this.remoteBitfield = bitfield()
this.remoteLength = 0
this.remoteWant = false
Expand All @@ -55,6 +56,7 @@ function Peer (feed, opts) {

this.maxRequests = opts.maxRequests || feed.maxRequests || 16
this.inflightRequests = []
this.inflightWants = 0

this._index = -1
this._lastBytes = 0
Expand All @@ -65,11 +67,17 @@ function Peer (feed, opts) {
this._iterator = this.remoteBitfield.iterator()
}

Peer.prototype.onwant = function () {
// TODO: reply to the actual want context
Peer.prototype.onwant = function (want) {
if ((want.start & 8191) || (want.length & 8191)) return
if (!this.remoteWant && this.feed.length) {
// Eagerly send the length of the feed to the otherside
// TODO: only send this if the remote is not wanting a region
// where this is contained in
this.stream.have({ start: this.feed.length - 1 })
}
this.remoteWant = true
var rle = this.feed.bitfield.compress()
this.stream.have({start: 0, bitfield: rle})
var rle = this.feed.bitfield.compress(want.start, want.length)
this.stream.have({start: want.start, length: want.length, bitfield: rle})
}

Peer.prototype.ondata = function (data) {
Expand Down Expand Up @@ -187,11 +195,18 @@ Peer.prototype.onhave = function (have) {
var updated = this._first
if (this._first) this._first = false

if (have.length === 1024 * 1024) {
this.inflightWants--
}

if (have.bitfield) { // TODO: handle start !== 0
if (have.length === 0 || have.length === 1) { // length === 1 is for backwards compat
this.wants = null // we are in backwards compat mode where we subscribe everything
}
var buf = rle.decode(have.bitfield)
var bits = buf.length * 8
remoteAndNotLocal(this.feed.bitfield, buf, this.remoteBitfield.littleEndian)
this.remoteBitfield.fill(buf, 0)
remoteAndNotLocal(this.feed.bitfield, buf, this.remoteBitfield.littleEndian, have.start)
this.remoteBitfield.fill(buf, have.start)
if (bits > this.remoteLength) {
this.remoteLength = this.remoteBitfield.last() + 1
updated = true
Expand Down Expand Up @@ -282,6 +297,7 @@ Peer.prototype.haveBytes = function (bytes) { // called by feed
// Repeatedly runs `_update` until it returns a falsy value, then gives the
// peer a chance to send further want messages.
Peer.prototype.update = function () {
  var progressed = this._update()
  while (progressed) {
    progressed = this._update()
  }
  this._sendWantsMaybe()
}

Peer.prototype._update = function () {
Expand Down Expand Up @@ -336,9 +352,7 @@ Peer.prototype._update = function () {

Peer.prototype.ready = function () {
set.add(this.feed.peers, this)
if (this.downloading) {
this.stream.want({start: 0}) // TODO: don't just subscribe to *EVERYTHING* hehe
}
this._sendWants()
this.feed.emit('peer-add', this)
}

Expand Down Expand Up @@ -383,6 +397,49 @@ Peer.prototype.destroy = function (err) {
this._close()
}

// Calls `_sendWants` only while this peer has fewer in-flight block requests
// than `maxRequests`.
Peer.prototype._sendWantsMaybe = function () {
  var hasRequestCapacity = this.inflightRequests.length < this.maxRequests
  if (!hasRequestCapacity) return
  this._sendWants()
}

// Sends want messages for whatever the feed is currently interested in.
//
// Does nothing when `this.wants` is falsy, when the peer is not downloading,
// or when 16 wants are already in flight. Otherwise it walks the feed's
// waiting entries first, then its selections, and stops as soon as the
// in-flight limit is reached. If the limit is never reached it finally
// requests the region containing index 0.
Peer.prototype._sendWants = function () {
  var maxInflightWants = 16

  if (!(this.wants && this.downloading)) return
  if (this.inflightWants >= maxInflightWants) return

  var waiting = this.feed._waiting
  for (var w = 0; w < waiting.length; w++) {
    var pending = waiting[w]
    // entries with index -1 carry a range (start/end/want) instead of a block index
    if (pending.index !== -1) this._sendWant(pending.index)
    else this._sendWantRange(pending)
    if (this.inflightWants >= maxInflightWants) return
  }

  var selections = this.feed._selections
  for (var k = 0; k < selections.length; k++) {
    this._sendWantRange(selections[k])
    if (this.inflightWants >= maxInflightWants) return
  }

  // always sub to the first range for now, usually what you want
  this._sendWant(0)
}

// Requests the next not-yet-wanted 1024 * 1024 region for a selection or
// range-style waiter `s` (an object with `want` and `end` fields).
//
// Starting at the cursor `s.want`, regions this peer has already asked for
// are skipped. The first region that `_sendWant` accepts is requested and
// `s.want` is moved to the region right after it. At most one want message is
// sent per call. Nothing happens once the cursor reaches the remote length or
// the (non -1) end of the range.
//
// Previously the cursor only advanced when `_sendWant` succeeded, so a range
// whose cursor pointed at an already-wanted region (for example region 0,
// which `_sendWants` always requests) would never request any later region.
Peer.prototype._sendWantRange = function (s) {
  var len = 1024 * 1024
  var want = s.want

  while (true) {
    if (want >= this.remoteLength) return
    if (s.end !== -1 && want >= s.end) return

    if (this._sendWant(want)) {
      s.want = want + len
      return
    }

    // Only keep scanning when the refusal was due to the region already being
    // wanted; any other refusal must not turn into an endless loop.
    if (!this.wants.get(Math.floor(want / len))) return
    want += len
  }
}

// Sends a want message for the 1024 * 1024 wide region containing `index`,
// unless that region is already marked in `this.wants`.
//
// Returns true when a message was sent (the region is then marked and
// `inflightWants` is incremented), false when the region was already wanted.
Peer.prototype._sendWant = function (index) {
  var regionSize = 1024 * 1024
  var region = Math.floor(index / regionSize)

  if (!this.wants.get(region)) {
    this.wants.set(region, true)
    this.inflightWants += 1
    this.stream.want({start: region * regionSize, length: regionSize})
    return true
  }

  return false
}

Peer.prototype._downloadWaiting = function (wait) {
if (!wait.bytes) {
if (!this.remoteBitfield.get(wait.index) || !this.feed._reserved.set(wait.index, true)) return
Expand Down Expand Up @@ -466,11 +523,11 @@ function createView (page) {
return new DataView(buf.buffer, buf.byteOffset, 1024)
}

function remoteAndNotLocal (local, buf, le) {
function remoteAndNotLocal (local, buf, le, start) {
var remote = new DataView(buf.buffer, buf.byteOffset)
var len = Math.floor(buf.length / 4)
var arr = new Uint32Array(buf.buffer, buf.byteOffset, len)
var p = 0
var p = start / 8192 // 8192 is bits per bitfield page
var l = 0
var page = createView(local.pages.get(p++, true))

Expand All @@ -487,5 +544,5 @@ function remoteAndNotLocal (local, buf, le) {
function nextRandom (ite, start, end) {
var len = end - start
var i = ite.seek(Math.floor(Math.random() * len) + start).next(true)
return i === -1 ? ite.seek(start).next(true) : i
return i === -1 || i >= end ? ite.seek(start).next(true) : i
}
46 changes: 34 additions & 12 deletions lib/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,20 @@ Storage.prototype.open = function (opts, cb) {
if (!this.signatures) this.signatures = this.create('signatures', opts)

var result = {
bitfield: bufferAlloc(0),
bitfield: [],
bitfieldPageSize: 3584, // we upgraded the page size to fix a bug
secretKey: null,
key: null
}

this.bitfield.write(0, header(0, 3328, null), function (err) {
if (err) return cb(err)
readAll(self.bitfield, 32, 3328, function (err, data) {
if (data) result.bitfield = data
done(err)
this.bitfield.read(0, 32, function (_, h) {
if (h) result.bitfieldPageSize = h.readUInt16BE(5)
self.bitfield.write(0, header(0, result.bitfieldPageSize, null), function (err) {
if (err) return cb(err)
readAll(self.bitfield, 32, result.bitfieldPageSize, function (err, pages) {
if (pages) result.bitfield = pages
done(err)
})
})
})

Expand Down Expand Up @@ -316,23 +320,41 @@ function close (st, cb) {
else cb()
}

function statAndReadAll (st, offset, cb) {
function statAndReadAll (st, offset, pageSize, cb) {
st.stat(function (err, stat) {
if (err) return cb(null, bufferAlloc(0))
st.read(offset, stat.size - offset, cb)
if (err) return cb(null, [])

var result = []

loop(null, null)

function loop (err, batch) {
if (err) return cb(err)

if (batch) {
offset += batch.length
for (var i = 0; i < batch.length; i += pageSize) {
result.push(batch.slice(i, i + pageSize))
}
}

var next = Math.min(stat.size - offset, 32 * pageSize)
if (!next) return cb(null, result)

st.read(offset, next, loop)
}
})
}

function readAll (st, offset, pageSize, cb) {
if (typeof st.length === 'number' && st.length > -1) return st.read(offset, st.length - offset, cb)
if (st.statable === true) return statAndReadAll(st, offset, cb)
if (st.statable === true) return statAndReadAll(st, offset, pageSize, cb)

var bufs = []

st.read(offset, pageSize, loop)

function loop (err, buf) {
if (err) return cb(null, Buffer.concat(bufs))
if (err) return cb(null, bufs)
bufs.push(buf)
st.read(offset + bufs.length * pageSize, pageSize, loop)
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"buffer-from": "^1.0.0",
"bulk-write-stream": "^1.1.3",
"codecs": "^1.2.0",
"fast-bitfield": "^1.2.1",
"fast-bitfield": "^1.2.2",
"flat-tree": "^1.6.0",
"from2": "^2.3.0",
"hypercore-crypto": "^1.0.0",
Expand All @@ -33,7 +33,7 @@
"unordered-set": "^2.0.0"
},
"devDependencies": {
"random-access-memory": "^2.2.0",
"random-access-memory": "^3.1.0",
"shuffle-array": "^1.0.1",
"speedometer": "^1.0.0",
"standard": "^8.6.0",
Expand Down
2 changes: 1 addition & 1 deletion test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ tape('create from existing keys', function (t) {
feed.append('hi', function () {
var otherFeed = hypercore(storage2, feed.key, { secretKey: feed.secretKey })
var store = otherFeed._storage
otherFeed.close(function () {
otherFeed.ready(function () {
store.open({key: feed.key}, function (err, data) {
t.error(err)
t.equals(data.key.toString('hex'), feed.key.toString('hex'))
Expand Down
Loading

0 comments on commit 547a68c

Please sign in to comment.