From 93fc17047e724a8cab3d2741c8b3af653d9a46f7 Mon Sep 17 00:00:00 2001 From: Basit <1305718+mabaasit@users.noreply.github.com> Date: Mon, 4 Nov 2024 14:10:06 +0100 Subject: [PATCH] fix(user-data): use semaphore to limit reads COMPASS-7256 (#6427) * use semaphore * many files test * handle error * cr fixes --- .../compass-user-data/src/semaphore.spec.ts | 28 +++++++++++++++++++ packages/compass-user-data/src/semaphore.ts | 27 ++++++++++++++++++ packages/compass-user-data/src/user-data.ts | 5 ++++ 3 files changed, 60 insertions(+) create mode 100644 packages/compass-user-data/src/semaphore.spec.ts create mode 100644 packages/compass-user-data/src/semaphore.ts diff --git a/packages/compass-user-data/src/semaphore.spec.ts b/packages/compass-user-data/src/semaphore.spec.ts new file mode 100644 index 00000000000..3eb90550ef8 --- /dev/null +++ b/packages/compass-user-data/src/semaphore.spec.ts @@ -0,0 +1,28 @@ +import { expect } from 'chai'; +import { Semaphore } from './semaphore'; + +describe('semaphore', function () { + const maxConcurrentOps = 5; + let semaphore: Semaphore; + let taskHandler: (id: number) => Promise; + + beforeEach(() => { + semaphore = new Semaphore(maxConcurrentOps); + taskHandler = async (id: number) => { + const release = await semaphore.waitForRelease(); + const delay = Math.floor(Math.random() * 450) + 50; + try { + await new Promise((resolve) => setTimeout(resolve, delay)); + return id; + } finally { + release(); + } + }; + }); + + it('should run operations concurrently', async function () { + const tasks = Array.from({ length: 10 }, (_, i) => taskHandler(i)); + const results = await Promise.all(tasks); + expect(results).to.have.lengthOf(10); + }); +}); diff --git a/packages/compass-user-data/src/semaphore.ts b/packages/compass-user-data/src/semaphore.ts new file mode 100644 index 00000000000..98c8a8c047b --- /dev/null +++ b/packages/compass-user-data/src/semaphore.ts @@ -0,0 +1,27 @@ +export class Semaphore { + private currentCount = 0; + private queue: (() => void)[] = []; + constructor(private maxConcurrentOps: number) {} + + waitForRelease(): Promise<() => void> { + return new Promise((resolve) => { + const attempt = () => { + this.currentCount++; + resolve(this.release.bind(this)); + }; + if (this.currentCount < this.maxConcurrentOps) { + attempt(); + } else { + this.queue.push(attempt); + } + }); + } + + private release() { + this.currentCount--; + if (this.queue.length > 0) { + const next = this.queue.shift(); + next && next(); + } + } +} diff --git a/packages/compass-user-data/src/user-data.ts b/packages/compass-user-data/src/user-data.ts index bbb01daa8ef..92d3cd36d5e 100644 --- a/packages/compass-user-data/src/user-data.ts +++ b/packages/compass-user-data/src/user-data.ts @@ -4,6 +4,7 @@ import { createLogger } from '@mongodb-js/compass-logging'; import { getStoragePath } from '@mongodb-js/compass-utils'; import type { z } from 'zod'; import writeFile from 'write-file-atomic'; +import { Semaphore } from './semaphore'; const { log, mongoLogId } = createLogger('COMPASS-USER-STORAGE'); @@ -68,6 +69,7 @@ export class UserData { private readonly serialize: SerializeContent>; private readonly deserialize: DeserializeContent; private readonly getFileName: GetFileName; + private readonly semaphore = new Semaphore(100); constructor( private readonly validator: T, @@ -122,7 +124,9 @@ export class UserData { let data: string; let stats: Stats; let handle: fs.FileHandle | undefined = undefined; + let release: (() => void) | undefined = undefined; try { + release = await this.semaphore.waitForRelease(); handle = await fs.open(absolutePath, 'r'); [stats, data] = await Promise.all([ handle.stat(), @@ -139,6 +143,7 @@ export class UserData { throw error; } finally { await handle?.close(); + release?.(); } try {