From 4d95c0c8a5842da95ed81b08a155a81d858f4a41 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 3 Sep 2024 22:42:04 -0400 Subject: [PATCH] sync engine level replaces interval when called again --- packages/agent/src/sync-engine-level.ts | 42 +++++++++++++----- .../agent/tests/sync-engine-level.spec.ts | 43 +++++++++++++++++++ 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 15affed2b..de38864ef 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -69,6 +69,7 @@ export class SyncEngineLevel implements SyncEngine { private _db: AbstractLevel; private _syncIntervalId?: ReturnType; + private _syncLock = false; private _ulidFactory: ULIDFactory; constructor({ agent, dataPath, db }: SyncEngineLevelParams) { @@ -268,11 +269,23 @@ export class SyncEngineLevel implements SyncEngine { throw new Error('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.'); } - if (!direction || direction === 'push') { - await this.push(); + if (this._syncLock) { + throw new Error('SyncEngineLevel: Cannot call sync while a sync operation is in progress.'); } - if (!direction || direction === 'pull') { - await this.pull(); + + try { + this._syncLock = true; + if (!direction || direction === 'push') { + await this.push(); + } + if (!direction || direction === 'pull') { + await this.pull(); + } + } catch (error: any) { + this._syncLock = false; + throw error; + } finally { + this._syncLock = false; } } @@ -283,24 +296,31 @@ export class SyncEngineLevel implements SyncEngine { const intervalMilliseconds = ms(interval); return new Promise((resolve, reject) => { - const intervalSync = async () => { - if (this._syncIntervalId) { - clearInterval(this._syncIntervalId); + if (this._syncLock) { + return; } + // clears the interval and sets the syncIntervalId to undefined + this.stopSync(); + try { - await this.push(); - await this.pull(); + await this.sync(); } catch (error: any) { this.stopSync(); reject(error); } - // then we start sync again - this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); + if (!this._syncIntervalId) { + // only set a new interval if none is set. The most recently called `startSync` will set the final interval. + this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); + } }; + if (this._syncIntervalId) { + clearInterval(this._syncIntervalId); + } + this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); }); } diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 5037f6a30..07633d90c 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -2473,6 +2473,49 @@ describe('SyncEngineLevel', () => { pushSpy.restore(); clock.restore(); }); + + it('calls sync once per interval with the latest interval timer being respected', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers(); + + const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); + // set to be a sync time longer than the interval + syncSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 1_000); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + await clock.tickAsync(1_400); // less than the initial interval + the sync time + + expect(syncSpy.callCount).to.equal(1); + + // set to be a short sync time + syncSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 15); + })); + + testHarness.agent.sync.startSync({ interval: '300ms' }); + + await clock.tickAsync(301); // exactly the new interval + 1 + + expect(syncSpy.callCount).to.equal(2); + + + await clock.tickAsync(601); // two more intervals + + expect(syncSpy.callCount).to.equal(4); + + syncSpy.restore(); + clock.restore(); + }); }); }); }); \ No newline at end of file