Skip to content

Commit

Permalink
Scope signal buffer to session (#1190)
Browse files Browse the repository at this point in the history
  • Loading branch information
silesky authored Nov 18, 2024
1 parent 0a30f38 commit 46e8819
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 58 deletions.
7 changes: 7 additions & 0 deletions .changeset/khaki-cheetahs-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@segment/analytics-signals': minor
---

* Clear signal buffer at start of new session
* Prune signalBuffer to maxBufferSize on new session (if different)
* Add sessionStorage storage type
117 changes: 107 additions & 10 deletions packages/signals/signals/src/core/buffer/__tests__/buffer.test.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,123 @@
import { sleep } from '@segment/analytics-core'
import { range } from '../../../test-helpers/range'
import { createInteractionSignal } from '../../../types/factories'
import { getSignalBuffer, SignalBuffer } from '../index'

describe(getSignalBuffer, () => {
let buffer: SignalBuffer
beforeEach(async () => {
sessionStorage.clear()
buffer = getSignalBuffer({
maxBufferSize: 10,
})
await buffer.clear()
})
describe('indexDB', () => {
it('should instantiate without throwing an error', () => {
expect(buffer).toBeTruthy()
})
it('should add and clear', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])
await buffer.clear()
await expect(buffer.getAll()).resolves.toHaveLength(0)
})

it('should delete older signals when maxBufferSize is exceeded', async () => {
const signals = range(15).map((_, idx) =>
createInteractionSignal({
idx: idx,
eventType: 'change',
target: {},
})
)

for (const signal of signals) {
await buffer.add(signal)
}

const storedSignals = await buffer.getAll()
expect(storedSignals).toHaveLength(10)
expect(storedSignals).toEqual(signals.slice(-10).reverse())
})

it('should delete older signals on initialize if current number exceeds maxBufferSize', async () => {
const signals = range(15).map((_, idx) =>
createInteractionSignal({
idx: idx,
eventType: 'change',
target: {},
})
)

for (const signal of signals) {
await buffer.add(signal)
}

// Re-initialize buffer
buffer = getSignalBuffer({
maxBufferSize: 10,
})

const storedSignals = await buffer.getAll()
expect(storedSignals).toHaveLength(10)
expect(storedSignals).toEqual(signals.slice(-10).reverse())
})

it('should instantiate without throwing an error', () => {
expect(buffer).toBeTruthy()
it('should clear signal buffer if there is a new session according to session storage', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])

// Simulate a new session by clearing session storage and re-initializing the buffer
sessionStorage.clear()
await sleep(100)
buffer = getSignalBuffer({
maxBufferSize: 10,
})

await expect(buffer.getAll()).resolves.toHaveLength(0)
})
})
it('should add and clear', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
describe('sessionStorage', () => {
it('should instantiate without throwing an error', () => {
expect(buffer).toBeTruthy()
})

it('should add and clear', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])
await buffer.clear()
await expect(buffer.getAll()).resolves.toHaveLength(0)
})

it('should delete older signals when maxBufferSize is exceeded', async () => {
const signals = range(15).map((_, idx) =>
createInteractionSignal({
idx: idx,
eventType: 'change',
target: {},
})
)

for (const signal of signals) {
await buffer.add(signal)
}

const storedSignals = await buffer.getAll()
expect(storedSignals).toHaveLength(10)
expect(storedSignals).toEqual(signals.slice(-10).reverse())
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])
await buffer.clear()
await expect(buffer.getAll()).resolves.toHaveLength(0)
})
})
176 changes: 133 additions & 43 deletions packages/signals/signals/src/core/buffer/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Signal } from '@segment/analytics-signals-runtime'
import { openDB, DBSchema, IDBPDatabase } from 'idb'
import { openDB, DBSchema, IDBPDatabase, IDBPObjectStore } from 'idb'
import { logger } from '../../lib/logger'
import { WebStorage } from '../../lib/storage/web-storage'

interface SignalDatabase extends DBSchema {
signals: {
Expand All @@ -15,77 +16,147 @@ export interface SignalPersistentStorage {
clear(): void
}

export class SignalStore implements SignalPersistentStorage {
interface IDBPDatabaseSignals extends IDBPDatabase<SignalDatabase> {}
interface IDBPObjectStoreSignals
extends IDBPObjectStore<
SignalDatabase,
['signals'],
'signals',
'readonly' | 'readwrite' | 'versionchange'
> {}

interface StoreSettings {
maxBufferSize: number
}
export class SignalStoreIndexDB implements SignalPersistentStorage {
static readonly DB_NAME = 'Segment Signals Buffer'
static readonly STORE_NAME = 'signals'
private signalStore: Promise<IDBPDatabase<SignalDatabase>>
private signalCount = 0
private db: Promise<IDBPDatabaseSignals>
private maxBufferSize: number

public length() {
return this.signalCount
}

private sessionKeyStorage = new WebStorage(window.sessionStorage)
static deleteDatabase() {
return indexedDB.deleteDatabase(SignalStore.DB_NAME)
return indexedDB.deleteDatabase(SignalStoreIndexDB.DB_NAME)
}

constructor(settings: { maxBufferSize?: number } = {}) {
this.maxBufferSize = settings.maxBufferSize ?? 50
this.signalStore = this.createSignalStore()
void this.initializeSignalCount()
async getStore(
permission: IDBTransactionMode,
database?: IDBPDatabaseSignals
): Promise<IDBPObjectStoreSignals> {
const db = database ?? (await this.db)
const store = db
.transaction(SignalStoreIndexDB.STORE_NAME, permission)
.objectStore(SignalStoreIndexDB.STORE_NAME)
return store
}

private getStore() {
return this.signalStore
constructor(settings: StoreSettings) {
this.maxBufferSize = settings.maxBufferSize
this.db = this.initSignalDB()
}

private async createSignalStore() {
const db = await openDB<SignalDatabase>(SignalStore.DB_NAME, 1, {
private async initSignalDB(): Promise<IDBPDatabaseSignals> {
const db = await openDB<SignalDatabase>(SignalStoreIndexDB.DB_NAME, 1, {
upgrade(db) {
db.createObjectStore(SignalStore.STORE_NAME, { autoIncrement: true })
db.createObjectStore(SignalStoreIndexDB.STORE_NAME, {
autoIncrement: true,
})
},
})
logger.debug('Signals Buffer (indexDB) initialized')
// if the signal buffer is too large, delete the oldest signals (e.g, the settings have changed)
const store = await this.getStore('readwrite', db)
await this.clearStoreIfNeeded(store)
await this.countAndDeleteOldestIfNeeded(store, true)
await store.transaction.done
return db
}

private async initializeSignalCount() {
const store = await this.signalStore
this.signalCount = await store.count(SignalStore.STORE_NAME)
logger.debug(
`Signal count initialized with ${this.signalCount} signals (max: ${this.maxBufferSize})`
)
private async clearStoreIfNeeded(store: IDBPObjectStoreSignals) {
// prevent the signals buffer from persisting across sessions (e.g, user closes tab and reopens)
const sessionKey = 'segment_signals_db_session_key'
if (!sessionStorage.getItem(sessionKey)) {
this.sessionKeyStorage.setItem(sessionKey, true)
await store.clear!()
logger.debug('New Session, so signals buffer cleared')
}
}

async add(signal: Signal): Promise<void> {
const store = await this.signalStore
if (this.signalCount >= this.maxBufferSize) {
// Get the key of the oldest signal and delete it
const oldestKey = await store
.transaction(SignalStore.STORE_NAME)
.store.getKey(IDBKeyRange.lowerBound(0))
if (oldestKey !== undefined) {
await store.delete(SignalStore.STORE_NAME, oldestKey)
} else {
this.signalCount--
const store = await this.getStore('readwrite')
await store.add!(signal)
await this.countAndDeleteOldestIfNeeded(store)
return store.transaction.done
}

private async countAndDeleteOldestIfNeeded(
store: IDBPObjectStoreSignals,
deleteMultiple = false
): Promise<void> {
let count = await store.count()
if (count > this.maxBufferSize) {
const cursor = await store.openCursor()
if (cursor) {
// delete up to maxItems
if (deleteMultiple) {
while (count > this.maxBufferSize) {
await cursor.delete!()
await cursor.continue()
count--
}
logger.debug(
`Signals Buffer: Purged signals to max buffer size of ${this.maxBufferSize}`
)
} else {
// just delete the oldest item
await cursor.delete!()
count--
}
}
}
await store.add(SignalStore.STORE_NAME, signal)
this.signalCount++
}

/**
* Get list of signals from the store, with the newest signals first.
*/
async getAll(): Promise<Signal[]> {
const store = await this.getStore()
return (await store.getAll(SignalStore.STORE_NAME)).reverse()
const store = await this.getStore('readonly')
const signals = await store.getAll()
await store.transaction.done
return signals.reverse()
}

async clear() {
const store = await this.getStore()
return store.clear(SignalStore.STORE_NAME)
async clear(): Promise<void> {
const store = await this.getStore('readwrite')
await store.clear!()
await store.transaction.done
}
}

export class SignalStoreSessionStorage implements SignalPersistentStorage {
private readonly storageKey = 'segment_signals_buffer'
private maxBufferSize: number

constructor(settings: StoreSettings) {
this.maxBufferSize = settings.maxBufferSize
}

add(signal: Signal): void {
const signals = this.getAll()
signals.unshift(signal)
if (signals.length > this.maxBufferSize) {
// delete the last one
signals.splice(-1)
}
sessionStorage.setItem(this.storageKey, JSON.stringify(signals))
}

clear(): void {
sessionStorage.removeItem(this.storageKey)
}

getAll(): Signal[] {
const signals = sessionStorage.getItem(this.storageKey)
return signals ? JSON.parse(signals) : []
}
}

Expand Down Expand Up @@ -125,14 +196,33 @@ export class SignalBuffer<
export interface SignalBufferSettingsConfig<
T extends SignalPersistentStorage = SignalPersistentStorage
> {
/**
* Maximum number of signals to store. Only applies if no custom storage implementation is provided.
*/
maxBufferSize?: number
/**
* Choose between sessionStorage and indexDB. Only applies if no custom storage implementation is provided.
* @default 'indexDB'
*/
storageType?: 'session' | 'indexDB'
/**
* Custom storage implementation
* @default SignalStoreIndexDB
*/
signalStorage?: T
}
export const getSignalBuffer = <
T extends SignalPersistentStorage = SignalPersistentStorage
>(
settings: SignalBufferSettingsConfig<T>
) => {
const store = settings.signalStorage ?? new SignalStore(settings)
const settingsWithDefaults: StoreSettings = {
maxBufferSize: 50,
...settings,
}
const store =
settings.signalStorage ?? settings.storageType === 'session'
? new SignalStoreSessionStorage(settingsWithDefaults)
: new SignalStoreIndexDB(settingsWithDefaults)
return new SignalBuffer(store)
}
2 changes: 2 additions & 0 deletions packages/signals/signals/src/core/signals/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type SignalsSettingsConfig = Pick<
| 'networkSignalsAllowList'
| 'networkSignalsDisallowList'
| 'networkSignalsAllowSameDomain'
| 'signalStorageType'
> & {
signalStorage?: SignalPersistentStorage
processSignal?: string
Expand Down Expand Up @@ -52,6 +53,7 @@ export class SignalGlobalSettings {

this.signalBuffer = {
signalStorage: settings.signalStorage,
storageType: settings.signalStorageType,
maxBufferSize: settings.maxBufferSize,
}
this.ingestClient = {
Expand Down
Loading

0 comments on commit 46e8819

Please sign in to comment.