From 3b022674cecfcc1b42eaba27921fd10833a671d8 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 30 Nov 2023 21:22:00 +0900 Subject: [PATCH 1/5] feat: dataType.on('updated-docs') when docs update Emits an array of updated docs for the data type whenever the docs update. Rather than emitting one doc at a time, it waits for indexing to complete, then emits all the docs that have been updated. If there are several updates to a particular docId, only the resolved `head` version is returned. Attaching this listener to a data type that has many values or is frequently updated will have a performance impact, because large amounts of data could be read from the database and there is no limit. For example, if this were attached to the `observation` dataType and a new device synced with a project with 10,000 observations, an attached listener would be called with 10,000 observations. The main reason for this feature is internal use by capabilities: listening for changes and modifying permissions accordingly. 
--- src/datastore/index.js | 60 ++++++++++++++++++++++---------- src/datatype/index.d.ts | 6 +++- src/datatype/index.js | 41 ++++++++++++++++++++-- src/index-writer/index.js | 18 +++++++--- src/mapeo-project.js | 7 ++-- src/sync/peer-sync-controller.js | 9 ++--- src/types.ts | 8 +++++ src/utils.js | 16 +++++++++ test-e2e/project-crud.js | 14 +++++--- test-types/data-types.ts | 12 +++++++ 10 files changed, 152 insertions(+), 39 deletions(-) diff --git a/src/datastore/index.js b/src/datastore/index.js index c02bb098..9583f3b3 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -3,10 +3,8 @@ import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema' import MultiCoreIndexer from 'multi-core-indexer' import pDefer from 'p-defer' import { discoveryKey } from 'hypercore-crypto' +import { createMap } from '../utils.js' -/** - * @typedef {import('multi-core-indexer').IndexEvents} IndexEvents - */ /** * @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc */ @@ -14,9 +12,14 @@ import { discoveryKey } from 'hypercore-crypto' * @typedef {import('../datatype/index.js').MapeoDocTablesMap} MapeoDocTablesMap */ /** - * @typedef {object} DefaultEmitterEvents - * @property {(eventName: keyof IndexEvents, listener: (...args: any[]) => any) => void} newListener - * @property {(eventName: keyof IndexEvents, listener: (...args: any[]) => any) => void} removeListener + * For each schemaName in this dataStore, emits an array of docIds that have + * been indexed whenever indexing finishes (becomes idle). `projectSettings` + * currently not supported. 
+ * + * @template {MapeoDoc['schemaName']} TSchemaName + * @typedef {{ + * [S in Exclude]: (docIds: Set) => void + * }} DataStoreEvents */ /** * @template T @@ -37,7 +40,7 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({ /** * @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas] * @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]] - * @extends {TypedEmitter} + * @extends {TypedEmitter>} */ export class DataStore extends TypedEmitter { #coreManager @@ -49,12 +52,14 @@ export class DataStore extends TypedEmitter { #pendingIndex = new Map() /** @type {Set['promise']>} */ #pendingAppends = new Set() + /** @type {Record>} */ + #pendingEmits /** * @param {object} opts * @param {import('../core-manager/index.js').CoreManager} opts.coreManager * @param {TNamespace} opts.namespace - * @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise} opts.batch + * @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise} opts.batch * @param {MultiCoreIndexer.StorageParam} opts.storage */ constructor({ coreManager, namespace, batch, storage }) { @@ -62,6 +67,10 @@ export class DataStore extends TypedEmitter { this.#coreManager = coreManager this.#namespace = namespace this.#batch = batch + this.#pendingEmits = createMap( + NAMESPACE_SCHEMAS[namespace], + () => new Set() + ) this.#writerCore = coreManager.getWriterCore(namespace).core const cores = coreManager.getCores(namespace).map((cr) => cr.core) this.#coreIndexer = new MultiCoreIndexer(cores, { @@ -72,16 +81,7 @@ export class DataStore extends TypedEmitter { if (coreRecord.namespace !== namespace) return this.#coreIndexer.addCore(coreRecord.core) }) - - // Forward events from coreIndexer - this.on('newListener', (eventName, listener) => { - if (['newListener', 'removeListener'].includes(eventName)) return - this.#coreIndexer.on(eventName, listener) - }) - this.on('removeListener', (eventName, listener) => { - if (['newListener', 
'removeListener'].includes(eventName)) return - this.#coreIndexer.off(eventName, listener) - }) + this.#coreIndexer.on('idle', this.#handleIndexerIdle) } get indexer() { @@ -110,7 +110,7 @@ export class DataStore extends TypedEmitter { * @param {MultiCoreIndexer.Entry<'binary'>[]} entries */ async #handleEntries(entries) { - await this.#batch(entries) + const indexed = await this.#batch(entries) await Promise.all(this.#pendingAppends) // Writes to the writerCore need to wait until the entry is indexed before // returning, so we check if any incoming entry has a pending promise @@ -124,6 +124,18 @@ export class DataStore extends TypedEmitter { if (!pending) continue pending.resolve() } + for (const schemaName of this.schemas) { + // Unsupported initially + if (schemaName === 'projectSettings') continue + const docIds = indexed[schemaName] + if (!docIds) continue + for (const docId of docIds) { + // We'll only emit these once the indexer is idle - a particular docId + // could have many updates, and we only emit it once after indexing + // @ts-ignore + this.#pendingEmits[schemaName].add(docId) + } + } } /** @@ -207,4 +219,14 @@ export class DataStore extends TypedEmitter { if (!block) throw new Error('Not Found') return block } + + #handleIndexerIdle = () => { + for (const eventName of this.eventNames()) { + const docIds = this.#pendingEmits[eventName] + if (!docIds.size) continue + // @ts-ignore - I'm pretty sure TS is just not smart enough here, it's not me! 
+ this.emit(eventName, docIds) + docIds.clear() + } + } } diff --git a/src/datatype/index.d.ts b/src/datatype/index.d.ts index 9397857b..3518f4c7 100644 --- a/src/datatype/index.d.ts +++ b/src/datatype/index.d.ts @@ -12,6 +12,7 @@ import { type BetterSQLite3Database } from 'drizzle-orm/better-sqlite3' import { SQLiteSelectBuilder } from 'drizzle-orm/sqlite-core' import { RunResult } from 'better-sqlite3' import type Hypercore from 'hypercore' +import { TypedEmitter } from 'tiny-typed-emitter' type MapeoDocTableName = `${MapeoDoc['schemaName']}Table` type GetMapeoDocTables = T[keyof T & MapeoDocTableName] @@ -25,6 +26,9 @@ type MapeoDocTablesMap = { { _: { name: K } } > } +export interface DataTypeEvents { + 'updated-docs': (docs: TDoc[]) => void +} export const kCreateWithDocId: unique symbol export const kSelect: unique symbol @@ -45,7 +49,7 @@ export class DataType< TSchemaName extends TTable['_']['name'], TDoc extends MapeoDocMap[TSchemaName], TValue extends MapeoValueMap[TSchemaName] -> { +> extends TypedEmitter> { constructor({ dataStore, table, diff --git a/src/datatype/index.js b/src/datatype/index.js index c072a1f4..a687eced 100644 --- a/src/datatype/index.js +++ b/src/datatype/index.js @@ -1,10 +1,11 @@ // @ts-check import { validate } from '@mapeo/schema' import { getTableConfig } from 'drizzle-orm/sqlite-core' -import { eq, placeholder } from 'drizzle-orm' +import { eq, inArray, placeholder } from 'drizzle-orm' import { randomBytes } from 'node:crypto' import { deNullify } from '../utils.js' import crypto from 'hypercore-crypto' +import { TypedEmitter } from 'tiny-typed-emitter' /** * @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc @@ -37,6 +38,11 @@ import crypto from 'hypercore-crypto' * @template {keyof any} K * @typedef {T extends any ? 
Omit : never} OmitUnion */ +/** + * @template {MapeoDoc} TDoc + * @typedef {object} DataTypeEvents + * @property {(docs: TDoc[]) => void} updated-docs + */ function generateId() { return randomBytes(32).toString('hex') @@ -54,8 +60,9 @@ export const kTable = Symbol('table') * @template {MapeoDocTablesMap[TSchemaName]} TTable * @template {Exclude} TDoc * @template {Exclude} TValue + * @extends {TypedEmitter & import('../types.js').DefaultEmitterEvents>>} */ -export class DataType { +export class DataType extends TypedEmitter { #dataStore #table #getPermissions @@ -72,6 +79,7 @@ export class DataType { * @param {() => any} [opts.getPermissions] */ constructor({ dataStore, table, getPermissions, db }) { + super() this.#dataStore = dataStore this.#table = table this.#schemaName = /** @type {TSchemaName} */ (getTableConfig(table).name) @@ -85,6 +93,19 @@ export class DataType { .prepare(), getMany: db.select().from(table).prepare(), } + this.on('newListener', (eventName) => { + if (eventName !== 'updated-docs') return + if (this.listenerCount('updated-docs') > 1) return + if (this.#schemaName === 'projectSettings') return + // Avoid adding a listener to the dataStore unless we need to (e.g. this has a listener attached), for performance reasons. 
+ this.#dataStore.on(this.#schemaName, this.#handleDataStoreUpdate) + }) + this.on('removeListener', (eventName) => { + if (eventName !== 'updated-docs') return + if (this.listenerCount('updated-docs') > 0) return + if (this.#schemaName === 'projectSettings') return + this.#dataStore.off(this.#schemaName, this.#handleDataStoreUpdate) + }) } get [kTable]() { @@ -236,4 +257,20 @@ export class DataType { } return { docId, createdAt, createdBy } } + + /** + * @param {Set} docIds + */ + #handleDataStoreUpdate = (docIds) => { + if (this.listenerCount('updated-docs') === 0) return + const updatedDocs = /** @type {TDoc[]} */ ( + this.#db + .select() + .from(this.#table) + .where(inArray(this.#table.docId, [...docIds])) + .all() + .map((doc) => deNullify(doc)) + ) + this.emit('updated-docs', updatedDocs) + } } diff --git a/src/index-writer/index.js b/src/index-writer/index.js index a48c5fde..05b8101c 100644 --- a/src/index-writer/index.js +++ b/src/index-writer/index.js @@ -11,6 +11,9 @@ import { Logger } from '../logger.js' /** * @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc */ +/** + * @typedef {{ [K in MapeoDoc['schemaName']]?: string[] }} IndexedDocIds + */ /** * @typedef {ReturnType} MapeoDocInternal */ @@ -54,14 +57,16 @@ export class IndexWriter { } /** - * * @param {import('multi-core-indexer').Entry[]} entries + * @returns {Promise} map of indexed docIds by schemaName */ async batch(entries) { // sqlite-indexer is _significantly_ faster when batching even <10 at a // time, so best to queue docs here before calling sliteIndexer.batch() /** @type {Record} */ const queued = {} + /** @type {IndexedDocIds} */ + const indexed = {} for (const { block, key, index } of entries) { try { const version = { coreDiscoveryKey: discoveryKey(key), index } @@ -71,19 +76,21 @@ export class IndexWriter { // Unknown or invalid entry - silently ignore continue } + // Don't have an indexer for this type - silently ignore + if (!this.#indexers.has(doc.schemaName)) continue if 
(queued[doc.schemaName]) { queued[doc.schemaName].push(doc) + // @ts-expect-error - we know this is defined, TS doesn't + indexed[doc.schemaName].push(doc.docId) } else { queued[doc.schemaName] = [doc] + indexed[doc.schemaName] = [doc.docId] } } for (const [schemaName, docs] of Object.entries(queued)) { // @ts-expect-error const indexer = this.#indexers.get(schemaName) - if (!indexer) { - // Don't have an indexer for this type - silently ignore - continue - } + if (!indexer) continue // Won't happen, but TS doesn't know that indexer.batch(docs) if (this.#l.enabled) { for (const doc of docs) { @@ -96,5 +103,6 @@ export class IndexWriter { } } } + return indexed } } diff --git a/src/mapeo-project.js b/src/mapeo-project.js index 335af9bd..9358da21 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -365,11 +365,14 @@ export class MapeoProject { // Ignore errors thrown by values that can't be decoded for now } } - - await Promise.all([ + // TODO: Note that docs indexed to the shared index writer (project + // settings) are not currently returned here, so it is not possible to + // subscribe to updates for projectSettings + const [indexed] = await Promise.all([ projectIndexWriter.batch(otherEntries), sharedIndexWriter.batch(projectSettingsEntries), ]) + return indexed } get observation() { diff --git a/src/sync/peer-sync-controller.js b/src/sync/peer-sync-controller.js index d1d55aa0..fcf3a944 100644 --- a/src/sync/peer-sync-controller.js +++ b/src/sync/peer-sync-controller.js @@ -1,6 +1,7 @@ import mapObject from 'map-obj' import { NAMESPACES } from '../core-manager/index.js' import { Logger } from '../logger.js' +import { createMap } from '../utils.js' /** * @typedef {import('../core-manager/index.js').Namespace} Namespace @@ -304,11 +305,7 @@ function getSyncStatus(peerId, state) { /** * @template T * @param {T} value - * @returns {Record} */ + **/ function createNamespaceMap(value) { - const map = /** @type {Record} */ ({}) - for (const ns of NAMESPACES) 
{ - map[ns] = value - } - return map + return createMap(NAMESPACES, value) } diff --git a/src/types.ts b/src/types.ts index 40322e36..b0c04c74 100644 --- a/src/types.ts +++ b/src/types.ts @@ -15,6 +15,7 @@ import MultiCoreIndexer from 'multi-core-indexer' import Corestore from 'corestore' import Hypercore from 'hypercore' import RandomAccessStorage from 'random-access-storage' +import { DefaultListener, ListenerSignature } from 'tiny-typed-emitter' type SupportedBlobVariants = typeof SUPPORTED_BLOB_VARIANTS export type BlobType = keyof SupportedBlobVariants @@ -184,3 +185,10 @@ export type Entries = { }[keyof T][] export type CoreStorage = (name: string) => RandomAccessStorage + +export type DefaultEmitterEvents< + L extends ListenerSignature = DefaultListener +> = { + newListener: (event: keyof L, listener: L[keyof L]) => void + removeListener: (event: keyof L, listener: L[keyof L]) => void +} diff --git a/src/utils.js b/src/utils.js index 1da1b0d5..9b4d052a 100644 --- a/src/utils.js +++ b/src/utils.js @@ -139,3 +139,19 @@ export function projectIdToNonce(projectId) { export function getDeviceId(keyManager) { return keyManager.getIdentityKeypair().publicKey.toString('hex') } + +/** + * Small helper to create a typed map + * + * @template {string} K + * @template {any} V + * @param {ReadonlyArray} keys + * @param {V} value + * @returns {Record infer T ? T : V>} */ +export function createMap(keys, value) { + const map = /** @type {Record infer T ? T : V>} */ ({}) + for (const key of keys) { + map[key] = typeof value === 'function' ? 
value() : value + } + return map +} diff --git a/test-e2e/project-crud.js b/test-e2e/project-crud.js index 891fc1b0..d30340b8 100644 --- a/test-e2e/project-crud.js +++ b/test-e2e/project-crud.js @@ -1,7 +1,7 @@ import { test } from 'brittle' import { randomBytes } from 'crypto' import { valueOf } from '../src/utils.js' -import { createManager, stripUndef } from './utils.js' +import { createManager, sortById, stripUndef } from './utils.js' import { round } from './utils.js' import { generate } from '@mapeo/mock-data' import { setTimeout as delay } from 'timers/promises' @@ -73,12 +73,13 @@ test('CRUD operations', async (t) => { await t.test(`create and read ${schemaName}`, async (st) => { const projectId = await manager.createProject() const project = await manager.getProject(projectId) - const values = [] + /** @type {any[]} */ + const updates = [] const writePromises = [] + project[schemaName].on('updated-docs', (docs) => updates.push(...docs)) let i = 0 while (i++ < CREATE_COUNT) { const value = valueOf(generate(schemaName)[0]) - values.push(value) writePromises.push( // @ts-ignore project[schemaName].create(value) @@ -88,7 +89,12 @@ test('CRUD operations', async (t) => { const read = await Promise.all( written.map((doc) => project[schemaName].getByDocId(doc.docId)) ) - st.alike(written, read, 'return create() matches return of getByDocId()') + st.alike( + sortById(written), + sortById(read), + 'return create() matches return of getByDocId()' + ) + st.alike(sortById(updates), sortById(written), 'updated-docs emitted') }) await t.test('update', async (st) => { const projectId = await manager.createProject() diff --git a/test-types/data-types.ts b/test-types/data-types.ts index 086828c1..b5f21bc5 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -67,6 +67,10 @@ const observationByVersionId = await mapeoProject.observation.getByVersionId( ) Expect> +mapeoProject.observation.on('updated-docs', (docs) => { + Expect> +}) + ///// Presets const 
createdPreset = await mapeoProject.preset.create({} as PresetValue) @@ -84,6 +88,10 @@ Expect> const presetByVersionId = await mapeoProject.preset.getByVersionId('abc') Expect> +mapeoProject.preset.on('updated-docs', (docs) => { + Expect> +}) + ///// Fields const createdField = await mapeoProject.field.create({} as FieldValue) @@ -100,3 +108,7 @@ Expect> const fieldByVersionId = await mapeoProject.field.getByVersionId('abc') Expect> + +mapeoProject.field.on('updated-docs', (docs) => { + Expect> +}) From 70d7ba04cc7be672d3a06201dbf718dd15d12318 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 30 Nov 2023 21:45:33 +0900 Subject: [PATCH 2/5] fix: make sure we clear pending emits --- src/datastore/index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/datastore/index.js b/src/datastore/index.js index 9583f3b3..30846be6 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -226,7 +226,8 @@ export class DataStore extends TypedEmitter { if (!docIds.size) continue // @ts-ignore - I'm pretty sure TS is just not smart enough here, it's not me! this.emit(eventName, docIds) - docIds.clear() } + // Make sure we reset this, otherwise we'd get a memory leak of this growing without bounds. 
+ this.#pendingEmits = createMap(this.schemas, () => new Set()) } } From 976183f136c4e37d7e6e6b7df011832a89c6da97 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 30 Nov 2023 22:37:05 +0900 Subject: [PATCH 3/5] fix datastore test --- src/datastore/index.js | 1 + tests/datastore.js | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/datastore/index.js b/src/datastore/index.js index 30846be6..560b18c5 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -222,6 +222,7 @@ export class DataStore extends TypedEmitter { #handleIndexerIdle = () => { for (const eventName of this.eventNames()) { + if (!(eventName in this.#pendingEmits)) continue const docIds = this.#pendingEmits[eventName] if (!docIds.size) continue // @ts-ignore - I'm pretty sure TS is just not smart enough here, it's not me! diff --git a/tests/datastore.js b/tests/datastore.js index cfde7a65..765fc942 100644 --- a/tests/datastore.js +++ b/tests/datastore.js @@ -36,6 +36,7 @@ test('read and write', async (t) => { const versionId = getVersionId({ coreDiscoveryKey, index }) indexedVersionIds.push(versionId) } + return {} }, storage: () => new RAM(), }) @@ -69,6 +70,7 @@ test('writeRaw and read', async (t) => { namespace: 'config', batch: async () => { await new Promise((res) => setTimeout(res, 10)) + return {} }, storage: () => new RAM(), }) @@ -88,15 +90,16 @@ test('index events', async (t) => { namespace: 'data', batch: async () => { await new Promise((res) => setTimeout(res, 10)) + return {} }, storage: () => new RAM(), }) - dataStore.on('index-state', (state) => { + dataStore.indexer.on('index-state', (state) => { // eslint-disable-next-line no-unused-vars const { entriesPerSecond, ...rest } = state indexStates.push(rest) }) - const idlePromise = once(dataStore, 'idle') + const idlePromise = once(dataStore.indexer, 'idle') await dataStore.write(obs) await idlePromise const expectedStates = [ From 98a6aa70cd9dca452ccb923916c4b8591f144496 Mon Sep 17 
00:00:00 2001 From: Gregor MacLennan Date: Thu, 30 Nov 2023 23:51:13 +0900 Subject: [PATCH 4/5] attempt to fix flakey test --- test-e2e/sync.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-e2e/sync.js b/test-e2e/sync.js index ffd9fe6f..27d4a55d 100644 --- a/test-e2e/sync.js +++ b/test-e2e/sync.js @@ -252,9 +252,9 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) => t.alike(invitorState[ns].localState, inviteeState[ns].localState) } + await disconnect1() + // Temp fix until we have .close() method - waits for indexing idle to ensure // we don't close storage in teardown while index is still being written. await Promise.all(projects.map((p) => p.$getProjectSettings())) - - await disconnect1() }) From 60d57832474053d8590d04f04fe29c3ca1e8cb11 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Fri, 1 Dec 2023 08:00:23 +0900 Subject: [PATCH 5/5] Update src/datastore/index.js Co-authored-by: Andrew Chou --- src/datastore/index.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/datastore/index.js b/src/datastore/index.js index 560b18c5..acbcd748 100644 --- a/src/datastore/index.js +++ b/src/datastore/index.js @@ -132,7 +132,6 @@ export class DataStore extends TypedEmitter { for (const docId of docIds) { // We'll only emit these once the indexer is idle - a particular docId // could have many updates, and we only emit it once after indexing - // @ts-ignore this.#pendingEmits[schemaName].add(docId) } }