diff --git a/src/core-manager/index.js b/src/core-manager/index.js index 728c043e..a9296b87 100644 --- a/src/core-manager/index.js +++ b/src/core-manager/index.js @@ -164,7 +164,7 @@ export class CoreManager extends TypedEmitter { }) this.#creatorCore.on('peer-add', (peer) => { - this.#sendHaves(peer) + this.#sendHaves(peer, this.#coreIndex) }) this.#creatorCore.on('peer-remove', (peer) => { // When a peer is removed we clean up any unanswered key requests, so that @@ -304,12 +304,29 @@ export class CoreManager extends TypedEmitter { peer._maybeWant(0, core.length) }) - // A non-writer core will emit 'append' when its length is updated from the - // initial sync with a peer, and we will not have sent a "maybe want" for - // this range, so we need to do it now. Subsequent appends are propogated - // (more efficiently) via range broadcasts, so we only need to listen to the - // first append. - if (!writer) { + if (writer) { + const sendHaves = () => { + for (const peer of this.#creatorCore.peers) { + this.#sendHaves(peer, [{ core, namespace }]) + } + } + + // Tell connected peers, who we aren't necessarily syncing with, about + // what we just added or cleared. Hypercore doesn't emit anything when + // clearing, so we patch it in. + core.on('append', sendHaves) + const originalClear = core.clear + core.clear = function clear() { + const result = originalClear.apply(this, /** @type {any} */ (arguments)) + result.then(sendHaves) + return result + } + } else { + // A non-writer core will emit 'append' when its length is updated from + // the initial sync with a peer, and we will not have sent a "maybe want" + // for this range, so we need to do it now. Subsequent appends are + // propagated (more efficiently) via range broadcasts, so we only need to + // listen to the first append. core.once('append', () => { for (const peer of core.peers) { // TODO: It would be more efficient (in terms of network traffic) to @@ -422,8 +439,9 @@ export class CoreManager extends TypedEmitter { /** * * @param {any} peer + * @param {Iterable<{ core: Hypercore, namespace: Namespace }>} cores */ - async #sendHaves(peer) { + async #sendHaves(peer, cores) { if (!peer) { console.warn('sendHaves no peer', peer.remotePublicKey) // TODO: How to handle this and when does it happen? @@ -432,7 +450,7 @@ export class CoreManager extends TypedEmitter { peer.protomux.cork() - for (const { core, namespace } of this.#coreIndex) { + for (const { core, namespace } of cores) { // We want ready() rather than update() because we are only interested in local data await core.ready() const { discoveryKey } = core diff --git a/test-e2e/sync.js b/test-e2e/sync.js index 4c448b98..e4a31ae0 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -1,5 +1,6 @@ // @ts-check import { test } from 'brittle' +import { pEvent } from 'p-event' import { setTimeout as delay } from 'timers/promises' import { excludeKeys } from 'filter-obj' import { @@ -20,6 +21,8 @@ import { valueOf } from '../src/utils.js' import pTimeout from 'p-timeout' import { BLOCKED_ROLE_ID, COORDINATOR_ROLE_ID } from '../src/roles.js' import { kSyncState } from '../src/sync/sync-api.js' +/** @typedef {import('../src/mapeo-project.js').MapeoProject} MapeoProject */ +/** @typedef {import('../src/sync/sync-api.js').SyncTypeState} SyncState */ const SCHEMAS_INITIAL_SYNC = ['preset', 'field'] @@ -362,3 +365,90 @@ test('Correct sync state prior to data sync', async function (t) { await disconnect2() await Promise.all(projects.map((p) => p.close())) }) + +test('pre-haves are updated', async (t) => { + const managers = await createManagers(2, t) + const [invitor, ...invitees] = managers + + const disconnect = connectPeers(managers) + t.teardown(disconnect) + + const projectId = await invitor.createProject({ name: 'Mapeo' }) + await invite({ invitor, invitees, projectId }) + const projects = await Promise.all( + managers.map((m) => m.getProject(projectId)) + ) + for (const project of projects) t.teardown(() => project.close()) + const [invitorProject, inviteeProject] = projects + await waitForSync(projects, 'initial') + + assertDataSyncStateMatches( + t, + invitorProject, + { have: 0, wanted: 0, dataToSync: false }, + 'Invitor project should have nothing to sync at start' + ) + assertDataSyncStateMatches( + t, + inviteeProject, + { have: 0, wanted: 0, dataToSync: false }, + 'Invitee project should see nothing to sync at start' + ) + + const invitorToSyncPromise = pEvent( + invitorProject.$sync, + 'sync-state', + (syncState) => + syncStateMatches(syncState.data, { + have: 1, + wanted: 1, + dataToSync: true, + }) + ) + const inviteeToSyncPromise = pEvent( + inviteeProject.$sync, + 'sync-state', + (syncState) => + syncStateMatches(syncState.data, { + have: 0, + want: 1, + dataToSync: true, + }) + ) + + await invitorProject.observation.create(valueOf(generate('observation')[0])) + + await Promise.all([invitorToSyncPromise, inviteeToSyncPromise]) + + assertDataSyncStateMatches( + t, + inviteeProject, + { have: 0, want: 1, dataToSync: true }, + 'Invitee project should learn about something to sync' + ) +}) + +/** + * @param {import('brittle').TestInstance} t + * @param {MapeoProject} project + * @param {Partial} expected + * @param {string} message + * @returns {void} + */ +function assertDataSyncStateMatches(t, project, expected, message) { + const actual = project.$sync.getState().data + t.ok(syncStateMatches(actual, expected), message) +} + +/** + * @param {SyncState} syncState + * @param {Partial} expected + * @returns {boolean} + */ +function syncStateMatches(syncState, expected) { + for (const [key, expectedValue] of Object.entries(expected)) { + const actualValue = /** @type {any} */ (syncState)[key] + if (actualValue !== expectedValue) return false + } + return true +}