Skip to content

Commit

Permalink
fix: sync state prior to full sync should be correct (#523)
Browse files Browse the repository at this point in the history
* upstream remote-bitfield fixes

* holepunchto/hypercore@66ca5f7
* holepunchto/hypercore@62fc0d4
* holepunchto/hypercore@9c3cbe7

* update tests for remote-bitfield

* breaking test for pre-sync state

* WIP fixes

* fix up and clean up

* Fix remaining bugs

Co-authored-by: [email protected] <[email protected]>

* fix type

* fix unit tests for CoreSyncState

* fix broken unit test

(thanks to clunky way of fixing things)

* add review suggestions

---------

Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
gmaclennan and EvanHahn authored Mar 27, 2024
1 parent b321922 commit 0952188
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 30 deletions.
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"cpy-cli": "^5.0.0",
"drizzle-kit": "^0.19.12",
"eslint": "^8.57.0",
"filter-obj": "^6.0.0",
"husky": "^8.0.0",
"iterpal": "^0.3.0",
"light-my-request": "^5.10.0",
Expand Down
1 change: 0 additions & 1 deletion src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ export class CoreManager extends TypedEmitter {
for (const { core, namespace } of this.#coreIndex) {
// We want ready() rather than update() because we are only interested in local data
await core.ready()
if (core.length === 0) continue
const { discoveryKey } = core
// This will always be defined after ready(), but need to let TS know
if (!discoveryKey) continue
Expand Down
38 changes: 32 additions & 6 deletions src/core-manager/remote-bitfield.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ class RemoteBitfieldPage {
setRange(start, length, val) {
quickbit.fill(this.bitfield, val, start, start + length)

let i = Math.floor(start / 32)
const n = i + Math.ceil(length / 32)
let i = Math.floor(start / 128)
const n = i + Math.ceil(length / 128)

while (i < n) this.tree.update(this.offset * 8 + i++ * 32)
while (i <= n) this.tree.update(this.offset * 8 + i++ * 128)
}
/**
*
Expand All @@ -93,6 +93,7 @@ class RemoteBitfieldPage {
*/
insert(start, bitfield) {
this.bitfield.set(bitfield, start / 32)
this.segment.refresh()
}
}

Expand All @@ -108,17 +109,27 @@ class RemoteBitfieldSegment {
quickbit.Index.from([], BYTES_PER_SEGMENT)
)
this.pages = new Array(PAGES_PER_SEGMENT)
this.pagesLength = 0
}

get chunks() {
return this.tree.chunks
}

refresh() {
this.tree = /** @type {import('quickbit-universal').SparseIndex} */ (
quickbit.Index.from(this.tree.chunks, BYTES_PER_SEGMENT)
)
}

/**
* @param {RemoteBitfieldPage} page
*/
add(page) {
this.pages[page.index - this.index * PAGES_PER_SEGMENT] = page
const pageIndex = page.index - this.index * PAGES_PER_SEGMENT
if (pageIndex >= this.pagesLength) this.pagesLength = pageIndex + 1

this.pages[pageIndex] = page

const chunk = { field: page.bitfield, offset: page.offset }

Expand All @@ -145,7 +156,7 @@ class RemoteBitfieldSegment {

if (i >= PAGES_PER_SEGMENT) return -1

while (i < this.pages.length) {
while (i < this.pagesLength) {
const p = this.pages[i]

let index = -1
Expand Down Expand Up @@ -198,6 +209,7 @@ export default class RemoteBitfield {
this._pages = new BigSparseArray()
/** @type {BigSparseArray<RemoteBitfieldSegment>} */
this._segments = new BigSparseArray()
this._maxSegments = 0
}

/**
Expand All @@ -212,6 +224,17 @@ export default class RemoteBitfield {
return p ? p.get(j) : false
}

/**
* @param {number} index
*/
getBitfield(index) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE

const p = this._pages.get(i)
return p || null
}

/**
* @param {number} index
* @param {boolean} val
Expand All @@ -227,6 +250,7 @@ export default class RemoteBitfield {
const s =
this._segments.get(k) ||
this._segments.set(k, new RemoteBitfieldSegment(k))
if (this._maxSegments <= k) this._maxSegments = k + 1

p = this._pages.set(
i,
Expand Down Expand Up @@ -254,6 +278,7 @@ export default class RemoteBitfield {
const s =
this._segments.get(k) ||
this._segments.set(k, new RemoteBitfieldSegment(k))
if (this._maxSegments <= k) this._maxSegments = k + 1

p = this._pages.set(
i,
Expand All @@ -280,7 +305,7 @@ export default class RemoteBitfield {
let j = position & (BITS_PER_SEGMENT - 1)
let i = (position - j) / BITS_PER_SEGMENT

while (i < this._segments.maxLength) {
while (i < this._maxSegments) {
const s = this._segments.get(i)

let index = -1
Expand Down Expand Up @@ -366,6 +391,7 @@ export default class RemoteBitfield {
const s =
this._segments.get(k) ||
this._segments.set(k, new RemoteBitfieldSegment(k))
if (this._maxSegments <= k) this._maxSegments = k + 1

p = this._pages.set(
i,
Expand Down
56 changes: 48 additions & 8 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import RemoteBitfield, {
* @property {number | undefined} length Core length, e.g. how many blocks in the core (including blocks that are not downloaded)
* @property {PeerState} localState
* @property {Map<PeerId, PeerState>} remoteStates
* @property {Map<string, import('./peer-sync-controller.js').PeerSyncController>} peerSyncControllers
* @property {import('../core-manager/index.js').Namespace} namespace
*/
/**
* @typedef {object} CoreState
Expand Down Expand Up @@ -58,12 +60,20 @@ export class CoreSyncState {
#localState = new PeerState()
/** @type {DerivedState | null} */
#cachedState = null
#preHavesLength = 0
#update
#peerSyncControllers
#namespace

/**
* @param {() => void} onUpdate Called when a state update is available (via getState())
* @param {object} opts
* @param {() => void} opts.onUpdate Called when a state update is available (via getState())
* @param {Map<string, import('./peer-sync-controller.js').PeerSyncController>} opts.peerSyncControllers
* @param {import('../core-manager/index.js').Namespace} opts.namespace
*/
constructor(onUpdate) {
constructor({ onUpdate, peerSyncControllers, namespace }) {
this.#peerSyncControllers = peerSyncControllers
this.#namespace = namespace
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#update = () => {
Expand All @@ -75,10 +85,13 @@ export class CoreSyncState {
/** @type {() => DerivedState} */
getState() {
if (this.#cachedState) return this.#cachedState
const localCoreLength = this.#core?.length || 0
return deriveState({
length: this.#core?.length,
length: Math.max(localCoreLength, this.#preHavesLength),
localState: this.#localState,
remoteStates: this.#remoteStates,
peerSyncControllers: this.#peerSyncControllers,
namespace: this.#namespace,
})
}

Expand Down Expand Up @@ -134,6 +147,10 @@ export class CoreSyncState {
insertPreHaves(peerId, start, bitfield) {
const peerState = this.#getPeerState(peerId)
peerState.insertPreHaves(start, bitfield)
this.#preHavesLength = Math.max(
this.#preHavesLength,
peerState.preHavesBitfield.lastSet(start + bitfield.length * 32) + 1
)
this.#update()
}

Expand All @@ -153,6 +170,14 @@ export class CoreSyncState {
this.#update()
}

/**
* @param {PeerId} peerId
*/
addPeer(peerId) {
if (this.#remoteStates.has(peerId)) return
this.#remoteStates.set(peerId, new PeerState())
}

/**
* @param {PeerId} peerId
*/
Expand Down Expand Up @@ -241,6 +266,9 @@ export class PeerState {
constructor({ wantAll = true } = {}) {
this.#wantAll = wantAll
}
get preHavesBitfield() {
return this.#preHaves
}
/**
* @param {number} start
* @param {Uint32Array} bitfield
Expand Down Expand Up @@ -274,7 +302,7 @@ export class PeerState {
* @param {number} index
*/
have(index) {
return this.#haves ? this.#haves.get(index) : this.#preHaves.get(index)
return this.#haves?.get(index) || this.#preHaves.get(index)
}
/**
* Return the "haves" for the 32 blocks from `index`, as a 32-bit integer
Expand All @@ -284,8 +312,9 @@ export class PeerState {
* the 32 blocks from `index`
*/
haveWord(index) {
if (this.#haves) return getBitfieldWord(this.#haves, index)
return getBitfieldWord(this.#preHaves, index)
const preHaveWord = getBitfieldWord(this.#preHaves, index)
if (!this.#haves) return preHaveWord
return preHaveWord | getBitfieldWord(this.#haves, index)
}
/**
* Returns whether this peer wants block at `index`. Defaults to `true` for
Expand Down Expand Up @@ -323,8 +352,19 @@ export class PeerState {
* Only exporteed for testing
*/
export function deriveState(coreState) {
const peerIds = ['local', ...coreState.remoteStates.keys()]
const peers = [coreState.localState, ...coreState.remoteStates.values()]
const peerIds = ['local']
const peers = [coreState.localState]

for (const [peerId, peerState] of coreState.remoteStates.entries()) {
const psc = coreState.peerSyncControllers.get(peerId)
const isBlocked = psc?.syncCapability[coreState.namespace] === 'blocked'
// Currently we do not include blocked peers in sync state - it's unclear
// how to expose this state in a meaningful way for considering sync
// completion, because blocked peers do not sync.
if (isBlocked) continue
peerIds.push(peerId)
peers.push(peerState)
}

/** @type {CoreState[]} */
const peerStates = new Array(peers.length)
Expand Down
20 changes: 18 additions & 2 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ export class NamespaceSyncState {
#namespace
/** @type {SyncState | null} */
#cachedState = null
#peerSyncControllers

/**
* @param {object} opts
* @param {TNamespace} opts.namespace
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {() => void} opts.onUpdate Called when a state update is available (via getState())
* @param {Map<string, import('./peer-sync-controller.js').PeerSyncController>} opts.peerSyncControllers
*/
constructor({ namespace, coreManager, onUpdate }) {
constructor({ namespace, coreManager, onUpdate, peerSyncControllers }) {
this.#namespace = namespace
this.#peerSyncControllers = peerSyncControllers
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#handleUpdate = () => {
Expand Down Expand Up @@ -81,6 +84,15 @@ export class NamespaceSyncState {
return state
}

/**
* @param {string} peerId
*/
addPeer(peerId) {
for (const css of this.#coreStates.values()) {
css.addPeer(peerId)
}
}

/**
* @param {import('hypercore')<"binary", Buffer>} core
* @param {Buffer} coreKey
Expand Down Expand Up @@ -109,7 +121,11 @@ export class NamespaceSyncState {
#getCoreState(discoveryId) {
let coreState = this.#coreStates.get(discoveryId)
if (!coreState) {
coreState = new CoreSyncState(this.#handleUpdate)
coreState = new CoreSyncState({
onUpdate: this.#handleUpdate,
peerSyncControllers: this.#peerSyncControllers,
namespace: this.#namespace,
})
this.#coreStates.set(discoveryId, coreState)
}
return coreState
Expand Down
18 changes: 15 additions & 3 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class SyncApi extends TypedEmitter {
#roles
/** @type {Map<import('protomux'), PeerSyncController>} */
#peerSyncControllers = new Map()
/** @type {Map<string, PeerSyncController>} */
#pscByPeerId = new Map()
/** @type {Set<string>} */
#peerIds = new Set()
/** @type {Set<'local' | 'remote'>} */
Expand All @@ -67,7 +69,11 @@ export class SyncApi extends TypedEmitter {
this.#l = Logger.create('syncApi', logger)
this.#coreManager = coreManager
this.#roles = roles
this[kSyncState] = new SyncState({ coreManager, throttleMs })
this[kSyncState] = new SyncState({
coreManager,
throttleMs,
peerSyncControllers: this.#pscByPeerId,
})
this[kSyncState].setMaxListeners(0)
this[kSyncState].on('state', (namespaceSyncState) => {
const state = this.#getState(namespaceSyncState)
Expand Down Expand Up @@ -195,7 +201,11 @@ export class SyncApi extends TypedEmitter {
logger: this.#l,
})
this.#peerSyncControllers.set(protomux, peerSyncController)
if (peerSyncController.peerId) this.#peerIds.add(peerSyncController.peerId)
this.#pscByPeerId.set(peerSyncController.peerId, peerSyncController)
this.#peerIds.add(peerSyncController.peerId)

// Add peer to all core states (via namespace sync states)
this[kSyncState].addPeer(peerSyncController.peerId)

if (this.#dataSyncEnabled.has('local')) {
peerSyncController.enableDataSync()
Expand Down Expand Up @@ -228,7 +238,9 @@ export class SyncApi extends TypedEmitter {
return
}
this.#peerSyncControllers.delete(protomux)
this.#peerIds.delete(keyToId(peer.remotePublicKey))
const peerId = keyToId(peer.remotePublicKey)
this.#pscByPeerId.delete(peerId)
this.#peerIds.delete(peerId)
this.#pendingDiscoveryKeys.delete(protomux)
}
}
Expand Down
Loading

0 comments on commit 0952188

Please sign in to comment.