Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sync state prior to full sync should be correct #523

Merged
merged 11 commits into from
Mar 27, 2024
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"cpy-cli": "^5.0.0",
"drizzle-kit": "^0.19.12",
"eslint": "^8.57.0",
"filter-obj": "^6.0.0",
"husky": "^8.0.0",
"iterpal": "^0.3.0",
"light-my-request": "^5.10.0",
Expand Down
2 changes: 1 addition & 1 deletion src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ export class CoreManager extends TypedEmitter {
for (const { core, namespace } of this.#coreIndex) {
// We want ready() rather than update() because we are only interested in local data
await core.ready()
if (core.length === 0) continue
// if (core.length === 0) continue
gmaclennan marked this conversation as resolved.
Show resolved Hide resolved
const { discoveryKey } = core
// This will always be defined after ready(), but need to let TS know
if (!discoveryKey) continue
Expand Down
38 changes: 32 additions & 6 deletions src/core-manager/remote-bitfield.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ class RemoteBitfieldPage {
setRange(start, length, val) {
quickbit.fill(this.bitfield, val, start, start + length)

let i = Math.floor(start / 32)
const n = i + Math.ceil(length / 32)
let i = Math.floor(start / 128)
const n = i + Math.ceil(length / 128)

while (i < n) this.tree.update(this.offset * 8 + i++ * 32)
while (i <= n) this.tree.update(this.offset * 8 + i++ * 128)
}
/**
*
Expand All @@ -93,6 +93,7 @@ class RemoteBitfieldPage {
*/
insert(start, bitfield) {
this.bitfield.set(bitfield, start / 32)
this.segment.refresh()
}
}

Expand All @@ -108,17 +109,27 @@ class RemoteBitfieldSegment {
quickbit.Index.from([], BYTES_PER_SEGMENT)
)
this.pages = new Array(PAGES_PER_SEGMENT)
this.pagesLength = 0
}

get chunks() {
return this.tree.chunks
}

refresh() {
this.tree = /** @type {import('quickbit-universal').SparseIndex} */ (
quickbit.Index.from(this.tree.chunks, BYTES_PER_SEGMENT)
)
}

/**
* @param {RemoteBitfieldPage} page
*/
add(page) {
this.pages[page.index - this.index * PAGES_PER_SEGMENT] = page
const pageIndex = page.index - this.index * PAGES_PER_SEGMENT
if (pageIndex >= this.pagesLength) this.pagesLength = pageIndex + 1

this.pages[pageIndex] = page

const chunk = { field: page.bitfield, offset: page.offset }

Expand All @@ -145,7 +156,7 @@ class RemoteBitfieldSegment {

if (i >= PAGES_PER_SEGMENT) return -1

while (i < this.pages.length) {
while (i < this.pagesLength) {
const p = this.pages[i]

let index = -1
Expand Down Expand Up @@ -198,6 +209,7 @@ export default class RemoteBitfield {
this._pages = new BigSparseArray()
/** @type {BigSparseArray<RemoteBitfieldSegment>} */
this._segments = new BigSparseArray()
this._maxSegments = 0
}

/**
Expand All @@ -212,6 +224,17 @@ export default class RemoteBitfield {
return p ? p.get(j) : false
}

/**
* @param {number} index
*/
getBitfield(index) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE

const p = this._pages.get(i)
return p || null
}

/**
* @param {number} index
* @param {boolean} val
Expand All @@ -227,6 +250,7 @@ export default class RemoteBitfield {
const s =
this._segments.get(k) ||
this._segments.set(k, new RemoteBitfieldSegment(k))
if (this._maxSegments <= k) this._maxSegments = k + 1

p = this._pages.set(
i,
Expand Down Expand Up @@ -254,6 +278,7 @@ export default class RemoteBitfield {
const s =
this._segments.get(k) ||
this._segments.set(k, new RemoteBitfieldSegment(k))
if (this._maxSegments <= k) this._maxSegments = k + 1

p = this._pages.set(
i,
Expand All @@ -280,7 +305,7 @@ export default class RemoteBitfield {
let j = position & (BITS_PER_SEGMENT - 1)
let i = (position - j) / BITS_PER_SEGMENT

while (i < this._segments.maxLength) {
while (i < this._maxSegments) {
const s = this._segments.get(i)

let index = -1
Expand Down Expand Up @@ -366,6 +391,7 @@ export default class RemoteBitfield {
const s =
this._segments.get(k) ||
this._segments.set(k, new RemoteBitfieldSegment(k))
if (this._maxSegments <= k) this._maxSegments = k + 1

p = this._pages.set(
i,
Expand Down
56 changes: 48 additions & 8 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import RemoteBitfield, {
* @property {number | undefined} length Core length, e.g. how many blocks in the core (including blocks that are not downloaded)
* @property {PeerState} localState
* @property {Map<PeerId, PeerState>} remoteStates
* @property {Map<string, import('./peer-sync-controller.js').PeerSyncController>} peerSyncControllers
* @property {import('../core-manager/index.js').Namespace} namespace
*/
/**
* @typedef {object} CoreState
Expand Down Expand Up @@ -58,12 +60,20 @@ export class CoreSyncState {
#localState = new PeerState()
/** @type {DerivedState | null} */
#cachedState = null
#preHavesLength = 0
#update
#peerSyncControllers
#namespace

/**
* @param {() => void} onUpdate Called when a state update is available (via getState())
* @param {object} opts
* @param {() => void} opts.onUpdate Called when a state update is available (via getState())
* @param {Map<string, import('./peer-sync-controller.js').PeerSyncController>} opts.peerSyncControllers
* @param {import('../core-manager/index.js').Namespace} opts.namespace
*/
constructor(onUpdate) {
constructor({ onUpdate, peerSyncControllers, namespace }) {
this.#peerSyncControllers = peerSyncControllers
this.#namespace = namespace
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#update = () => {
Expand All @@ -75,10 +85,13 @@ export class CoreSyncState {
/** @type {() => DerivedState} */
getState() {
if (this.#cachedState) return this.#cachedState
const localCoreLength = this.#core?.length || 0
return deriveState({
length: this.#core?.length,
length: Math.max(localCoreLength, this.#preHavesLength),
localState: this.#localState,
remoteStates: this.#remoteStates,
peerSyncControllers: this.#peerSyncControllers,
namespace: this.#namespace,
})
}

Expand Down Expand Up @@ -134,6 +147,10 @@ export class CoreSyncState {
insertPreHaves(peerId, start, bitfield) {
const peerState = this.#getPeerState(peerId)
peerState.insertPreHaves(start, bitfield)
this.#preHavesLength = Math.max(
this.#preHavesLength,
peerState.preHavesBitfield.lastSet(start + bitfield.length * 32) + 1
gmaclennan marked this conversation as resolved.
Show resolved Hide resolved
)
this.#update()
}

Expand All @@ -153,6 +170,14 @@ export class CoreSyncState {
this.#update()
}

/**
* @param {PeerId} peerId
*/
addPeer(peerId) {
if (this.#remoteStates.has(peerId)) return
this.#remoteStates.set(peerId, new PeerState())
}

/**
* @param {PeerId} peerId
*/
Expand Down Expand Up @@ -241,6 +266,9 @@ export class PeerState {
constructor({ wantAll = true } = {}) {
this.#wantAll = wantAll
}
get preHavesBitfield() {
return this.#preHaves
}
/**
* @param {number} start
* @param {Uint32Array} bitfield
Expand Down Expand Up @@ -274,7 +302,7 @@ export class PeerState {
* @param {number} index
*/
have(index) {
return this.#haves ? this.#haves.get(index) : this.#preHaves.get(index)
return this.#haves?.get(index) || this.#preHaves.get(index)
}
/**
* Return the "haves" for the 32 blocks from `index`, as a 32-bit integer
Expand All @@ -284,8 +312,9 @@ export class PeerState {
* the 32 blocks from `index`
*/
haveWord(index) {
if (this.#haves) return getBitfieldWord(this.#haves, index)
return getBitfieldWord(this.#preHaves, index)
const preHaveWord = getBitfieldWord(this.#preHaves, index)
if (!this.#haves) return preHaveWord
return preHaveWord | getBitfieldWord(this.#haves, index)
}
/**
* Returns whether this peer wants block at `index`. Defaults to `true` for
Expand Down Expand Up @@ -323,8 +352,19 @@ export class PeerState {
* Only exporteed for testing
*/
export function deriveState(coreState) {
const peerIds = ['local', ...coreState.remoteStates.keys()]
const peers = [coreState.localState, ...coreState.remoteStates.values()]
const peerIds = ['local']
const peers = [coreState.localState]

for (const [peerId, peerState] of coreState.remoteStates.entries()) {
const psc = coreState.peerSyncControllers.get(peerId)
const isBlocked = psc?.syncCapability[coreState.namespace] === 'blocked'
// Currently we do not include blocked peers in sync state - it's unclear
// how to expose this state in a meaningful way for considering sync
// completion, because blocked peers do not sync.
if (isBlocked) continue
peerIds.push(peerId)
peers.push(peerState)
}

/** @type {CoreState[]} */
const peerStates = new Array(peers.length)
Expand Down
20 changes: 18 additions & 2 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ export class NamespaceSyncState {
#namespace
/** @type {SyncState | null} */
#cachedState = null
#peerSyncControllers

/**
* @param {object} opts
* @param {TNamespace} opts.namespace
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {() => void} opts.onUpdate Called when a state update is available (via getState())
* @param {Map<string, import('./peer-sync-controller.js').PeerSyncController>} opts.peerSyncControllers
*/
constructor({ namespace, coreManager, onUpdate }) {
constructor({ namespace, coreManager, onUpdate, peerSyncControllers }) {
this.#namespace = namespace
this.#peerSyncControllers = peerSyncControllers
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#handleUpdate = () => {
Expand Down Expand Up @@ -81,6 +84,15 @@ export class NamespaceSyncState {
return state
}

/**
* @param {string} peerId
*/
addPeer(peerId) {
for (const css of this.#coreStates.values()) {
css.addPeer(peerId)
}
}

/**
* @param {import('hypercore')<"binary", Buffer>} core
* @param {Buffer} coreKey
Expand Down Expand Up @@ -109,7 +121,11 @@ export class NamespaceSyncState {
#getCoreState(discoveryId) {
let coreState = this.#coreStates.get(discoveryId)
if (!coreState) {
coreState = new CoreSyncState(this.#handleUpdate)
coreState = new CoreSyncState({
onUpdate: this.#handleUpdate,
peerSyncControllers: this.#peerSyncControllers,
namespace: this.#namespace,
})
this.#coreStates.set(discoveryId, coreState)
}
return coreState
Expand Down
18 changes: 15 additions & 3 deletions src/sync/sync-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class SyncApi extends TypedEmitter {
#roles
/** @type {Map<import('protomux'), PeerSyncController>} */
#peerSyncControllers = new Map()
/** @type {Map<string, PeerSyncController>} */
#pscByPeerId = new Map()
/** @type {Set<string>} */
#peerIds = new Set()
/** @type {Set<'local' | 'remote'>} */
Expand All @@ -67,7 +69,11 @@ export class SyncApi extends TypedEmitter {
this.#l = Logger.create('syncApi', logger)
this.#coreManager = coreManager
this.#roles = roles
this[kSyncState] = new SyncState({ coreManager, throttleMs })
this[kSyncState] = new SyncState({
coreManager,
throttleMs,
peerSyncControllers: this.#pscByPeerId,
})
this[kSyncState].setMaxListeners(0)
this[kSyncState].on('state', (namespaceSyncState) => {
const state = this.#getState(namespaceSyncState)
Expand Down Expand Up @@ -195,7 +201,11 @@ export class SyncApi extends TypedEmitter {
logger: this.#l,
})
this.#peerSyncControllers.set(protomux, peerSyncController)
if (peerSyncController.peerId) this.#peerIds.add(peerSyncController.peerId)
this.#pscByPeerId.set(peerSyncController.peerId, peerSyncController)
this.#peerIds.add(peerSyncController.peerId)

// Add peer to all core states (via namespace sync states)
this[kSyncState].addPeer(peerSyncController.peerId)

if (this.#dataSyncEnabled.has('local')) {
peerSyncController.enableDataSync()
Expand Down Expand Up @@ -228,7 +238,9 @@ export class SyncApi extends TypedEmitter {
return
}
this.#peerSyncControllers.delete(protomux)
this.#peerIds.delete(keyToId(peer.remotePublicKey))
const peerId = keyToId(peer.remotePublicKey)
this.#pscByPeerId.delete(peerId)
this.#peerIds.delete(peerId)
this.#pendingDiscoveryKeys.delete(protomux)
}
}
Expand Down
Loading
Loading