Skip to content

Commit

Permalink
fix TipFetcher when no messageBus available
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Oct 11, 2024
1 parent ac8910f commit a1b0ead
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
16 changes: 15 additions & 1 deletion packages/core/src/initialization/stream-loading.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ import { LogSyncer } from '../stream-loading/log-syncer.js'
import { StateManipulator } from '../stream-loading/state-manipulator.js'
import { AnchorValidator } from '../anchor/anchor-service.js'
import { HandlersMap } from '../handlers-map.js'
import { StreamID } from '@ceramicnetwork/streamid'
import { Observable, empty } from 'rxjs'
import type { CID } from 'multiformats/cid'

const noopPubsubQuerier = {
queryNetwork(streamId: StreamID): Observable<CID> {
return empty()
},
}

export function makeStreamLoaderAndUpdater(
logger: DiagnosticsLogger,
Expand All @@ -17,7 +26,12 @@ export function makeStreamLoaderAndUpdater(
streamHandlers: HandlersMap
): [StreamLoader, StreamUpdater] {
const anchorTimestampExtractor = new AnchorTimestampExtractor(logger, dispatcher, anchorValidator)
const tipFetcher = new TipFetcher(dispatcher.messageBus)
if (!dispatcher.messageBus) {
logger.warn("No pubsub querier detected, won't be able to load tips from the network")
}
const tipFetcher = new TipFetcher(
dispatcher.messageBus ? dispatcher.messageBus : noopPubsubQuerier
)
const logSyncer = new LogSyncer(dispatcher)
const stateManipulator = new StateManipulator(logger, streamHandlers, logSyncer, api)
const streamLoader = new StreamLoader(
Expand Down
34 changes: 11 additions & 23 deletions packages/stream-tests/src/__tests__/ceramic_sync_disabled.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ const makeCeramicCore = async (
return core
}

// should pass on v4 when updated from TileDocument

describeIfV3('Cross node syncing disabled', () => {
describe('Cross node syncing disabled', () => {
jest.setTimeout(20000)

let ipfs1: IpfsApi
Expand Down Expand Up @@ -110,32 +108,22 @@ describeIfV3('Cross node syncing disabled', () => {
it('Stream created and updated on node with peer data sync disabled still loads via other well connected nodes', async () => {
const content0 = { step: 0 }
const content1 = { step: 1 }
const content2 = { step: 2 }
const doc1 = await TileDocument.create(disconnectedCeramic, content0, null, {
anchor: false,
})
await doc1.update(content1, null, { anchor: false })

const doc2 = await TileDocument.load(connectedCeramic, doc1.id)
expect(doc1.content).toEqual(doc2.content)

// Update should also propagate from node with sync disabled to the other node without issue
await doc1.update(content2, null, { anchor: false })

await TestUtils.waitForState(
doc2,
5000,
(state) => state.log.length == 3,
(state) => {
throw new Error(`Sync failed. State: ${StreamUtils.serializeState(state)}`)
}
)

expect(doc1.content).toEqual(content2)
expect(doc1.state.log.length).toEqual(3)

expect(doc2.content).toEqual(content2)
expect(doc2.state.log.length).toEqual(3)
// The disconnected node won't be listening to pubsub so the connected node will only get the
// genesis commit, not the tip from the update.
expect(doc2.content).toEqual(content0)

// Loading at the specific CommitID of the update will work though because the underlying
// commit blocks are still available via bitswap.
const docAtCommit = await TileDocument.load(connectedCeramic, doc1.commitId)
expect(docAtCommit.content).toEqual(content1)
expect(doc1.content).toEqual(docAtCommit.content)
expect(docAtCommit.state.log.length).toEqual(2)
})

it('Updates made on connected node not visible to node with peer data sync disabled', async () => {
Expand Down

0 comments on commit a1b0ead

Please sign in to comment.