Skip to content

Commit

Permalink
feat: Add NamespaceSyncState (#313)
Browse files Browse the repository at this point in the history
* chore: emit peer-have messages don't persist

Fixes #309, removes responsibility from CoreManager to track peer-have
messages - will be handled by SyncState class

* feat: Add NamespaceSyncState

This combines all core sync states for a namespace. It listens to
CoreManager for new cores and pre-have messages.

* feat: add namespace to peer pre-have messages

* fix param name

* WIP tests

* add tests

* don't use eventEmitter

Since these are internal modules and we don't attach a "listener" other
than in the constructor, switching to a pattern that passes an
'onUpdate' constructor param, that avoids needing to track when event
listeners are removed
  • Loading branch information
gmaclennan authored Oct 9, 2023
1 parent 37dbd22 commit 05b0830
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 40 deletions.
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"@types/node": "^18.16.3",
"@types/sinonjs__fake-timers": "^8.1.2",
"@types/streamx": "^2.9.1",
"@types/throttle-debounce": "^5.0.0",
"@types/varint": "^6.0.1",
"bitfield": "^4.1.0",
"brittle": "^3.2.1",
Expand Down Expand Up @@ -142,6 +143,7 @@
"sodium-universal": "^4.0.0",
"start-stop-state-machine": "^1.2.0",
"sub-encoder": "^2.1.1",
"throttle-debounce": "^5.0.0",
"tiny-typed-emitter": "^2.1.0",
"varint": "^6.0.0",
"z32": "^1.0.1"
Expand Down
42 changes: 13 additions & 29 deletions src/sync/core-sync-state.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { TypedEmitter } from 'tiny-typed-emitter'
import { keyToId } from '../utils.js'
import RemoteBitfield, {
BITS_PER_PAGE,
Expand Down Expand Up @@ -32,46 +31,45 @@ import RemoteBitfield, {
* @property {PeerSimpleState} localState local state
* @property {Record<PeerId, RemotePeerSimpleState>} remoteStates map of state of all known peers
*/
/**
* @typedef {object} CoreSyncEvents
* @property {() => void} update
*/

/**
* Track sync state for a core identified by `discoveryId`. Can start tracking
* state before the core instance exists locally, via the "preHave" messages
* received over the project creator core.
*
* Because deriving the state is expensive (it iterates through the bitfields of
* all peers), this is designed to be pull-based: an `update` event signals that
* the state is updated, but does not pass the state. The consumer can "pull"
* the state when it wants it via `coreSyncState.getState()`.
* all peers), this is designed to be pull-based: the onUpdate event signals
* that the state is updated, but does not pass the state. The consumer can
* "pull" the state when it wants it via `coreSyncState.getState()`.
*
* Each peer (including the local peer) has a state of:
* 1. `have` - number of blocks the peer has locally
* 2. `want` - number of blocks the peer wants, and at least one peer has
* 3. `wanted` - number of blocks the peer has that at least one peer wants
* 4. `missing` - number of blocks the peer wants but no peer has
*
* @extends {TypedEmitter<CoreSyncEvents>}
*/
export class CoreSyncState extends TypedEmitter {
export class CoreSyncState {
/** @type {import('hypercore')<'binary', Buffer>} */
#core
/** @type {InternalState['remoteStates']} */
#remoteStates = new Map()
/** @type {InternalState['localState']} */
#localState = new PeerState()
#discoveryId
/** @type {DerivedState | null} */
#cachedState = null
#update

/**
* @param {string} discoveryId Discovery ID for the core that this is representing
* @param {() => void} onUpdate Called when a state update is available (via getState())
*/
constructor(discoveryId) {
super()
this.#discoveryId = discoveryId
constructor(onUpdate) {
// Called whenever the state changes, so we clear the cache because next
// call to getState() will need to re-derive the state
this.#update = () => {
this.#cachedState = null
process.nextTick(onUpdate)
}
}

/** @type {() => DerivedState} */
Expand All @@ -84,15 +82,6 @@ export class CoreSyncState extends TypedEmitter {
})
}

/**
* Called whenever the state changes, so we clear the cache because next call
* to getState() will need to re-derive the state
*/
#update() {
this.#cachedState = null
this.emit('update')
}

/**
* Attach a core. The sync state can be initialized without a core instance,
* because we could receive peer want and have states via extension messages
Expand All @@ -101,11 +90,6 @@ export class CoreSyncState extends TypedEmitter {
* @param {import('hypercore')<'binary', Buffer>} core
*/
attachCore(core) {
// @ts-ignore - we know discoveryKey exists here
const discoveryId = keyToId(core.discoveryKey)
if (discoveryId !== this.#discoveryId) {
throw new Error('discoveryId does not match')
}
if (this.#core) return

this.#core = core
Expand Down
102 changes: 102 additions & 0 deletions src/sync/namespace-sync-state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { CoreSyncState } from './core-sync-state.js'
import { discoveryKey } from 'hypercore-crypto'

/**
* @typedef {object} PeerSyncState
* @property {number} have
* @property {number} want
* @property {number} wanted
* @property {number} missing
*/

/**
* @typedef {object} SyncState
* @property {PeerSyncState} localState
*/

/**
* @template {import('../core-manager/index.js').Namespace} [TNamespace=import('../core-manager/index.js').Namespace]
*/
export class NamespaceSyncState {
/** @type {Map<string, CoreSyncState>} */
#coreStates = new Map()
#handleUpdate
#namespace

/**
* @param {object} opts
* @param {TNamespace} opts.namespace
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {() => void} opts.onUpdate Called when a state update is available (via getState())
*/
constructor({ namespace, coreManager, onUpdate }) {
this.#namespace = namespace
this.#handleUpdate = onUpdate

for (const { core, key } of coreManager.getCores(namespace)) {
this.#addCore(core, key)
}

coreManager.on('add-core', ({ core, namespace, key }) => {
if (namespace !== this.#namespace) return
this.#addCore(core, key)
})

coreManager.on('peer-have', (namespace, msg) => {
if (namespace !== this.#namespace) return
this.#insertPreHaves(msg)
})
}

get namespace() {
return this.#namespace
}

/** @returns {SyncState} */
getState() {
const state = {
localState: { have: 0, want: 0, wanted: 0, missing: 0 },
}
for (const crs of this.#coreStates.values()) {
const { localState } = crs.getState()
state.localState.have += localState.have
state.localState.want += localState.want
state.localState.wanted += localState.wanted
state.localState.missing += localState.missing
}
return state
}

/**
* @param {import('hypercore')<"binary", Buffer>} core
* @param {Buffer} coreKey
*/
#addCore(core, coreKey) {
const discoveryId = discoveryKey(coreKey).toString('hex')
this.#getCoreState(discoveryId).attachCore(core)
}

/**
* @param {{
* peerId: string,
* start: number,
* coreDiscoveryId: string,
* bitfield: Uint32Array
* }} opts
*/
#insertPreHaves({ peerId, start, coreDiscoveryId, bitfield }) {
this.#getCoreState(coreDiscoveryId).insertPreHaves(peerId, start, bitfield)
}

/**
* @param {string} discoveryId
*/
#getCoreState(discoveryId) {
let coreState = this.#coreStates.get(discoveryId)
if (!coreState) {
coreState = new CoreSyncState(this.#handleUpdate)
this.#coreStates.set(discoveryId, coreState)
}
return coreState
}
}
5 changes: 5 additions & 0 deletions tests/helpers/core-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import { KeyManager } from '@mapeo/crypto'
import RAM from 'random-access-memory'
import NoiseSecretStream from '@hyperswarm/secret-stream'

/**
*
* @param {Partial<ConstructorParameters<typeof CoreManager>[0]> & { rootKey?: Buffer }} param0
* @returns
*/
export function createCoreManager({
rootKey = randomBytes(16),
projectKey = randomBytes(32),
Expand Down
17 changes: 11 additions & 6 deletions tests/helpers/replication-state.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import NoiseSecretStream from '@hyperswarm/secret-stream'

import { truncateId } from '../../src/utils.js'
import { getKeys } from './core-manager.js'

export function logState(syncState, name) {
let message = `${name ? name + ' ' : ''}${
Expand Down Expand Up @@ -37,13 +36,19 @@ export async function download(
{ start = 0, end = -1 } = {}
) {
const writer = coreManager.getWriterCore(namespace)
const keys = getKeys(coreManager, namespace)

for (const key of keys) {
const donePromises = []
for (const { core, key } of coreManager.getCores(namespace)) {
if (key.equals(writer.core.key)) continue
const core = coreManager.getCoreByKey(key)
await core.download({ start, end, ifAvailable: true }).done()
donePromises.push(core.download({ start, end, ifAvailable: true }).done())
}
if (end !== -1) return Promise.all(donePromises)
return new Promise(() => {
coreManager.on('add-core', (coreRecord) => {
console.log('add-core')
if (coreRecord.namespace !== namespace) return
coreRecord.core.download({ start, end, ifAvailable: true }).done()
})
})
}

export async function downloadCore(
Expand Down
15 changes: 10 additions & 5 deletions tests/sync/core-sync-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { createCore } from '../helpers/index.js'
// import { setTimeout } from 'timers/promises'
import { once } from 'node:events'
import pTimeout from 'p-timeout'
import { EventEmitter } from 'node:events'

/**
* @type {Array<{
Expand Down Expand Up @@ -208,7 +209,8 @@ test('CoreReplicationState', async (t) => {
for (const { state, expected, message } of scenarios) {
const localCore = await createCore()
await localCore.ready()
const crs = new CoreSyncState(localCore.discoveryKey.toString('hex'))
const emitter = new EventEmitter()
const crs = new CoreSyncState(() => emitter.emit('update'))
crs.attachCore(localCore)
const blocks = new Array(state.length).fill('block')
await localCore.append(blocks)
Expand Down Expand Up @@ -249,7 +251,7 @@ test('CoreReplicationState', async (t) => {
connected: connectedState.get(peerId),
}
}
await updateWithTimeout(crs, 100)
await updateWithTimeout(emitter, 100)
t.alike(
crs.getState(),
{ ...expected, remoteStates: expectedRemoteStates },
Expand Down Expand Up @@ -383,11 +385,14 @@ function setPeerWants(state, peerId, bits) {

/**
* Wait for update event with a timeout
* @param {CoreSyncState} state
* @param {EventEmitter} updateEmitter
* @param {number} milliseconds
*/
async function updateWithTimeout(state, milliseconds) {
return pTimeout(once(state, 'update'), { milliseconds, message: false })
async function updateWithTimeout(updateEmitter, milliseconds) {
return pTimeout(once(updateEmitter, 'update'), {
milliseconds,
message: false,
})
}

/**
Expand Down
Loading

0 comments on commit 05b0830

Please sign in to comment.