From 57e956473c50dd0b65409775ab6058e96635efd8 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 6 Jun 2023 13:39:37 -0700 Subject: [PATCH 1/5] Proper updating of resource counts when a change occurs --- .editorconfig | 3 + .../data/__tests__/applyRemoteChanges.spec.js | 271 ++++++++++++ .../shared/data/applyRemoteChanges.js | 405 ++++++++++++++---- .../frontend/shared/data/changes.js | 57 +-- .../frontend/shared/data/constants.js | 7 + .../frontend/shared/data/locks.js | 24 ++ .../frontend/shared/data/resources.js | 34 ++ .../frontend/shared/data/serverSync.js | 195 +++++---- .../frontend/shared/utils/deferred.js | 67 +++ jest_config/jest.conf.js | 2 +- jest_config/setup.js | 11 +- package.json | 1 + 12 files changed, 884 insertions(+), 193 deletions(-) create mode 100644 contentcuration/contentcuration/frontend/shared/data/__tests__/applyRemoteChanges.spec.js create mode 100644 contentcuration/contentcuration/frontend/shared/data/locks.js create mode 100644 contentcuration/contentcuration/frontend/shared/utils/deferred.js diff --git a/.editorconfig b/.editorconfig index 1f49431c53..8db7923734 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,5 +1,8 @@ root = true +[*] +max_line_length = 100 + [*.js] indent_size = 2 diff --git a/contentcuration/contentcuration/frontend/shared/data/__tests__/applyRemoteChanges.spec.js b/contentcuration/contentcuration/frontend/shared/data/__tests__/applyRemoteChanges.spec.js new file mode 100644 index 0000000000..fc9d6d3f6c --- /dev/null +++ b/contentcuration/contentcuration/frontend/shared/data/__tests__/applyRemoteChanges.spec.js @@ -0,0 +1,271 @@ +import { CHANGE_TYPES } from '../constants'; +import { ChangeDispatcher, ChangeStream, resourceCounts } from '../applyRemoteChanges'; +import Deferred from 'shared/utils/deferred'; +import { ContentKindsNames } from 'shared/leUtils/ContentKinds'; +import { RolesNames } from 'shared/leUtils/Roles'; + +function tick() { + return new Promise(resolve => { + setTimeout(resolve, 0); + }); +} + +describe('ChangeStream', () => { + let dispatchers; + let changeStream; + + beforeEach(() => { + dispatchers = [ + { + apply: jest.fn(() => Promise.resolve()), + }, + { + apply: jest.fn(() => Promise.resolve()), + }, + ]; + changeStream = new ChangeStream(dispatchers); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('constructor', () => { + it('should initialize with the provided dispatchers', () => { + expect(changeStream._dispatchers).toBe(dispatchers); + }); + + it('should create a WritableStream instance', () => { + changeStream.init(); + expect(changeStream._stream).toBeDefined(); + }); + + it('should create a writer for the stream', () => { + changeStream.init(); + expect(changeStream._writer).toBeDefined(); + }); + }); + + describe('write', () => { + let writeSpy; + + beforeEach(() => { + dispatchers = []; + for (let i = 0; i < 2; i++) { + const deferred = new Deferred(); + const dispatcher = { + deferred, + apply: jest.fn(() => deferred.promise()), + }; + dispatchers.push(dispatcher); + } + changeStream = new ChangeStream(dispatchers); + changeStream.init(); + writeSpy = jest.spyOn(changeStream._writer, 'write'); + }); + + it('should acquire a lock and await the writer ready promise', async () => { + const changes = [{ id: 1 }, { id: 2 }]; + const result = changeStream.write(changes); + const resultDeferred = Deferred.fromPromise(result); + + await tick(); + + // All changes should be written to the stream sink + expect(writeSpy.mock.calls).toHaveLength(2); + expect(writeSpy).toHaveBeenCalledWith(changes[0]); + expect(writeSpy).toHaveBeenCalledWith(changes[1]); + + // the write should be awaiting the dispatcher's apply + expect(dispatchers[0].apply).toHaveBeenCalledWith(changes[0]); + expect(dispatchers[1].apply).not.toHaveBeenCalledWith(changes[0]); + + // The result should not be resolved yet, until all dispatchers have applied + expect(resultDeferred.isFulfilled).toBe(false); + + for (const dispatcher of dispatchers) { + dispatcher.deferred.resolve(); + } + + // Should resolve, otherwise it'll hit the Jest timeout + await result; + }); + }); + + describe('doWrite', () => { + it('should apply the change to each dispatcher', async () => { + const change = { id: 1 }; + await changeStream.doWrite(change); + + for (const dispatcher of dispatchers) { + expect(dispatcher.apply).toHaveBeenCalledWith(change); + } + }); + }); +}); + +describe('ChangeDispatcher', () => { + let changeDispatcher; + + beforeEach(() => { + changeDispatcher = new ChangeDispatcher(); + }); + + describe('apply', () => { + it('should call applyCreate if change type is CREATED and applyCreate is defined', async () => { + const change = { type: CHANGE_TYPES.CREATED }; + const applyCreateResult = 'create result'; + changeDispatcher.applyCreate = jest.fn().mockResolvedValue(applyCreateResult); + + const result = await changeDispatcher.apply(change); + + expect(changeDispatcher.applyCreate).toHaveBeenCalledWith(change); + expect(result).toBe(applyCreateResult); + }); + + it('should call applyUpdate if change type is UPDATED and applyUpdate is defined', async () => { + const change = { type: CHANGE_TYPES.UPDATED }; + const applyUpdateResult = 'update result'; + changeDispatcher.applyUpdate = jest.fn().mockResolvedValue(applyUpdateResult); + + const result = await changeDispatcher.apply(change); + + expect(changeDispatcher.applyUpdate).toHaveBeenCalledWith(change); + expect(result).toBe(applyUpdateResult); + }); + + it('should call applyDelete if change type is DELETED and applyDelete is defined', async () => { + const change = { type: CHANGE_TYPES.DELETED }; + const applyDeleteResult = 'delete result'; + changeDispatcher.applyDelete = jest.fn().mockResolvedValue(applyDeleteResult); + + const result = await changeDispatcher.apply(change); + + expect(changeDispatcher.applyDelete).toHaveBeenCalledWith(change); + expect(result).toBe(applyDeleteResult); + }); + + it('should call applyMove if change type is MOVED and applyMove is defined', async () => { + const change = { type: CHANGE_TYPES.MOVED }; + const applyMoveResult = 'move result'; + changeDispatcher.applyMove = jest.fn().mockResolvedValue(applyMoveResult); + + const result = await changeDispatcher.apply(change); + + expect(changeDispatcher.applyMove).toHaveBeenCalledWith(change); + expect(result).toBe(applyMoveResult); + }); + + it('should call applyCopy if change type is COPIED and applyCopy is defined', async () => { + const change = { type: CHANGE_TYPES.COPIED }; + const applyCopyResult = 'copy result'; + changeDispatcher.applyCopy = jest.fn().mockResolvedValue(applyCopyResult); + + const result = await changeDispatcher.apply(change); + + expect(changeDispatcher.applyCopy).toHaveBeenCalledWith(change); + expect(result).toBe(applyCopyResult); + }); + + it('should call applyPublish if change type is PUBLISHED and applyPublish is defined', async () => { + const change = { type: CHANGE_TYPES.PUBLISHED }; + const applyPublishResult = 'publish result'; + changeDispatcher.applyPublish = jest.fn().mockResolvedValue(applyPublishResult); + + const result = await changeDispatcher.apply(change); + + expect(changeDispatcher.applyPublish).toHaveBeenCalledWith(change); + expect(result).toBe(applyPublishResult); + }); + }); +}); + +describe('ResourceCounts', () => { + describe('_applyDiff', () => { + it('should return the correct diff when changedNode is a folder and multiplier is 1', () => { + const changedNode = { + kind: ContentKindsNames.TOPIC, + total_count: 10, + resource_count: 5, + coach_count: 3, + }; + const multiplier = 1; + const ancestor = { + total_count: 100, + resource_count: 50, + coach_count: 30, + }; + + const diff = resourceCounts._applyDiff(changedNode, multiplier, ancestor); + + expect(diff.total_count).toBe(110); // (1 * 10) + 100 + expect(diff.resource_count).toBe(55); // (1 * 5) + 50 + expect(diff.coach_count).toBe(33); // (1 * 3) + 30 + }); + + it('should return the correct diff when changedNode is a folder and multiplier is -1', () => { + const changedNode = { + kind: ContentKindsNames.TOPIC, + total_count: 10, + resource_count: 5, + coach_count: 3, + }; + const multiplier = -1; + const ancestor = { + total_count: 100, + resource_count: 50, + coach_count: 30, + }; + + const diff = resourceCounts._applyDiff(changedNode, multiplier, ancestor); + + expect(diff.total_count).toBe(90); // (-1 * 10) + 100 + expect(diff.resource_count).toBe(45); // (-1 * 5) + 50 + expect(diff.coach_count).toBe(27); // (-1 * 3) + 30 + }); + + it('should return the correct diff when changedNode is not a folder and counts are 0', () => { + const changedNode = { + kind: ContentKindsNames.AUDIO, + total_count: 0, + resource_count: 0, + coach_count: 0, + role_visibility: RolesNames.LEARNER, + }; + const multiplier = 1; + const ancestor = { + total_count: 50, + resource_count: 20, + coach_count: 10, + }; + + const diff = resourceCounts._applyDiff(changedNode, multiplier, ancestor); + + expect(diff.total_count).toBe(51); // 1 + 50 + expect(diff.resource_count).toBe(21); // 1 + 20 + expect(diff.coach_count).toBe(10); // No change + }); + + it('should return the correct diff when changedNode is not a folder but coach content', () => { + const changedNode = { + kind: ContentKindsNames.AUDIO, + total_count: 0, + resource_count: 0, + coach_count: 0, + role_visibility: RolesNames.COACH, + }; + const multiplier = -1; + const ancestor = { + total_count: 50, + resource_count: 20, + coach_count: 10, + }; + + const diff = resourceCounts._applyDiff(changedNode, multiplier, ancestor); + + expect(diff.total_count).toBe(49); // 50 - 1 + expect(diff.resource_count).toBe(19); // 20 - 1 + expect(diff.coach_count).toBe(9); // 10 - 1 + }); + }); +}); diff --git a/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js b/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js index ea10bc6768..c30741ac77 100644 --- a/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js +++ b/contentcuration/contentcuration/frontend/shared/data/applyRemoteChanges.js @@ -1,9 +1,12 @@ import Dexie from 'dexie'; import sortBy from 'lodash/sortBy'; import logging from '../logging'; -import { CHANGE_TYPES, TABLE_NAMES } from './constants'; +import { CHANGE_TYPES, TABLE_NAMES, LOCK_NAMES } from './constants'; import db from './db'; import { INDEXEDDB_RESOURCES } from './registry'; +import { acquireLock } from 'shared/data/locks'; +import { RolesNames } from 'shared/leUtils/Roles'; +import { ContentKindsNames } from 'shared/leUtils/ContentKinds'; const { CREATED, DELETED, UPDATED, MOVED, PUBLISHED, SYNCED, DEPLOYED } = CHANGE_TYPES; @@ -37,6 +40,10 @@ export function collectChanges(changes) { return collectedChanges; } +function sortChanges(changes) { + return sortBy(changes, ['server_rev', 'rev']); +} + /** * @param {Object} change - The change object * @param {String|Function} args[] - string table names with callback at the end @@ -50,108 +57,348 @@ function transaction(change, ...args) { }); } -function applyCreate(change) { - return transaction(change, () => { - const table = db.table(change.table); - return table - .put(change.obj, !table.schema.primKey.keyPath ? change.key : undefined) - .then(() => change.obj); - }); -} - -function applyUpdate(change) { - return transaction(change, () => { - return db - .table(change.table) - .where(':id') - .equals(change.key) - .modify(obj => applyMods(obj, change.mods)); - }); +/** + * Class consolidating code related to mapping changes to the appropriate apply function + * and logging related errors + */ +export class ChangeDispatcher { + async apply(change) { + let result = null; + try { + if (change.type === CHANGE_TYPES.CREATED && this.applyCreate) { + result = await this.applyCreate(change); + } else if (change.type === CHANGE_TYPES.UPDATED && this.applyUpdate) { + result = await this.applyUpdate(change); + } else if (change.type === CHANGE_TYPES.DELETED && this.applyDelete) { + result = await this.applyDelete(change); + } else if (change.type === CHANGE_TYPES.MOVED && this.applyMove) { + result = await this.applyMove(change); + } else if (change.type === CHANGE_TYPES.COPIED && this.applyCopy) { + result = await this.applyCopy(change); + } else if (change.type === CHANGE_TYPES.PUBLISHED && this.applyPublish) { + result = await this.applyPublish(change); + } + } catch (e) { + logging.error(e, { + filename: 'change.json', + // strip csrf token from headers + data: JSON.stringify(change), + contentType: 'application/json', + }); + } + return result; + } } -function applyDelete(change) { - return transaction(change, () => { - return db.table(change.table).delete(change.key); - }); -} +class ReturnedChanges extends ChangeDispatcher { + /** + * @param {CreatedChange} change + * @return {Promise} + */ + applyCreate(change) { + return transaction(change, () => { + const table = db.table(change.table); + return table + .put(change.obj, !table.schema.primKey.keyPath ? change.key : undefined) + .then(() => change.obj); + }); + } -function applyMove(change) { - const resource = INDEXEDDB_RESOURCES[change.table]; - if (!resource || !resource.tableMove) { - return Promise.resolve(); + /** + * @param {UpdatedChange} change + * @return {Promise} + */ + applyUpdate(change) { + return transaction(change, () => { + return db + .table(change.table) + .where(':id') + .equals(change.key) + .modify(obj => applyMods(obj, change.mods)); + }); } - const { key, target, position } = change; - return resource.resolveTreeInsert({ id: key, target, position, isCreate: false }, data => { + /** + * @param {DeletedChange} change + * @return {Promise} + */ + applyDelete(change) { return transaction(change, () => { - return resource.tableMove(data); + return db.table(change.table).delete(change.key); }); - }); + } + + /** + * @param {MovedChange} change + * @return {Promise} + */ + applyMove(change) { + const resource = INDEXEDDB_RESOURCES[change.table]; + if (!resource || !resource.tableMove) { + return Promise.resolve(); + } + + const { key, target, position } = change; + return resource.resolveTreeInsert({ id: key, target, position, isCreate: false }, data => { + return transaction(change, () => { + return resource.tableMove(data); + }); + }); + } + + /** + * @param {CopiedChange} change + * @return {Promise} + */ + applyCopy(change) { + const resource = INDEXEDDB_RESOURCES[change.table]; + if (!resource || !resource.tableCopy) { + return Promise.resolve(); + } + + const { key, target, position, from_key } = change; + // copying takes the ID of the node to copy, so we use `from_key` + return resource.resolveTreeInsert({ id: from_key, target, position, isCreate: true }, data => { + return transaction(change, () => { + // Update the ID on the payload to match the received change, since isCreate=true + // would generate new IDs + data.payload.id = key; + return resource.tableCopy(data); + }); + }); + } + + /** + * @param {PublishedChange} change + * @return {Promise} + */ + applyPublish(change) { + if (change.table !== TABLE_NAMES.CHANNEL) { + return Promise.resolve(); + } + + // Publish changes associate with the channel, but we open a transaction on contentnode + return transaction(change, TABLE_NAMES.CONTENTNODE, () => { + return db + .table(TABLE_NAMES.CONTENTNODE) + .where({ channel_id: change.channel_id }) + .modify({ changed: false, published: true }); + }); + } } -function applyCopy(change) { - const resource = INDEXEDDB_RESOURCES[change.table]; - if (!resource || !resource.tableCopy) { - return Promise.resolve(); +/** + * Updates aggregate counts on ancestors when a change occurs + */ +class ResourceCounts extends ChangeDispatcher { + async apply(change) { + // Resource counts are only applicable to content nodes + if (change.table !== TABLE_NAMES.CONTENTNODE) { + return null; + } + // When there's a current transaction, delay until it's complete + if (Dexie.currentTransaction) { + return new Promise((resolve, reject) => { + Dexie.currentTransaction.on('complete', () => this.apply(change).then(resolve, reject)); + }); + } + return super.apply(change); } - const { key, target, position, from_key } = change; - // copying takes the ID of the node to copy, so we use `from_key` - return resource.resolveTreeInsert({ id: from_key, target, position, isCreate: true }, data => { - return transaction(change, () => { - // Update the ID on the payload to match the received change, since isCreate=true - // would generate new IDs - data.payload.id = key; - return resource.tableCopy(data); + /** + * @return {ContentNode} + */ + get resource() { + return INDEXEDDB_RESOURCES[TABLE_NAMES.CONTENTNODE]; + } + + /** + * @return {Dexie.Table} + */ + get table() { + return this.resource.table; + } + + /** + * Generates the diff to apply to ancestor nodes that updates their counts appropriately + * + * @typedef {{ + * kind: string, + * total_count: number, + * resource_count: number, + * coach_count: number, + * role_visibility: string + * }} Node + * @param {Node} changedNode + * @param {number} multiplier + * @param {{total_count: number, resource_count: number, coach_count: number}} ancestor + * @return {{total_count: number, resource_count: number, coach_count: number}} + * @private + */ + _applyDiff(changedNode, multiplier, ancestor) { + const isFolder = changedNode.kind === ContentKindsNames.TOPIC; + return { + total_count: multiplier * (changedNode.total_count || 1) + ancestor.total_count, + resource_count: + multiplier * (isFolder ? changedNode.resource_count || 0 : 1) + ancestor.resource_count, + coach_count: + multiplier * + (isFolder + ? changedNode.coach_count || 0 + : Number(changedNode.role_visibility === RolesNames.COACH)) + + ancestor.coach_count, + }; + } + + /** + * @param {CreatedChange} change + * @return {Promise} + */ + async applyCreate(change) { + await this.resource.updateAncestors( + { id: change.key, ignoreChanges: true }, + this._applyDiff.bind(this, change.obj, 1) + ); + } + + /** + * @param {UpdatedChange} change + * @return {Promise} + */ + async applyUpdate(change) { + // If the role visibility hasn't changed, we don't need to update the ancestor counts + if (!change.mods.role_visibility) { + return; + } + + await this.resource.updateAncestors({ id: change.key, ignoreChanges: true }, ancestor => { + return { + coach_count: + ancestor.coach_count + (change.mods.role_visibility === RolesNames.COACH ? 1 : -1), + }; }); - }); + } + + /** + * @param {DeletedChange} change + * @return {Promise} + */ + async applyDelete(change) { + await this.resource.updateAncestors( + { id: change.key, ignoreChanges: true }, + this._applyDiff.bind(this, change.oldObj, -1) + ); + } + + /** + * @param {MovedChange} change + * @return {Promise} + */ + async applyMove(change) { + // Only if the node is being moved to a new parent do we need to update the ancestor counts + if (change.oldParent === change.parent) { + return; + } + + const node = await this.table.get(change.key); + await this.resource.updateAncestors( + { id: change.oldParent, includeSelf: true, ignoreChanges: true }, + this._applyDiff.bind(this, node, -1) + ); + await this.resource.updateAncestors( + { id: change.parent, includeSelf: true, ignoreChanges: true }, + this._applyDiff.bind(this, node, 1) + ); + } + + /** + * @param {CopiedChange} change + * @return {Promise} + */ + async applyCopy(change) { + const node = await this.table.get(change.key); + await this.resource.updateAncestors( + { id: change.key, ignoreChanges: true }, + this._applyDiff.bind(this, node, 1) + ); + } } -function applyPublish(change) { - if (change.table !== TABLE_NAMES.CHANNEL) { - return Promise.resolve(); +/** + * A wrapper class that uses a WritableStream to queue, process, and apply changes to the database + * through dispatcher class instances + */ +export class ChangeStream { + /** + * @param {ChangeDispatcher[]} dispatchers + */ + constructor(dispatchers) { + this._dispatchers = dispatchers; + this._stream = null; + this._writer = null; } - // Publish changes associate with the channel, but we open a transaction on contentnode - return transaction(change, TABLE_NAMES.CONTENTNODE, () => { - return db - .table(TABLE_NAMES.CONTENTNODE) - .where({ channel_id: change.channel_id }) - .modify({ changed: false, published: true }); - }); + /** + * Delay initialization of the stream, otherwise this could get invoked before + * the ponyfill is loaded in our Jest test environment + */ + init() { + this._stream = new WritableStream({ + write: change => this.doWrite(change), + }); + this._writer = this._stream.getWriter(); + } + + /** + * @param {Object[]} changes + * @return {Promise} + */ + write(changes) { + if (!this._stream) { + this.init(); + } + + return acquireLock({ name: LOCK_NAMES.APPLY_CHANGES }, async () => { + for (const change of sortChanges(changes)) { + // write to the queue but not necessarily applied yet + this._writer.write(change); + } + // return/await here ensures all changes are applied, which allows us to release the lock + return this._writer.ready; + }); + } + + /** + * @param {Object} change - A change object + * @return {Promise} + */ + async doWrite(change) { + for (const dispatcher of this._dispatchers) { + await dispatcher.apply(change); + } + } } +const returnedChanges = new ReturnedChanges(); +/** + * Export resourceCounts instance to update aggregate counts on ancestors when a change occurs + * @type {ResourceCounts} + */ +export const resourceCounts = new ResourceCounts(); +/** + * Export changeStream instance to write returned changes to the database from sync requests + * @type {ChangeStream} + */ +export const changeStream = new ChangeStream([returnedChanges, resourceCounts]); + /** * @see https://github.com/dfahlander/Dexie.js/blob/master/addons/Dexie.Syncable/src/apply-changes.js * @return {Promise} */ export default async function applyChanges(changes) { const results = []; - for (const change of sortBy(changes, ['server_rev', 'rev'])) { - let result; - try { - if (change.type === CHANGE_TYPES.CREATED) { - result = await applyCreate(change); - } else if (change.type === CHANGE_TYPES.UPDATED) { - result = await applyUpdate(change); - } else if (change.type === CHANGE_TYPES.DELETED) { - result = await applyDelete(change); - } else if (change.type === CHANGE_TYPES.MOVED) { - result = await applyMove(change); - } else if (change.type === CHANGE_TYPES.COPIED) { - result = await applyCopy(change); - } else if (change.type === CHANGE_TYPES.PUBLISHED) { - result = await applyPublish(change); - } - } catch (e) { - logging.error(e, { - filename: 'change.json', - // strip csrf token from headers - data: JSON.stringify(change), - contentType: 'application/json', - }); - } - + for (const change of sortChanges(changes)) { + const result = await returnedChanges.apply(change); if (result) { results.push(result); } diff --git a/contentcuration/contentcuration/frontend/shared/data/changes.js b/contentcuration/contentcuration/frontend/shared/data/changes.js index 6065218d10..4129ea1515 100644 --- a/contentcuration/contentcuration/frontend/shared/data/changes.js +++ b/contentcuration/contentcuration/frontend/shared/data/changes.js @@ -6,6 +6,7 @@ import isUndefined from 'lodash/isUndefined'; import omit from 'lodash/omit'; import sortBy from 'lodash/sortBy'; import logging from '../logging'; +import { resourceCounts } from './applyRemoteChanges'; import db, { CLIENTID } from 'shared/data/db'; import { promiseChunk } from 'shared/utils/helpers'; import { @@ -208,7 +209,7 @@ function omitIgnoredSubFields(obj) { export class Change { constructor({ type, key, table, source } = {}) { this.setAndValidateLookup(type, 'type', CHANGE_TYPES_LOOKUP); - this.setAndValidateNotNull(key, 'key'); + this.setAndValidateIsDefined(key, 'key'); this.setAndValidateLookup(table, 'table', TABLE_NAMES_LOOKUP); if (!INDEXEDDB_RESOURCES[this.table].syncable) { const error = new ReferenceError(`${this.table} is not a syncable table`); @@ -217,6 +218,7 @@ export class Change { } this.setAndValidateString(source, 'source'); } + get changeType() { return this.constructor.name; } @@ -244,28 +246,24 @@ export class Change { } this[name] = value; } - validateNotUndefined(value, name) { - if (isUndefined(value)) { + + validateIsDefined(value, name) { + if (isNull(value) || isUndefined(value)) { const error = new TypeError( - `${name} is required for a ${this.changeType} but it was undefined` + `${name} is required for a ${this.changeType} but it was ${ + isNull(value) ? 'null' : 'undefined' + }` ); logging.error(error, value); throw error; } } - setAndValidateNotUndefined(value, name) { - this.validateNotUndefined(value, name); - this[name] = value; - } - setAndValidateNotNull(value, name) { - this.validateNotUndefined(value, name); - if (isNull(value)) { - const error = new TypeError(`${name} is required for a ${this.changeType} but it was null`); - logging.error(error, value); - throw error; - } + + setAndValidateIsDefined(value, name) { + this.validateIsDefined(value, name); this[name] = value; } + validateObj(value, name) { if (!isPlainObject(value)) { const error = new TypeError(`${name} should be an object, but ${value} was passed instead`); @@ -273,10 +271,12 @@ export class Change { throw error; } } + setAndValidateObj(value, name, mapper = obj => obj) { this.validateObj(value, name); this[name] = mapper(value); } + setAndValidateBoolean(value, name) { if (typeof value !== 'boolean') { const error = new TypeError(`${name} should be a boolean, but ${value} was passed instead`); @@ -285,6 +285,7 @@ export class Change { } this[name] = value; } + setAndValidateObjOrNull(value, name, mapper = obj => obj) { if (!isPlainObject(value) && !isNull(value)) { const error = new TypeError( @@ -295,6 +296,7 @@ export class Change { } this[name] = mapper(value); } + setAndValidateString(value, name) { if (typeof value !== 'string') { const error = new TypeError(`${name} should be a string, but ${value} was passed instead`); @@ -303,13 +305,17 @@ export class Change { } this[name] = value; } - saveChange() { + + async saveChange() { if (!this.channelOrUserIdSet) { throw new ReferenceError( `Attempted to save ${this.changeType} change for ${this.table} before setting channel_id and user_id` ); } - return db[CHANGES_TABLE].add(this); + const rev = await db[CHANGES_TABLE].add(this); + // Do not await this + resourceCounts.apply(this); + return rev; } } @@ -377,7 +383,7 @@ export class DeletedChange extends Change { } export class MovedChange extends Change { - constructor({ target, position, parent, ...fields }) { + constructor({ target, position, parent, oldParent, ...fields }) { fields.type = CHANGE_TYPES.MOVED; super(fields); if (this.table !== TABLE_NAMES.CONTENTNODE) { @@ -385,9 +391,10 @@ export class MovedChange extends Change { `${this.changeType} is only supported by ${TABLE_NAMES.CONTENTNODE} table but ${this.table} was passed instead` ); } - this.setAndValidateNotUndefined(target, 'target'); + this.setAndValidateIsDefined(target, 'target'); this.setAndValidateLookup(position, 'position', RELATIVE_TREE_POSITIONS_LOOKUP); - this.setAndValidateNotUndefined(parent, 'parent'); + this.setAndValidateIsDefined(parent, 'parent'); + this.setAndValidateIsDefined(oldParent, 'oldParent'); this.setChannelAndUserId(); } } @@ -401,12 +408,12 @@ export class CopiedChange extends Change { `${this.changeType} is only supported by ${TABLE_NAMES.CONTENTNODE} table but ${this.table} was passed instead` ); } - this.setAndValidateNotUndefined(from_key, 'from_key'); + this.setAndValidateIsDefined(from_key, 'from_key'); this.setAndValidateObj(mods, 'mods', omitIgnoredSubFields); - this.setAndValidateNotUndefined(target, 'target'); + this.setAndValidateIsDefined(target, 'target'); this.setAndValidateLookup(position, 'position', RELATIVE_TREE_POSITIONS_LOOKUP); this.setAndValidateObjOrNull(excluded_descendants, 'excluded_descendants'); - this.setAndValidateNotUndefined(parent, 'parent'); + this.setAndValidateIsDefined(parent, 'parent'); this.setChannelAndUserId(); } } @@ -420,8 +427,8 @@ export class PublishedChange extends Change { `${this.changeType} is only supported by ${TABLE_NAMES.CHANNEL} table but ${this.table} was passed instead` ); } - this.setAndValidateNotUndefined(version_notes, 'version_notes'); - this.setAndValidateNotUndefined(language, 'language'); + this.setAndValidateIsDefined(version_notes, 'version_notes'); + this.setAndValidateIsDefined(language, 'language'); this.setChannelAndUserId({ id: this.key }); } } diff --git a/contentcuration/contentcuration/frontend/shared/data/constants.js b/contentcuration/contentcuration/frontend/shared/data/constants.js index 5f1ea5d357..709b251d1f 100644 --- a/contentcuration/contentcuration/frontend/shared/data/constants.js +++ b/contentcuration/contentcuration/frontend/shared/data/constants.js @@ -74,3 +74,10 @@ export const LAST_FETCHED = '__last_fetch'; export const CURRENT_USER = 'CURRENT_USER'; export const MAX_REV_KEY = 'max_rev'; + +export const LOCK_NAMES = { + SYNC: 'sync', + SYNC_CHANNEL: 'sync_channel:{channel_id}', + SYNC_USER: 'sync_user', + APPLY_CHANGES: 'apply_changes', +}; diff --git a/contentcuration/contentcuration/frontend/shared/data/locks.js b/contentcuration/contentcuration/frontend/shared/data/locks.js new file mode 100644 index 0000000000..d6963e88d2 --- /dev/null +++ b/contentcuration/contentcuration/frontend/shared/data/locks.js @@ -0,0 +1,24 @@ +/** + * Acquire an exclusive lock on the given name using the browser's Web Locks API, + * and then run the given async function. + * + * @param {string} name + * @param {boolean} exclusive + * @param {function(): Promise} asyncFunction + * @return {Promise} + */ +export function acquireLock({ name, exclusive = false }, asyncFunction) { + // If the browser supports the Web Locks API + // https://developer.mozilla.org/en-US/docs/Web/API/Web_Locks_API + if (navigator.locks) { + return navigator.locks.request( + name, + { + mode: exclusive ? 'exclusive' : 'shared', + }, + asyncFunction + ); + } + // Studio's supported browsers should support Web Locks but don't outright fail otherwise + return asyncFunction(); +} diff --git a/contentcuration/contentcuration/frontend/shared/data/resources.js b/contentcuration/contentcuration/frontend/shared/data/resources.js index 2575bfc3f9..fb1bdd10f6 100644 --- a/contentcuration/contentcuration/frontend/shared/data/resources.js +++ b/contentcuration/contentcuration/frontend/shared/data/resources.js @@ -1406,6 +1406,7 @@ export const ContentNode = new TreeResource({ // so that we can avoid doing fetches while such changes // are pending. parent: parent.id, + oldParent: isCreate ? null : node.parent, oldObj: isCreate ? null : node, table: this.tableName, source: CLIENTID, @@ -1546,6 +1547,12 @@ export const ContentNode = new TreeResource({ return payload; }, + /** + * Resolves with an array of all ancestors, including the node itself, in descending order + * + * @param {String} id + * @return {Promise<[{}]>} + */ getAncestors(id) { return this.table.get(id).then(node => { if (node) { @@ -1558,10 +1565,37 @@ export const ContentNode = new TreeResource({ } return [node]; } + // If node wasn't found, we fetch from the server return this.fetchCollection({ ancestors_of: id }); }); }, + /** + * Calls `updateCallback` on each ancestor, and calls `.update` for that ancestor + * with the return value from `updateCallback` + * + * @param {String} id + * @param {Boolean} includeSelf + * @param {Boolean} ignoreChanges - Ignore generating change events for the updates + * @param {Function} updateCallback + * @return {Promise} + */ + updateAncestors({ id, includeSelf = false, ignoreChanges = false }, updateCallback) { + return this.transaction({ mode: 'rw' }, async () => { + const ancestors = await this.getAncestors(id); + for (const ancestor of ancestors) { + if (ancestor.id === id && !includeSelf) { + continue; + } + if (ignoreChanges) { + await this.table.update(ancestor.id, updateCallback(ancestor)); + } else { + await this.update(ancestor.id, updateCallback(ancestor)); + } + } + }); + }, + /** * Uses local IndexedDB index on node_id+channel_id, otherwise specifically requests the * collection using the same params since GET detail endpoint doesn't support that the params diff --git a/contentcuration/contentcuration/frontend/shared/data/serverSync.js b/contentcuration/contentcuration/frontend/shared/data/serverSync.js index 27fb7b4cd0..100abf07f0 100644 --- a/contentcuration/contentcuration/frontend/shared/data/serverSync.js +++ b/contentcuration/contentcuration/frontend/shared/data/serverSync.js @@ -5,9 +5,10 @@ import pick from 'lodash/pick'; import orderBy from 'lodash/orderBy'; import uniq from 'lodash/uniq'; import logging from '../logging'; -import applyChanges from './applyRemoteChanges'; +import { changeStream } from './applyRemoteChanges'; +import { acquireLock } from './locks'; import { changeRevs } from './registry'; -import { CHANGE_TYPES, CHANGES_TABLE, MAX_REV_KEY } from './constants'; +import { CHANGE_TYPES, CHANGES_TABLE, MAX_REV_KEY, LOCK_NAMES } from './constants'; import db, { channelScope } from './db'; import { Channel, Session, Task } from './resources'; import client from 'shared/client'; @@ -98,16 +99,6 @@ function handleAllowed(response) { return Promise.resolve(); } -function handleReturnedChanges(response) { - // The changes property is an array of any changes from the server to apply in the - // client. - const returnedChanges = get(response, ['data', 'changes'], []); - if (returnedChanges.length) { - return applyChanges(returnedChanges); - } - return Promise.resolve(); -} - // These are keys that the changes table is indexed by, so we cannot modify these during // the modify call that we use to update the changes table, if they already exist. const noModifyKeys = { @@ -202,7 +193,11 @@ function handleTasks(response) { const noUserError = 'No user logged in'; -async function syncChanges(syncAllChanges) { +/** + * @param {boolean} syncAllChanges + * @return {Promise<[{}]>} - Resolves with an array of returned changes from the server + */ +function syncChanges(syncAllChanges) { // Note: we could in theory use Dexie syncable for what // we are doing here, but I can't find a good way to make // it ignore our regular API calls for seeding the database @@ -212,82 +207,92 @@ async function syncChanges(syncAllChanges) { // revisions. We will do this for now, but we have the option of doing // something more involved and better architectured in the future. - try { - // Get the current user - if there is no user, we can't sync. - const user = await Session.getSession(); - if (!user) { - // If not logged in, nothing to do. - throw new Error(noUserError); - } + // Either scoping to a channel or to a user + const syncLock = channelScope.id + ? LOCK_NAMES.SYNC_CHANNEL.replace('{channel_id}', channelScope.id) + : LOCK_NAMES.SYNC_USER; - const channel_revs = {}; - if (channelScope.id) { - channel_revs[channelScope.id] = get(user, [MAX_REV_KEY, channelScope.id], 0); - } + // If we are syncing all changes, we don't need to acquire an exclusive lock because we should + // already have a global lock. Hopefully this could prevent the possibility of deadlocks. + return acquireLock({ name: syncLock, exclusive: !syncAllChanges }, async () => { + try { + // Get the current user - if there is no user, we can't sync. + const user = await Session.getSession(); + if (!user) { + // If not logged in, nothing to do. + throw new Error(noUserError); + } - const unAppliedChanges = await db[CHANGES_TABLE].orderBy('server_rev') - .filter(c => c.synced && !c.errors && !c.disallowed) - .toArray(); + const channel_revs = {}; + if (channelScope.id) { + channel_revs[channelScope.id] = get(user, [MAX_REV_KEY, channelScope.id], 0); + } - const requestPayload = { - changes: [], - channel_revs, - user_rev: user.user_rev || 0, - unapplied_revs: unAppliedChanges.map(c => c.server_rev).filter(Boolean), - }; + const unAppliedChanges = await db[CHANGES_TABLE].orderBy('server_rev') + .filter(c => c.synced && !c.errors && !c.disallowed) + .toArray(); - // Snapshot which revs we are syncing, so that we can - // removes them from the changeRevs array after the sync - const revsToSync = []; - if (syncAllChanges) { - const unsyncedRevs = await db[CHANGES_TABLE].filter(c => !c.synced).primaryKeys(); - revsToSync.push(...unsyncedRevs); - } else { - revsToSync.push(...changeRevs); - } - if (revsToSync.length) { - const syncableChanges = db[CHANGES_TABLE].where('rev') - .anyOf(revsToSync) - .filter(c => !c.synced); - const changesToSync = await syncableChanges.toArray(); - // By the time we get here, our changesToSync Array should - // have every change we want to sync to the server, so we - // can now trim it down to only what is needed to transmit over the wire. - // TODO: remove moves when a delete change is present for an object, - // because a delete will wipe out the move. - const changes = changesToSync.map(trimChangeForSync).filter(Boolean); - // Create a promise for the sync - if there is nothing to sync just resolve immediately, - // in order to still call our change cleanup code. - if (changes.length) { - requestPayload.changes = changes; + const requestPayload = { + changes: [], + channel_revs, + user_rev: user.user_rev || 0, + unapplied_revs: unAppliedChanges.map(c => c.server_rev).filter(Boolean), + }; + + // Snapshot which revs we are syncing, so that we can + // removes them from the changeRevs array after the sync + const revsToSync = []; + if (syncAllChanges) { + const unsyncedRevs = await db[CHANGES_TABLE].filter(c => !c.synced).primaryKeys(); + revsToSync.push(...unsyncedRevs); + } else { + revsToSync.push(...changeRevs); + } + if (revsToSync.length) { + const syncableChanges = db[CHANGES_TABLE].where('rev') + .anyOf(revsToSync) + .filter(c => !c.synced); + const changesToSync = await syncableChanges.toArray(); + // By the time we get here, our changesToSync Array should + // have every change we want to sync to the server, so we + // can now trim it down to only what is needed to transmit over the wire. + // TODO: remove moves when a delete change is present for an object, + // because a delete will wipe out the move. + const changes = changesToSync.map(trimChangeForSync).filter(Boolean); + // Create a promise for the sync - if there is nothing to sync just resolve immediately, + // in order to still call our change cleanup code. + if (changes.length) { + requestPayload.changes = changes; + } + } + // The response from the sync endpoint has the format: + // { + // "disallowed": [], + // "allowed": [], + // "changes": [], + // "errors": [], + // "successes": [], + // } + const response = await client.post(urls['sync'](), requestPayload); + // Clear out this many changes from changeRevs array, since we have now synced them. + changeRevs.splice(0, revsToSync.length); + await Promise.all([ + handleDisallowed(response), + handleAllowed(response), + handleErrors(response), + handleSuccesses(response), + handleMaxRevs(response, user.id), + handleTasks(response), + ]); + // Return the array of returned changes + return get(response, ['data', 'changes'], []); + } catch (err) { + // There was an error during syncing, log, but carry on + if (err.message !== noUserError) { + logging.error(err); } } - // The response from the sync endpoint has the format: - // { - // "disallowed": [], - // "allowed": [], - // "changes": [], - // "errors": [], - // "successes": [], - // } - const response = await client.post(urls['sync'](), requestPayload); - // Clear out this many changes from changeRevs array, since we have now synced them. - changeRevs.splice(0, revsToSync.length); - await Promise.all([ - handleDisallowed(response), - handleAllowed(response), - handleReturnedChanges(response), - handleErrors(response), - handleSuccesses(response), - handleMaxRevs(response, user.id), - handleTasks(response), - ]); - } catch (err) { - // There was an error during syncing, log, but carry on - if (err.message !== noUserError) { - logging.error(err); - } - } + }); } // Set the sync debounce time artificially low in tests to avoid timeouts. @@ -302,13 +307,31 @@ function doSyncChanges(syncAll = false) { const deferredStack = syncDeferredStack.splice(0); // Wait for any existing sync to complete, then sync again. syncingPromise = syncingPromise - .then(() => syncChanges(syncAll)) - .then(result => { + .then(() => + acquireLock( + { + name: LOCK_NAMES.SYNC, + // If syncAll is true, we want to acquire an exclusive lock, which would make it globally + // blocking, otherwise we want to acquire a shared lock, which would allow other shared + // locks to be acquired and should not intersect with a global exclusive lock if one is + // already held. + exclusive: syncAll, + }, + () => syncChanges(syncAll) + ) + ) + .then(returnedChanges => { + // Apply returned changes + if (returnedChanges.length) { + return changeStream.write(returnedChanges); + } + }) + .then(() => { // If it is successful call all of the resolve functions that we have stored // from all the Promises that have been returned while this specific debounce // has been active. for (const { resolve } of deferredStack) { - resolve(result); + resolve(); } }) .catch(err => { diff --git a/contentcuration/contentcuration/frontend/shared/utils/deferred.js b/contentcuration/contentcuration/frontend/shared/utils/deferred.js new file mode 100644 index 0000000000..e94ef49dc5 --- /dev/null +++ b/contentcuration/contentcuration/frontend/shared/utils/deferred.js @@ -0,0 +1,67 @@ +/** + * Deferred promise + */ +export default class Deferred extends Promise { + /** + * @param {function(resolve, reject): void|null} executor + */ + constructor(executor = null) { + let self_resolve, self_reject; + executor = executor || (() => {}); + + super(function(resolve, reject) { + self_resolve = resolve; + self_reject = reject; + executor(resolve, reject); + }); + + this._resolve = self_resolve; + this._reject = self_reject; + + this.isResolved = false; + this.isRejected = false; + } + + get isFulfilled() { + return this.isResolved || this.isRejected; + } + + /** + * @param value + * @returns {Deferred} + */ + resolve(value) { + this._resolve(value); + this.isResolved = true; + return this; + } + + /** + * @param value + * @returns {Deferred} + */ + reject(value) { + this._reject(value); + this.isRejected = true; + return this; + } + + /** + * @returns {Promise} + */ + promise() { + return new Promise((resolve, reject) => { + this.then(resolve, reject); + }); + } + + /** + * @param {Promise} promise + * @return {Deferred} + */ + static fromPromise(promise) { + const deferred = new Deferred(); + promise.then(deferred.resolve.bind(deferred), deferred.reject.bind(deferred)); + return deferred; + } +} diff --git a/jest_config/jest.conf.js b/jest_config/jest.conf.js index ff89cf2c24..88a490ebb2 100644 --- a/jest_config/jest.conf.js +++ b/jest_config/jest.conf.js @@ -27,7 +27,7 @@ module.exports = { }, transformIgnorePatterns: ['/node_modules/(?!vuetify|epubjs|kolibri-design-system|kolibri-constants|axios)'], snapshotSerializers: ['/node_modules/jest-serializer-vue'], - setupFilesAfterEnv: [path.resolve(__dirname, './setup')], + setupFilesAfterEnv: ['/jest_config/setup.js'], coverageDirectory: '/coverage', collectCoverageFrom: ['!**/node_modules/**'], verbose: false, diff --git a/jest_config/setup.js b/jest_config/setup.js index c96391f83f..d5fb127156 100644 --- a/jest_config/setup.js +++ b/jest_config/setup.js @@ -9,7 +9,16 @@ import KThemePlugin from 'kolibri-design-system/lib/KThemePlugin'; import 'shared/i18n/setup'; // Polyfill indexeddb import 'fake-indexeddb/auto'; +// Ponyfill webstreams +import {ReadableStream, WritableStream, TransformStream, CountQueuingStrategy} from 'web-streams-polyfill/ponyfill/es2018'; import jquery from 'jquery'; + +window.jQuery = window.$ = jquery; +window.ReadableStream = global.ReadableStream = ReadableStream; +window.WritableStream = global.WritableStream = WritableStream; +window.TransformStream = global.TransformStream = TransformStream; +window.CountQueuingStrategy = global.CountQueuingStrategy = CountQueuingStrategy; + import AnalyticsPlugin from 'shared/analytics/plugin'; import { setupSchema } from 'shared/data'; import * as resources from 'shared/data/resources'; @@ -34,8 +43,6 @@ global.afterEach(() => { }); }); -window.jQuery = window.$ = jquery; - window.storageBaseUrl = '/content/storage/'; Vue.use(VueRouter); diff --git a/package.json b/package.json index d19c7173cc..74794499f6 100644 --- a/package.json +++ b/package.json @@ -116,6 +116,7 @@ "npm-run-all": "^4.1.3", "stylus": "^0.59.0", "stylus-loader": "^7.1.2", + "web-streams-polyfill": "^3.2.1", "workbox-webpack-plugin": "^6.5.4" }, "false": {}, From 07d3af922466d666da1ccbabcf2a86a85ef3f969 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Tue, 6 Jun 2023 13:44:19 -0700 Subject: [PATCH 2/5] Fix duplicate coach content icon for resources --- .../channelEdit/components/ContentNodeListItem/index.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentcuration/contentcuration/frontend/channelEdit/components/ContentNodeListItem/index.vue b/contentcuration/contentcuration/frontend/channelEdit/components/ContentNodeListItem/index.vue index 6ca94fa902..ba6d555b85 100644 --- a/contentcuration/contentcuration/frontend/channelEdit/components/ContentNodeListItem/index.vue +++ b/contentcuration/contentcuration/frontend/channelEdit/components/ContentNodeListItem/index.vue @@ -97,7 +97,7 @@ > {{ category(node.categories) }} - +