Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update pre-haves as they change #594

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions src/core-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ export class CoreManager extends TypedEmitter {
})

this.#creatorCore.on('peer-add', (peer) => {
this.#sendHaves(peer)
this.#sendHaves(peer, this.#coreIndex)
})
this.#creatorCore.on('peer-remove', (peer) => {
// When a peer is removed we clean up any unanswered key requests, so that
Expand Down Expand Up @@ -304,12 +304,29 @@ export class CoreManager extends TypedEmitter {
peer._maybeWant(0, core.length)
})

// A non-writer core will emit 'append' when its length is updated from the
// initial sync with a peer, and we will not have sent a "maybe want" for
// this range, so we need to do it now. Subsequent appends are propogated
// (more efficiently) via range broadcasts, so we only need to listen to the
// first append.
Comment on lines -307 to -311
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this comment down and re-formatted it so no line was longer than 80 characters; I didn't change it other than fixing a typo.

if (!writer) {
if (writer) {
const sendHaves = () => {
for (const peer of this.#creatorCore.peers) {
this.#sendHaves(peer, [{ core, namespace }])
}
}

// Tell connected peers, who we aren't necessarily syncing with, about
// what we just added or cleared. Hypercore doesn't emit anything when
// clearing, so we patch it in.
core.on('append', sendHaves)
const originalClear = core.clear
core.clear = function clear() {
const result = originalClear.apply(this, /** @type {any} */ (arguments))
result.then(sendHaves)
return result
}
Comment on lines +318 to +323
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered patching Hypercore itself, which doesn't feel meaningfully different. I don't think Hypercore offers a way to address this in a better way, even on newer versions.

} else {
// A non-writer core will emit 'append' when its length is updated from
// the initial sync with a peer, and we will not have sent a "maybe want"
// for this range, so we need to do it now. Subsequent appends are
// propagated (more efficiently) via range broadcasts, so we only need to
// listen to the first append.
core.once('append', () => {
for (const peer of core.peers) {
// TODO: It would be more efficient (in terms of network traffic) to
Expand Down Expand Up @@ -422,8 +439,9 @@ export class CoreManager extends TypedEmitter {
/**
*
* @param {any} peer
* @param {Iterable<{ core: Hypercore<Hypercore.ValueEncoding, Buffer>, namespace: Namespace }>} cores
*/
async #sendHaves(peer) {
async #sendHaves(peer, cores) {
if (!peer) {
console.warn('sendHaves no peer', peer.remotePublicKey)
// TODO: How to handle this and when does it happen?
Expand All @@ -432,7 +450,7 @@ export class CoreManager extends TypedEmitter {

peer.protomux.cork()

for (const { core, namespace } of this.#coreIndex) {
for (const { core, namespace } of cores) {
// We want ready() rather than update() because we are only interested in local data
await core.ready()
const { discoveryKey } = core
Expand Down
90 changes: 90 additions & 0 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// @ts-check
import { test } from 'brittle'
import { pEvent } from 'p-event'
import { setTimeout as delay } from 'timers/promises'
import { excludeKeys } from 'filter-obj'
import {
Expand All @@ -20,6 +21,8 @@ import { valueOf } from '../src/utils.js'
import pTimeout from 'p-timeout'
import { BLOCKED_ROLE_ID, COORDINATOR_ROLE_ID } from '../src/roles.js'
import { kSyncState } from '../src/sync/sync-api.js'
/** @typedef {import('../src/mapeo-project.js').MapeoProject} MapeoProject */
/** @typedef {import('../src/sync/sync-api.js').SyncTypeState} SyncState */

const SCHEMAS_INITIAL_SYNC = ['preset', 'field']

Expand Down Expand Up @@ -362,3 +365,90 @@ test('Correct sync state prior to data sync', async function (t) {
await disconnect2()
await Promise.all(projects.map((p) => p.close()))
})

test('pre-haves are updated', async (t) => {
const managers = await createManagers(2, t)
const [invitor, ...invitees] = managers

const disconnect = connectPeers(managers)
t.teardown(disconnect)

const projectId = await invitor.createProject({ name: 'Mapeo' })
await invite({ invitor, invitees, projectId })
const projects = await Promise.all(
managers.map((m) => m.getProject(projectId))
)
for (const project of projects) t.teardown(() => project.close())
const [invitorProject, inviteeProject] = projects
await waitForSync(projects, 'initial')

assertDataSyncStateMatches(
t,
invitorProject,
{ have: 0, wanted: 0, dataToSync: false },
'Invitor project should have nothing to sync at start'
)
assertDataSyncStateMatches(
t,
inviteeProject,
{ have: 0, wanted: 0, dataToSync: false },
'Invitee project should see nothing to sync at start'
)

const invitorToSyncPromise = pEvent(
invitorProject.$sync,
'sync-state',
(syncState) =>
syncStateMatches(syncState.data, {
have: 1,
wanted: 1,
dataToSync: true,
})
)
const inviteeToSyncPromise = pEvent(
inviteeProject.$sync,
'sync-state',
(syncState) =>
syncStateMatches(syncState.data, {
have: 0,
want: 1,
dataToSync: true,
})
)

await invitorProject.observation.create(valueOf(generate('observation')[0]))

await Promise.all([invitorToSyncPromise, inviteeToSyncPromise])

assertDataSyncStateMatches(
t,
inviteeProject,
{ have: 0, want: 1, dataToSync: true },
'Invitee project should learn about something to sync'
)
})

/**
* @param {import('brittle').TestInstance} t
* @param {MapeoProject} project
* @param {Partial<SyncState>} expected
* @param {string} message
* @returns {void}
*/
function assertDataSyncStateMatches(t, project, expected, message) {
const actual = project.$sync.getState().data
t.ok(syncStateMatches(actual, expected), message)
}

/**
* @param {SyncState} syncState
* @param {Partial<SyncState>} expected
* @returns {boolean}
*/
function syncStateMatches(syncState, expected) {
for (const [key, expectedValue] of Object.entries(expected)) {
const actualValue = /** @type {any} */ (syncState)[key]
if (actualValue !== expectedValue) return false
}
return true
}
Loading