Skip to content

Commit

Permalink
handle getProject calls failing on server side
Browse files Browse the repository at this point in the history
  • Loading branch information
achou11 committed Sep 18, 2023
1 parent 77cdf54 commit b7e98da
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 39 deletions.
22 changes: 17 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
"multi-core-indexer": "^1.0.0-alpha.7",
"multicast-service-discovery": "^4.0.4",
"p-defer": "^4.0.0",
"p-timeout": "^6.1.2",
"private-ip": "^3.0.0",
"protobufjs": "^7.2.3",
"protomux": "^3.4.1",
Expand Down
49 changes: 41 additions & 8 deletions src/ipc-wrapper/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// @ts-check
import { createClient } from 'rpc-reflector'
import { MANAGER_CHANNEL_ID, SubChannel } from './sub-channel.js'
import pTimeout from 'p-timeout'
import { MANAGER_CHANNEL_ID, MAPEO_IPC_ID, SubChannel } from './sub-channel.js'

const CLOSE = Symbol('close')

Expand All @@ -12,17 +13,20 @@ export function createMapeoClient(messagePort) {
/** @type {Map<import('../types.js').ProjectPublicId, { instance: import('rpc-reflector/client.js').ClientApi<import('../mapeo-project.js').MapeoProject>, channel: SubChannel }>} */
const existingProjectClients = new Map()

const mapeoIpcChannel = new SubChannel(messagePort, MAPEO_IPC_ID)
mapeoIpcChannel.start()

const managerChannel = new SubChannel(messagePort, MANAGER_CHANNEL_ID)
managerChannel.start()

/** @type {import('rpc-reflector').ClientApi<import('../mapeo-manager.js').MapeoManager>} */
const managerClient = createClient(managerChannel)

managerChannel.start()

const client = new Proxy(managerClient, {
get(target, prop, receiver) {
if (prop === CLOSE) {
return () => {
mapeoIpcChannel.close()
managerChannel.close()
createClient.close(managerClient)
}
Expand All @@ -42,13 +46,44 @@ export function createMapeoClient(messagePort) {
* @param {import('../types.js').ProjectPublicId} projectPublicId
* @returns {Promise<import('rpc-reflector/client.js').ClientApi<import('../mapeo-project.js').MapeoProject>>}
*/
function createProjectClient(projectPublicId) {
async function createProjectClient(projectPublicId) {
const existingClient = existingProjectClients.get(projectPublicId)

if (existingClient) return Promise.resolve(existingClient.instance)
if (existingClient) return existingClient.instance

// Wait for the server to confirm that it could retrieve the desired project
await pTimeout(
new Promise((res, rej) => {
mapeoIpcChannel.on(
'message',
/**
* @param {import('./utils.js').MapeoIpcMessagePayload} payload
*/
function handleGetProjectResponse(payload) {
const { type, projectId, error } = payload

if (type !== 'get_project') return
if (projectId !== projectPublicId) return

mapeoIpcChannel.off('message', handleGetProjectResponse)

if (error) rej(new Error(error))
else res(null)
}
)

mapeoIpcChannel.postMessage({
type: 'get_project',
projectId: projectPublicId,
})
}),
{ milliseconds: 3000 }
)

const projectChannel = new SubChannel(messagePort, projectPublicId)

projectChannel.start()

/** @type {import('rpc-reflector').ClientApi<import('../mapeo-project.js').MapeoProject>} */
const projectClientProxy = new Proxy(createClient(projectChannel), {
get(target, prop, receiver) {
Expand All @@ -59,14 +94,12 @@ export function createMapeoClient(messagePort) {
},
})

projectChannel.start()

existingProjectClients.set(projectPublicId, {
instance: projectClientProxy,
channel: projectChannel,
})

return Promise.resolve(projectClientProxy)
return projectClientProxy
}
}

Expand Down
65 changes: 40 additions & 25 deletions src/ipc-wrapper/server.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// @ts-check
import { createServer } from 'rpc-reflector'
import { MANAGER_CHANNEL_ID, SubChannel } from './sub-channel.js'
import { extractMessageEventData } from './utils.js'
import { MANAGER_CHANNEL_ID, MAPEO_IPC_ID, SubChannel } from './sub-channel.js'

/**
* @param {import('../mapeo-manager.js').MapeoManager} manager
Expand All @@ -11,53 +10,69 @@ export function createMapeoServer(manager, messagePort) {
/** @type {Map<string, { close: () => void, channel: SubChannel }>} */
const existingProjectServers = new Map()

const managerChannel = new SubChannel(messagePort, MANAGER_CHANNEL_ID)
const mapeoIpcChannel = new SubChannel(messagePort, MAPEO_IPC_ID)
mapeoIpcChannel.start()

const managerServer = createServer(manager, managerChannel)
mapeoIpcChannel.on('message', handleGetProject)

const managerChannel = new SubChannel(messagePort, MANAGER_CHANNEL_ID)
managerChannel.start()

messagePort.addEventListener('message', handleMessageEvent)
const managerServer = createServer(manager, managerChannel)

return {
close() {
messagePort.removeEventListener('message', handleMessageEvent)
mapeoIpcChannel.off('message', handleGetProject)

for (const [id, server] of existingProjectServers.entries()) {
server.close()
server.channel.close()
existingProjectServers.delete(id)
}

mapeoIpcChannel.close()
managerServer.close()
managerChannel.close()
},
}

/**
* @param {any} payload
*
* @param {import('./utils.js').MapeoIpcMessagePayload} payload
*/
async function handleMessageEvent(payload) {
const data = extractMessageEventData(payload)

const id = data?.id

if (typeof id !== 'string' || id === MANAGER_CHANNEL_ID) return

if (existingProjectServers.has(id)) return

const projectChannel = new SubChannel(messagePort, id)

const project = await manager.getProject(
/** @type {import('../types.js').ProjectPublicId} */ (id)
)
async function handleGetProject(payload) {
const { type, projectId } = payload

if (type !== 'get_project') return
if (existingProjectServers.has(projectId)) return

let project
try {
project = await manager.getProject(
/** @type {import('../types.js').ProjectPublicId} */ (projectId)
)
} catch (err) {
mapeoIpcChannel.postMessage({
type: 'get_project',
projectId,
error:
err instanceof Error
? err.message
: `Could not get project with ID ${projectId}`,
})
return
}

const projectChannel = new SubChannel(messagePort, projectId)
projectChannel.start()

const { close } = createServer(project, projectChannel)

existingProjectServers.set(id, { close, channel: projectChannel })

projectChannel.emit('message', data.message)
existingProjectServers.set(projectId, { close, channel: projectChannel })

projectChannel.start()
mapeoIpcChannel.postMessage({
type: 'get_project',
projectId,
})
}
}
4 changes: 3 additions & 1 deletion src/ipc-wrapper/sub-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import { EventEmitter } from 'eventemitter3'
import { extractMessageEventData } from './utils.js'

// Ideally unique ID used for identifying "global" Mapeo IPC messages
export const MAPEO_IPC_ID = '@@mapeo'
// Ideally unique ID used for identifying messages specific to a MapeoManager instance
export const MANAGER_CHANNEL_ID = '@@manager'

/**
Expand Down Expand Up @@ -110,6 +113,5 @@ function isRelevantEvent(event) {
if (!event || typeof event !== 'object') return false
if (!('id' in event && 'message' in event)) return false
if (typeof event.id !== 'string') return false

return true
}
10 changes: 10 additions & 0 deletions src/ipc-wrapper/utils.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/**
* @typedef {Object} MapeoGetProjectPayload
* @property {'get_project'} type
* @property {string} projectId
* @property {string} [error]
*
* @typedef {MapeoGetProjectPayload} MapeoIpcMessagePayload
*
*/

/**
* @template T
* @param {T} event
Expand Down
11 changes: 11 additions & 0 deletions test-e2e/ipc-wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ test('IPC wrappers work', async (t) => {
return cleanup()
})

test('Attempting to get non-existent project fails', async (t) => {
const { client, cleanup } = setup()

await t.exception(async () => {
// @ts-expect-error
await client.getProject('mapeo')
})

return cleanup()
})

test('Client calls fail after server closes', async (t) => {
const { client, server, cleanup } = setup()

Expand Down

0 comments on commit b7e98da

Please sign in to comment.