Skip to content

Commit

Permalink
Merge branch 'main' into feat/IconHttpServer
Browse files Browse the repository at this point in the history
* main:
  fix: fix core storage initialization in MapeoManager (#367)
  feat: add `$sync` API methods (#361)
  feat: `listLocalPeers()` & `local-peers` event (#360)
  feat: integrate LocalDiscovery & LocalPeers (#358)
  • Loading branch information
gmaclennan committed Nov 9, 2023
2 parents f4d0d28 + 75ea34a commit cc76864
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 109 deletions.
21 changes: 15 additions & 6 deletions src/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])
* @property {string | undefined} name
*/
/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux<import('@hyperswarm/secret-stream')> }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */

/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */
Expand All @@ -57,7 +57,7 @@ class Peer {
#name
#connectedAt = 0
#disconnectedAt = 0
/** @type {Protomux} */
/** @type {Protomux<import('@hyperswarm/secret-stream')>} */
#protomux

/**
Expand Down Expand Up @@ -103,7 +103,7 @@ class Peer {
}
}
}
/** @param {Protomux} protomux */
/** @param {Protomux<import('@hyperswarm/secret-stream')>} protomux */
connect(protomux) {
this.#protomux = protomux
/* c8 ignore next 3 */
Expand Down Expand Up @@ -166,7 +166,9 @@ class Peer {
/**
* @typedef {object} LocalPeersEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter)
*/

/** @extends {TypedEmitter<LocalPeersEvents>} */
Expand Down Expand Up @@ -272,6 +274,13 @@ export class LocalPeers extends TypedEmitter {
stream.userData = protomux
this.#opening.add(stream.opened)

protomux.pair(
{ protocol: 'hypercore/alpha' },
/** @param {Buffer} discoveryKey */ async (discoveryKey) => {
this.emit('discovery-key', discoveryKey, stream.rawStream)
}
)

// No need to connect error handler to stream because Protomux does this,
// and errors are eventually handled by #closePeer

Expand Down Expand Up @@ -319,16 +328,16 @@ export class LocalPeers extends TypedEmitter {

/**
* @param {Buffer} publicKey
* @param {Protomux} protomux
* @param {Protomux<import('@hyperswarm/secret-stream')>} protomux
*/
#openPeer(publicKey, protomux) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
const wasConnected = peer.info.status === 'connected'
peer.connect(protomux)
if (!wasConnected) this.#emitPeers()
this.#emitPeers()
this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info))
}

/** @param {Buffer} publicKey */
Expand Down
125 changes: 105 additions & 20 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import { ProjectKeys } from './generated/keys.js'
import {
deNullify,
getDeviceId,
keyToId,
openedNoiseSecretStream,
projectIdToNonce,
projectKeyToId,
projectKeyToPublicId,
} from './utils.js'
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { LocalDiscovery } from './discovery/local-discovery.js'
import { TypedEmitter } from 'tiny-typed-emitter'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */

Expand All @@ -36,8 +40,21 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db'
const MAX_FILE_DESCRIPTORS = 768

export const kRPC = Symbol('rpc')
export const kManagerReplicate = Symbol('replicate manager')

export class MapeoManager {
/**
* @typedef {Omit<import('./local-peers.js').PeerInfo, 'protomux'>} PublicPeerInfo
*/

/**
* @typedef {object} MapeoManagerEvents
* @property {(peers: PublicPeerInfo[]) => void} local-peers Emitted when the list of connected peers changes (new ones added, or connection status changes)
*/

/**
* @extends {TypedEmitter<MapeoManagerEvents>}
*/
export class MapeoManager extends TypedEmitter {
#keyManager
#projectSettingsIndexWriter
#db
Expand All @@ -48,8 +65,9 @@ export class MapeoManager {
#coreStorage
#dbFolder
#deviceId
#rpc
#localPeers
#invite
#localDiscovery

/**
* @param {Object} opts
Expand All @@ -58,6 +76,7 @@ export class MapeoManager {
* @param {string | import('./types.js').CoreStorage} opts.coreStorage Folder for hypercore storage or a function that returns a RandomAccessStorage instance
*/
constructor({ rootKey, dbFolder, coreStorage }) {
super()
this.#dbFolder = dbFolder
const sqlite = new Database(
dbFolder === ':memory:'
Expand All @@ -69,7 +88,11 @@ export class MapeoManager {
migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname,
})

this.#rpc = new LocalPeers()
this.#localPeers = new LocalPeers()
this.#localPeers.on('peers', (peers) => {
this.emit('local-peers', omitPeerProtomux(peers))
})

this.#keyManager = new KeyManager(rootKey)
this.#deviceId = getDeviceId(this.#keyManager)
this.#projectSettingsIndexWriter = new IndexWriter({
Expand All @@ -79,7 +102,7 @@ export class MapeoManager {
this.#activeProjects = new Map()

this.#invite = new InviteApi({
rpc: this.#rpc,
rpc: this.#localPeers,
queries: {
isMember: async (projectId) => {
const projectExists = this.#db
Expand All @@ -98,18 +121,47 @@ export class MapeoManager {

if (typeof coreStorage === 'string') {
const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS)
// @ts-ignore
this.#coreStorage = Hypercore.createStorage(coreStorage, { pool })
// @ts-expect-error
this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool })
} else {
this.#coreStorage = coreStorage
}

this.#localDiscovery = new LocalDiscovery({
identityKeypair: this.#keyManager.getIdentityKeypair(),
})
this.#localDiscovery.on('connection', this[kManagerReplicate].bind(this))
}

/**
* MapeoRPC instance, used for tests
*/
get [kRPC]() {
return this.#rpc
return this.#localPeers
}

/**
* Replicate Mapeo to a `@hyperswarm/secret-stream`. This replication connects
* the Mapeo RPC channel and allows invites. All active projects will sync
* automatically to this replication stream. Only use for local (trusted)
* connections, because the RPC channel key is public. To sync a specific
* project without connecting RPC, use project[kProjectReplication].
*
* @param {import('@hyperswarm/secret-stream')<any>} noiseStream
*/
[kManagerReplicate](noiseStream) {
const replicationStream = this.#localPeers.connect(noiseStream)
Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)])
.then(([{ name }, openedNoiseStream]) => {
if (openedNoiseStream.destroyed || !name) return
const peerId = keyToId(openedNoiseStream.remotePublicKey)
return this.#localPeers.sendDeviceInfo(peerId, { name })
})
.catch((e) => {
// Ignore error but log
console.error('Failed to send device info to peer', e)
})
return replicationStream
}

/**
Expand Down Expand Up @@ -205,15 +257,10 @@ export class MapeoManager {
})

// 4. Create MapeoProject instance
const project = new MapeoProject({
...this.#projectStorage(projectId),
const project = this.#createProjectInstance({
encryptionKeys,
keyManager: this.#keyManager,
projectKey: projectKeypair.publicKey,
projectSecretKey: projectKeypair.secretKey,
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
})

// 5. Write project name and any other relevant metadata to project instance
Expand Down Expand Up @@ -263,19 +310,25 @@ export class MapeoManager {
projectId
)

const project = new MapeoProject({
const project = this.#createProjectInstance(projectKeys)

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

return project
}

/** @param {ProjectKeys} projectKeys */
#createProjectInstance(projectKeys) {
const projectId = keyToId(projectKeys.projectKey)
return new MapeoProject({
...this.#projectStorage(projectId),
...projectKeys,
keyManager: this.#keyManager,
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
localPeers: this.#localPeers,
})

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

return project
}

/**
Expand Down Expand Up @@ -425,4 +478,36 @@ export class MapeoManager {
get invite() {
return this.#invite
}

/**
* @returns {Promise<PublicPeerInfo[]>}
*/
async listLocalPeers() {
return omitPeerProtomux(this.#localPeers.peers)
}
}

// We use the `protomux` property of connected peers internally, but we don't
// expose it to the API. I have avoided using a private symbol for this for fear
// that we could accidentally keep references around of protomux instances,
// which could cause a memory leak (it shouldn't, but just to eliminate the
// possibility)

/**
* Remove the protomux property of connected peers
*
* @param {import('./local-peers.js').PeerInfo[]} peers
* @returns {PublicPeerInfo[]}
*/
function omitPeerProtomux(peers) {
return peers.map(
({
// @ts-ignore
// eslint-disable-next-line no-unused-vars
protomux,
...publicPeerInfo
}) => {
return publicPeerInfo
}
)
}
Loading

0 comments on commit cc76864

Please sign in to comment.