diff --git a/packages/plugin-session-replay-browser/rollup.config.js b/packages/plugin-session-replay-browser/rollup.config.js index 8fbd9d329..cec2773d9 100644 --- a/packages/plugin-session-replay-browser/rollup.config.js +++ b/packages/plugin-session-replay-browser/rollup.config.js @@ -1,6 +1,14 @@ import { iife, umd } from '../../scripts/build/rollup.config'; +import { webWorkerPlugins } from '../session-replay-browser/rollup.config'; iife.input = umd.input; iife.output.name = 'sessionReplay'; -export default [umd, iife]; +export default async () => { + const commonPlugins = await webWorkerPlugins(); + + iife.plugins = [...commonPlugins, ...iife.plugins]; + umd.plugins = [...commonPlugins, ...umd.plugins]; + + return [iife, umd]; +}; diff --git a/packages/plugin-session-replay-browser/src/session-replay.ts b/packages/plugin-session-replay-browser/src/session-replay.ts index 032639aae..0712bdec3 100644 --- a/packages/plugin-session-replay-browser/src/session-replay.ts +++ b/packages/plugin-session-replay-browser/src/session-replay.ts @@ -67,6 +67,7 @@ export class SessionReplayPlugin implements EnrichmentPlugin { version: { type: 'plugin', version: VERSION }, performanceConfig: this.options.performanceConfig, storeType: this.options.storeType, + experimental: this.options.experimental, }).promise; } catch (error) { config.loggerProvider.error(`Session Replay: Failed to initialize due to ${(error as Error).message}`); diff --git a/packages/plugin-session-replay-browser/src/typings/session-replay.ts b/packages/plugin-session-replay-browser/src/typings/session-replay.ts index ebdb4a4b1..bf0295ba2 100644 --- a/packages/plugin-session-replay-browser/src/typings/session-replay.ts +++ b/packages/plugin-session-replay-browser/src/typings/session-replay.ts @@ -29,4 +29,7 @@ export interface SessionReplayOptions { performanceConfig?: SessionReplayPerformanceConfig; storeType?: StoreType; customSessionId?: (event: Event) => string | undefined; + experimental?: { + useWebWorker: boolean; + }; } diff --git a/packages/session-replay-browser/package.json b/packages/session-replay-browser/package.json index 95e4829e9..77a3866f7 100644 --- a/packages/session-replay-browser/package.json +++ b/packages/session-replay-browser/package.json @@ -45,6 +45,7 @@ "@amplitude/rrweb": "2.0.0-alpha.26", "@amplitude/rrweb-packer": "2.0.0-alpha.26", "@amplitude/rrweb-snapshot": "2.0.0-alpha.26", + "@rollup/plugin-replace": "^6.0.1", "idb": "^8.0.0", "tslib": "^2.4.1" }, diff --git a/packages/session-replay-browser/rollup.config.js b/packages/session-replay-browser/rollup.config.js index 8fbd9d329..909df22f0 100644 --- a/packages/session-replay-browser/rollup.config.js +++ b/packages/session-replay-browser/rollup.config.js @@ -1,6 +1,60 @@ import { iife, umd } from '../../scripts/build/rollup.config'; + +import resolve from '@rollup/plugin-node-resolve'; +import replace from '@rollup/plugin-replace'; +import { terser } from 'rollup-plugin-terser'; +import typescript from '@rollup/plugin-typescript'; +import { rollup } from 'rollup'; +import path from 'node:path'; + iife.input = umd.input; iife.output.name = 'sessionReplay'; -export default [umd, iife]; +async function buildWebWorker() { + const input = path.join(path.dirname(new URL(import.meta.url).pathname), './src/worker/compression.ts'); + const bundle = await rollup({ + input, + plugins: [ + resolve({ + browser: true, + }), + typescript({ + tsconfig: 'tsconfig.json', + // no need to output types + declaration: false, + declarationMap: false, + }), + terser(), + ], + }); + + const { output } = await bundle.generate({ + format: 'iife', + name: 'WebWorker', + inlineDynamicImports: true, + }); + const webWorkerCode = output[0].code; + + return webWorkerCode; +} + +export async function webWorkerPlugins() { + return [ + replace({ + preventAssignment: true, + values: { + 'replace.COMPRESSION_WEBWORKER_BODY': JSON.stringify(await buildWebWorker()), + }, + }), + ]; +} + +export default async () => { + const commonPlugins = await webWorkerPlugins(); + + iife.plugins = [...commonPlugins, ...iife.plugins]; + umd.plugins = [...commonPlugins, ...umd.plugins]; + + return [iife, umd]; +}; diff --git a/packages/session-replay-browser/src/config/local-config.ts b/packages/session-replay-browser/src/config/local-config.ts index 6e4b117b0..8d37221e0 100644 --- a/packages/session-replay-browser/src/config/local-config.ts +++ b/packages/session-replay-browser/src/config/local-config.ts @@ -30,6 +30,7 @@ export class SessionReplayLocalConfig extends Config implements ISessionReplayLo version?: SessionReplayVersion; storeType: StoreType; performanceConfig?: SessionReplayPerformanceConfig; + experimental?: { useWebWorker: boolean }; constructor(apiKey: string, options: SessionReplayOptions) { const defaultConfig = getDefaultConfig(); @@ -59,5 +60,8 @@ export class SessionReplayLocalConfig extends Config implements ISessionReplayLo if (options.debugMode) { this.debugMode = options.debugMode; } + if (options.experimental) { + this.experimental = options.experimental; + } } } diff --git a/packages/session-replay-browser/src/config/types.ts b/packages/session-replay-browser/src/config/types.ts index bcb4fbb2d..f9a11ac13 100644 --- a/packages/session-replay-browser/src/config/types.ts +++ b/packages/session-replay-browser/src/config/types.ts @@ -57,6 +57,10 @@ export interface SessionReplayLocalConfig extends Config { version?: SessionReplayVersion; performanceConfig?: SessionReplayPerformanceConfig; storeType: StoreType; + + experimental?: { + useWebWorker: boolean; + }; } export interface SessionReplayJoinedConfig extends SessionReplayLocalConfig { diff --git a/packages/session-replay-browser/src/events/event-compressor.ts b/packages/session-replay-browser/src/events/event-compressor.ts index 600e73356..849d91519 100644 --- a/packages/session-replay-browser/src/events/event-compressor.ts +++ b/packages/session-replay-browser/src/events/event-compressor.ts @@ -1,8 +1,8 @@ +import { getGlobalScope } from '@amplitude/analytics-client-common'; +import { pack } from '@amplitude/rrweb-packer'; import type { eventWithTime } from '@amplitude/rrweb-types'; import { SessionReplayJoinedConfig } from 'src/config/types'; import { SessionReplayEventsManager } from 'src/typings/session-replay'; -import { pack } from '@amplitude/rrweb-packer'; -import { getGlobalScope } from '@amplitude/analytics-client-common'; interface TaskQueue { event: eventWithTime; @@ -18,11 +18,13 @@ export class EventCompressor { deviceId: string | undefined; canUseIdleCallback: boolean | undefined; timeout: number; + worker?: Worker; constructor( eventsManager: SessionReplayEventsManager<'replay' | 'interaction', string>, config: SessionReplayJoinedConfig, deviceId: string | undefined, + workerScriptInternal?: string, // this is used for unit testing ) { const globalScope = getGlobalScope(); this.canUseIdleCallback = globalScope && 'requestIdleCallback' in globalScope; @@ -30,6 +32,26 @@ export class EventCompressor { this.config = config; this.deviceId = deviceId; this.timeout = config.performanceConfig?.timeout || DEFAULT_TIMEOUT; + + // These two lines will be changed at compile time. + const replace: Record = {}; + // This next line is going to be ridiculously hard to cover in unit tests, ignoring. + /* istanbul ignore next */ + const workerScript = replace.COMPRESSION_WEBWORKER_BODY ?? workerScriptInternal; + if (this.config.experimental?.useWebWorker && globalScope && globalScope.Worker && workerScript) { + config.loggerProvider.log('[Experimental] Enabling web worker for compression'); + + const worker = new Worker(URL.createObjectURL(new Blob([workerScript], { type: 'application/javascript' }))); + worker.onerror = (e) => { + config.loggerProvider.error(e); + }; + worker.onmessage = (e) => { + const { compressedEvent, sessionId } = e.data as Record; + this.addCompressedEventToManager(compressedEvent, sessionId); + }; + + this.worker = worker; + } } // Schedule processing during idle time @@ -86,9 +108,7 @@ export class EventCompressor { return JSON.stringify(packedEvent); }; - public addCompressedEvent = (event: eventWithTime, sessionId: string | number) => { - const compressedEvent = this.compressEvent(event); - + private addCompressedEventToManager = (compressedEvent: string, sessionId: string | number) => { if (this.eventsManager && this.deviceId) { this.eventsManager.addEvent({ event: { type: 'replay', data: compressedEvent }, @@ -97,4 +117,18 @@ export class EventCompressor { }); } }; + + public addCompressedEvent = (event: eventWithTime, sessionId: string | number) => { + if (this.worker) { + // This indirectly compresses the event. + this.worker.postMessage({ event, sessionId }); + } else { + const compressedEvent = this.compressEvent(event); + this.addCompressedEventToManager(compressedEvent, sessionId); + } + }; + + public terminate = () => { + this.worker?.terminate(); + }; } diff --git a/packages/session-replay-browser/src/session-replay.ts b/packages/session-replay-browser/src/session-replay.ts index 97708603d..fa598f2b4 100644 --- a/packages/session-replay-browser/src/session-replay.ts +++ b/packages/session-replay-browser/src/session-replay.ts @@ -138,6 +138,10 @@ export class SessionReplay implements AmplitudeSessionReplay { } this.eventsManager = new MultiEventManager<'replay' | 'interaction', string>(...managers); + // To prevent too many threads. + if (this.eventCompressor) { + this.eventCompressor.terminate(); + } this.eventCompressor = new EventCompressor(this.eventsManager, this.config, this.getDeviceId()); this.loggerProvider.log('Installing @amplitude/session-replay-browser.'); diff --git a/packages/session-replay-browser/src/worker/compression.ts b/packages/session-replay-browser/src/worker/compression.ts new file mode 100644 index 000000000..2e7e79cff --- /dev/null +++ b/packages/session-replay-browser/src/worker/compression.ts @@ -0,0 +1,15 @@ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +import { pack } from '@amplitude/rrweb-packer'; + +onmessage = (e) => { + const { event, sessionId } = e.data; + + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + const compressedEvent = JSON.stringify(pack(event)); + + postMessage({ compressedEvent, sessionId }); +}; + +// added for testing +export const compressionOnMessage = onmessage; diff --git a/packages/session-replay-browser/test/event-compressor.test.ts b/packages/session-replay-browser/test/event-compressor.test.ts index 523f5f151..4187f6fd8 100644 --- a/packages/session-replay-browser/test/event-compressor.test.ts +++ b/packages/session-replay-browser/test/event-compressor.test.ts @@ -3,6 +3,7 @@ import { SessionReplayLocalConfig } from '../src/config/local-config'; import { EventCompressor } from '../src/events/event-compressor'; import { createEventsManager } from '../src/events/events-manager'; import { SessionReplayEventsManager } from '../src/typings/session-replay'; +import { eventWithTime } from '@amplitude/rrweb'; const mockEvent = { type: 4, @@ -41,6 +42,9 @@ describe('EventCompressor', () => { enabled: true, timeout: 2000, }, + experimental: { + useWebWorker: true, + }, }); beforeEach(async () => { @@ -159,4 +163,66 @@ describe('EventCompressor', () => { // Ensure processQueue was called recursively expect(processQueueMock).toHaveBeenCalledTimes(2); }); + + test.each([true, false])('should use webworkers if script exists', async (error: boolean) => { + let postMessageMock = jest.fn(); + let onMessageMock = jest.fn(); + let onErrorMock = jest.fn(); + let terminateMock = jest.fn(); + class MockWorker { + postMessage = (e: any) => { + postMessageMock = jest.fn(() => { + this.onmessage({ data: { compressedEvent: '', sessionId: 1234 } }); + }); + onErrorMock = jest.fn(() => { + this.onerror(e); + }); + if (error) { + onErrorMock(e); + } else { + postMessageMock(e); + } + }; + onmessage = (e: any) => { + onMessageMock = jest.fn(); + onMessageMock(e); + }; + onerror = (e: any) => { + onErrorMock = jest.fn(); + onErrorMock(e); + }; + terminate = () => { + terminateMock = jest.fn(); + terminateMock(); + }; + } + + global.Worker = MockWorker as unknown as typeof global.Worker; + + URL.createObjectURL = jest.fn(); + eventsManager = await createEventsManager<'replay'>({ + config, + type: 'replay', + storeType: 'memory', + }); + eventCompressor = new EventCompressor(eventsManager, config, deviceId, 'console.log("hi")'); + + const testEvent: eventWithTime = { + data: { + height: 1, + width: 1, + href: 'http://localhost', + }, + type: 4, + timestamp: 1, + }; + const testSessionId = 1234; + eventCompressor.addCompressedEvent(testEvent, testSessionId); + + expect(postMessageMock).toHaveBeenCalledTimes(error ? 0 : 1); + expect(onErrorMock).toHaveBeenCalledTimes(error ? 1 : 0); + + eventCompressor.terminate(); + expect(terminateMock).toHaveBeenCalled(); + }); }); diff --git a/packages/session-replay-browser/test/worker/compression.test.ts b/packages/session-replay-browser/test/worker/compression.test.ts new file mode 100644 index 000000000..a73774901 --- /dev/null +++ b/packages/session-replay-browser/test/worker/compression.test.ts @@ -0,0 +1,32 @@ +import { eventWithTime } from '@amplitude/rrweb'; +import { compressionOnMessage } from '../../src/worker/compression'; +import { pack } from '@amplitude/rrweb-packer'; + +describe('compression', () => { + test('should compress event', async () => { + global.postMessage = jest.fn(); + + const testEvent: eventWithTime = { + timestamp: 1, + type: 4, + data: { + height: 1, + width: 1, + href: 'http://localhost', + }, + }; + + // hack to make typescript not complain + (compressionOnMessage as (_: unknown) => void)({ + data: { + event: testEvent, + sessionId: 1234, + }, + }); + + expect(global.postMessage).toHaveBeenCalledWith({ + sessionId: 1234, + compressedEvent: JSON.stringify(pack(testEvent)), + }); + }); +});