Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dataType.on('updated-docs') when docs update #396

Merged
merged 5 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions src/datastore/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@ import { encode, decode, getVersionId, parseVersionId } from '@mapeo/schema'
import MultiCoreIndexer from 'multi-core-indexer'
import pDefer from 'p-defer'
import { discoveryKey } from 'hypercore-crypto'
import { createMap } from '../utils.js'

/**
* @typedef {import('multi-core-indexer').IndexEvents} IndexEvents
*/
/**
* @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc
*/
/**
* @typedef {import('../datatype/index.js').MapeoDocTablesMap} MapeoDocTablesMap
*/
/**
* @typedef {object} DefaultEmitterEvents
* @property {(eventName: keyof IndexEvents, listener: (...args: any[]) => any) => void} newListener
* @property {(eventName: keyof IndexEvents, listener: (...args: any[]) => any) => void} removeListener
* For each schemaName in this dataStore, emits a Set of docIds that have
* been indexed whenever indexing finishes (becomes idle). `projectSettings`
* is currently not supported.
*
* @template {MapeoDoc['schemaName']} TSchemaName
* @typedef {{
* [S in Exclude<TSchemaName, 'projectSettings'>]: (docIds: Set<string>) => void
* }} DataStoreEvents
*/
/**
* @template T
Expand All @@ -37,7 +40,7 @@ const NAMESPACE_SCHEMAS = /** @type {const} */ ({
/**
* @template {keyof NamespaceSchemas} [TNamespace=keyof NamespaceSchemas]
* @template {NamespaceSchemas[TNamespace][number]} [TSchemaName=NamespaceSchemas[TNamespace][number]]
* @extends {TypedEmitter<IndexEvents & DefaultEmitterEvents>}
* @extends {TypedEmitter<DataStoreEvents<TSchemaName>>}
*/
export class DataStore extends TypedEmitter {
#coreManager
Expand All @@ -49,19 +52,25 @@ export class DataStore extends TypedEmitter {
#pendingIndex = new Map()
/** @type {Set<import('p-defer').DeferredPromise<void>['promise']>} */
#pendingAppends = new Set()
/** @type {Record<MapeoDoc['schemaName'], Set<string>>} */
#pendingEmits

/**
* @param {object} opts
* @param {import('../core-manager/index.js').CoreManager} opts.coreManager
* @param {TNamespace} opts.namespace
* @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise<void>} opts.batch
* @param {(entries: MultiCoreIndexer.Entry<'binary'>[]) => Promise<import('../index-writer/index.js').IndexedDocIds>} opts.batch
* @param {MultiCoreIndexer.StorageParam} opts.storage
*/
constructor({ coreManager, namespace, batch, storage }) {
super()
this.#coreManager = coreManager
this.#namespace = namespace
this.#batch = batch
this.#pendingEmits = createMap(
NAMESPACE_SCHEMAS[namespace],
() => new Set()
)
this.#writerCore = coreManager.getWriterCore(namespace).core
const cores = coreManager.getCores(namespace).map((cr) => cr.core)
this.#coreIndexer = new MultiCoreIndexer(cores, {
Expand All @@ -72,16 +81,7 @@ export class DataStore extends TypedEmitter {
if (coreRecord.namespace !== namespace) return
this.#coreIndexer.addCore(coreRecord.core)
})

// Forward events from coreIndexer
this.on('newListener', (eventName, listener) => {
if (['newListener', 'removeListener'].includes(eventName)) return
this.#coreIndexer.on(eventName, listener)
})
this.on('removeListener', (eventName, listener) => {
if (['newListener', 'removeListener'].includes(eventName)) return
this.#coreIndexer.off(eventName, listener)
})
this.#coreIndexer.on('idle', this.#handleIndexerIdle)
}

get indexer() {
Expand Down Expand Up @@ -110,7 +110,7 @@ export class DataStore extends TypedEmitter {
* @param {MultiCoreIndexer.Entry<'binary'>[]} entries
*/
async #handleEntries(entries) {
await this.#batch(entries)
const indexed = await this.#batch(entries)
await Promise.all(this.#pendingAppends)
// Writes to the writerCore need to wait until the entry is indexed before
// returning, so we check if any incoming entry has a pending promise
Expand All @@ -124,6 +124,17 @@ export class DataStore extends TypedEmitter {
if (!pending) continue
pending.resolve()
}
for (const schemaName of this.schemas) {
// Unsupported initially
if (schemaName === 'projectSettings') continue
const docIds = indexed[schemaName]
if (!docIds) continue
for (const docId of docIds) {
// We'll only emit these once the indexer is idle - a particular docId
// could have many updates, and we only emit it once after indexing
this.#pendingEmits[schemaName].add(docId)
}
}
}

/**
Expand Down Expand Up @@ -207,4 +218,16 @@ export class DataStore extends TypedEmitter {
if (!block) throw new Error('Not Found')
return block
}

#handleIndexerIdle = () => {
for (const eventName of this.eventNames()) {
if (!(eventName in this.#pendingEmits)) continue
const docIds = this.#pendingEmits[eventName]
if (!docIds.size) continue
// @ts-ignore - I'm pretty sure TS is just not smart enough here, it's not me!
this.emit(eventName, docIds)
}
// Make sure we reset this, otherwise we'd get a memory leak of this growing without bounds.
this.#pendingEmits = createMap(this.schemas, () => new Set())
}
}
6 changes: 5 additions & 1 deletion src/datatype/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { type BetterSQLite3Database } from 'drizzle-orm/better-sqlite3'
import { SQLiteSelectBuilder } from 'drizzle-orm/sqlite-core'
import { RunResult } from 'better-sqlite3'
import type Hypercore from 'hypercore'
import { TypedEmitter } from 'tiny-typed-emitter'

type MapeoDocTableName = `${MapeoDoc['schemaName']}Table`
type GetMapeoDocTables<T> = T[keyof T & MapeoDocTableName]
Expand All @@ -25,6 +26,9 @@ type MapeoDocTablesMap = {
{ _: { name: K } }
>
}
/**
 * Event map for the `TypedEmitter` that `DataType` extends.
 */
export interface DataTypeEvents<TDoc extends MapeoDoc> {
/** Listener is called with an array of docs of this data type. */
'updated-docs': (docs: TDoc[]) => void
}

export const kCreateWithDocId: unique symbol
export const kSelect: unique symbol
Expand All @@ -45,7 +49,7 @@ export class DataType<
TSchemaName extends TTable['_']['name'],
TDoc extends MapeoDocMap[TSchemaName],
TValue extends MapeoValueMap[TSchemaName]
> {
> extends TypedEmitter<DataTypeEvents<TDoc>> {
constructor({
dataStore,
table,
Expand Down
41 changes: 39 additions & 2 deletions src/datatype/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// @ts-check
import { validate } from '@mapeo/schema'
import { getTableConfig } from 'drizzle-orm/sqlite-core'
import { eq, placeholder } from 'drizzle-orm'
import { eq, inArray, placeholder } from 'drizzle-orm'
import { randomBytes } from 'node:crypto'
import { deNullify } from '../utils.js'
import crypto from 'hypercore-crypto'
import { TypedEmitter } from 'tiny-typed-emitter'

/**
* @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc
Expand Down Expand Up @@ -37,6 +38,11 @@ import crypto from 'hypercore-crypto'
* @template {keyof any} K
* @typedef {T extends any ? Omit<T, K> : never} OmitUnion
*/
/**
* @template {MapeoDoc} TDoc
* @typedef {object} DataTypeEvents
* @property {(docs: TDoc[]) => void} updated-docs
*/

function generateId() {
return randomBytes(32).toString('hex')
Expand All @@ -54,8 +60,9 @@ export const kTable = Symbol('table')
* @template {MapeoDocTablesMap[TSchemaName]} TTable
* @template {Exclude<MapeoDocMap[TSchemaName], { schemaName: 'coreOwnership' }>} TDoc
* @template {Exclude<MapeoValueMap[TSchemaName], { schemaName: 'coreOwnership' }>} TValue
* @extends {TypedEmitter<DataTypeEvents<TDoc> & import('../types.js').DefaultEmitterEvents<DataTypeEvents<TDoc>>>}
*/
export class DataType {
export class DataType extends TypedEmitter {
#dataStore
#table
#getPermissions
Expand All @@ -72,6 +79,7 @@ export class DataType {
* @param {() => any} [opts.getPermissions]
*/
constructor({ dataStore, table, getPermissions, db }) {
super()
this.#dataStore = dataStore
this.#table = table
this.#schemaName = /** @type {TSchemaName} */ (getTableConfig(table).name)
Expand All @@ -85,6 +93,19 @@ export class DataType {
.prepare(),
getMany: db.select().from(table).prepare(),
}
this.on('newListener', (eventName) => {
if (eventName !== 'updated-docs') return
if (this.listenerCount('updated-docs') > 1) return
if (this.#schemaName === 'projectSettings') return
// Avoid adding a listener to the dataStore unless we need to (e.g. this has a listener attached), for performance reasons.
this.#dataStore.on(this.#schemaName, this.#handleDataStoreUpdate)
})
this.on('removeListener', (eventName) => {
if (eventName !== 'updated-docs') return
if (this.listenerCount('updated-docs') > 0) return
if (this.#schemaName === 'projectSettings') return
this.#dataStore.off(this.#schemaName, this.#handleDataStoreUpdate)
})
}

get [kTable]() {
Expand Down Expand Up @@ -236,4 +257,20 @@ export class DataType {
}
return { docId, createdAt, createdBy }
}

/**
* @param {Set<string>} docIds
*/
#handleDataStoreUpdate = (docIds) => {
if (this.listenerCount('updated-docs') === 0) return
const updatedDocs = /** @type {TDoc[]} */ (
this.#db
.select()
.from(this.#table)
.where(inArray(this.#table.docId, [...docIds]))
.all()
.map((doc) => deNullify(doc))
)
this.emit('updated-docs', updatedDocs)
}
}
18 changes: 13 additions & 5 deletions src/index-writer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { Logger } from '../logger.js'
/**
* @typedef {import('@mapeo/schema').MapeoDoc} MapeoDoc
*/
/**
* @typedef {{ [K in MapeoDoc['schemaName']]?: string[] }} IndexedDocIds
*/
/**
* @typedef {ReturnType<import('@mapeo/schema').decode>} MapeoDocInternal
*/
Expand Down Expand Up @@ -54,14 +57,16 @@ export class IndexWriter {
}

/**
*
* @param {import('multi-core-indexer').Entry[]} entries
* @returns {Promise<IndexedDocIds>} map of indexed docIds by schemaName
*/
async batch(entries) {
// sqlite-indexer is _significantly_ faster when batching even <10 at a
// time, so best to queue docs here before calling sqliteIndexer.batch()
/** @type {Record<string, MapeoDoc[]>} */
const queued = {}
/** @type {IndexedDocIds} */
const indexed = {}
for (const { block, key, index } of entries) {
try {
const version = { coreDiscoveryKey: discoveryKey(key), index }
Expand All @@ -71,19 +76,21 @@ export class IndexWriter {
// Unknown or invalid entry - silently ignore
continue
}
// Don't have an indexer for this type - silently ignore
if (!this.#indexers.has(doc.schemaName)) continue
if (queued[doc.schemaName]) {
queued[doc.schemaName].push(doc)
// @ts-expect-error - we know this is defined, TS doesn't
indexed[doc.schemaName].push(doc.docId)
} else {
queued[doc.schemaName] = [doc]
indexed[doc.schemaName] = [doc.docId]
}
}
for (const [schemaName, docs] of Object.entries(queued)) {
// @ts-expect-error
const indexer = this.#indexers.get(schemaName)
if (!indexer) {
// Don't have an indexer for this type - silently ignore
continue
}
if (!indexer) continue // Won't happen, but TS doesn't know that
indexer.batch(docs)
if (this.#l.enabled) {
for (const doc of docs) {
Expand All @@ -96,5 +103,6 @@ export class IndexWriter {
}
}
}
return indexed
}
}
7 changes: 5 additions & 2 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,14 @@ export class MapeoProject {
// Ignore errors thrown by values that can't be decoded for now
}
}

await Promise.all([
// TODO: Note that docs indexed to the shared index writer (project
// settings) are not currently returned here, so it is not possible to
// subscribe to updates for projectSettings
const [indexed] = await Promise.all([
projectIndexWriter.batch(otherEntries),
sharedIndexWriter.batch(projectSettingsEntries),
])
return indexed
}

get observation() {
Expand Down
9 changes: 3 additions & 6 deletions src/sync/peer-sync-controller.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import mapObject from 'map-obj'
import { NAMESPACES } from '../core-manager/index.js'
import { Logger } from '../logger.js'
import { createMap } from '../utils.js'

/**
* @typedef {import('../core-manager/index.js').Namespace} Namespace
Expand Down Expand Up @@ -304,11 +305,7 @@ function getSyncStatus(peerId, state) {
/**
 * Creates an object with one entry per namespace in `NAMESPACES`, delegating
 * to the shared `createMap` helper. Note that `createMap` calls `value` once
 * per key when it is a function, and otherwise assigns `value` itself to
 * every key.
 *
 * @template T
 * @param {T} value
 */
function createNamespaceMap(value) {
  return createMap(NAMESPACES, value)
}
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import MultiCoreIndexer from 'multi-core-indexer'
import Corestore from 'corestore'
import Hypercore from 'hypercore'
import RandomAccessStorage from 'random-access-storage'
import { DefaultListener, ListenerSignature } from 'tiny-typed-emitter'

type SupportedBlobVariants = typeof SUPPORTED_BLOB_VARIANTS
export type BlobType = keyof SupportedBlobVariants
Expand Down Expand Up @@ -184,3 +185,10 @@ export type Entries<T> = {
}[keyof T][]

export type CoreStorage = (name: string) => RandomAccessStorage

/**
 * Listener signatures for the `newListener` and `removeListener` events,
 * typed against an event map `L` (defaults to `DefaultListener`). Both
 * listeners receive an event name from `L` and a listener function from `L`.
 */
export type DefaultEmitterEvents<
L extends ListenerSignature<L> = DefaultListener
> = {
newListener: (event: keyof L, listener: L[keyof L]) => void
removeListener: (event: keyof L, listener: L[keyof L]) => void
}
16 changes: 16 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,19 @@ export function projectIdToNonce(projectId) {
export function getDeviceId(keyManager) {
return keyManager.getIdentityKeypair().publicKey.toString('hex')
}

/**
* Small helper to create a typed map
*
* @template {string} K
* @template {any} V
* @param {ReadonlyArray<K>} keys
* @param {V} value
* @returns {Record<K, V extends () => infer T ? T : V>} */
export function createMap(keys, value) {
const map = /** @type {Record<K, V extends () => infer T ? T : V>} */ ({})
for (const key of keys) {
map[key] = typeof value === 'function' ? value() : value
}
return map
}
Loading
Loading