diff --git a/packages/core/src/data-module/IotAppKitDataModule.spec.ts b/packages/core/src/data-module/IotAppKitDataModule.spec.ts index b21529dc0..c5d2db101 100644 --- a/packages/core/src/data-module/IotAppKitDataModule.spec.ts +++ b/packages/core/src/data-module/IotAppKitDataModule.spec.ts @@ -4,9 +4,9 @@ import { DataSource, DataSourceRequest } from './types.d'; import { DataPoint, DataStream, DataStreamInfo, Resolution } from '@synchro-charts/core'; import { Request } from './data-cache/requestTypes'; import { DataStreamsStore, DataStreamStore } from './data-cache/types'; -import { EMPTY_CACHE } from './data-cache/caching/caching'; +import * as caching from './data-cache/caching/caching'; import { createSiteWiseLegacyDataSource } from '../data-sources/site-wise-legacy/data-source'; -import { MINUTE_IN_MS, MONTH_IN_MS, SECOND_IN_MS } from '../common/time'; +import { HOUR_IN_MS, MINUTE_IN_MS, MONTH_IN_MS, SECOND_IN_MS } from '../common/time'; import { IotAppKitDataModule } from './IotAppKitDataModule'; import { SITEWISE_DATA_SOURCE } from '../data-sources/site-wise/data-source'; @@ -16,6 +16,8 @@ import { toDataStreamId, toSiteWiseAssetProperty } from '../data-sources/site-wi import Mock = jest.Mock; +const { EMPTY_CACHE } = caching; + const { propertyId: PROPERTY_ID, assetId: ASSET_ID } = toSiteWiseAssetProperty(DATA_STREAM.id); const DATA_STREAM_QUERY: SiteWiseDataStreamQuery = { @@ -337,7 +339,7 @@ describe('error handling', () => { it('provides a data stream which has an error associated with it on initial subscription', () => { const customSource = createMockSiteWiseDataSource([DATA_STREAM]); - const dataModule = new IotAppKitDataModule(CACHE_WITH_ERROR); + const dataModule = new IotAppKitDataModule({ initialDataCache: CACHE_WITH_ERROR }); const dataStreamCallback = jest.fn(); dataModule.registerDataSource(customSource); @@ -357,7 +359,7 @@ describe('error handling', () => { it('does not re-request a data stream with an error associated with it', async () => { const customSource = createMockSiteWiseDataSource([DATA_STREAM]); - const dataModule = new IotAppKitDataModule(CACHE_WITH_ERROR); + const dataModule = new IotAppKitDataModule({ initialDataCache: CACHE_WITH_ERROR }); const dataStreamCallback = jest.fn(); dataModule.registerDataSource(customSource); @@ -378,7 +380,7 @@ describe('error handling', () => { it('does request a data stream which has no error associated with it', () => { const customSource = createMockSiteWiseDataSource([DATA_STREAM]); - const dataModule = new IotAppKitDataModule(CACHE_WITHOUT_ERROR); + const dataModule = new IotAppKitDataModule({ initialDataCache: CACHE_WITHOUT_ERROR }); const dataStreamCallback = jest.fn(); @@ -398,6 +400,14 @@ describe('error handling', () => { }); describe('caching', () => { + beforeAll(() => { + jest.useFakeTimers('modern'); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + it('does not request already cached data', () => { const dataModule = new IotAppKitDataModule(); const dataSource = createMockSiteWiseDataSource([DATA_STREAM]); @@ -504,6 +514,81 @@ describe('caching', () => { }, ]); }); + + it('requests already cached data if the default TTL has expired', async () => { + const dataModule = new IotAppKitDataModule(); + const dataSource = createMockSiteWiseDataSource([DATA_STREAM]); + dataModule.registerDataSource(dataSource); + + const END_1 = new Date(); + const START_1 = new Date(END_1.getTime() - HOUR_IN_MS); + + const dataStreamCallback = jest.fn(); + const { update } = dataModule.subscribeToDataStreams( + { + query: DATA_STREAM_QUERY, + requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false }, + }, + dataStreamCallback + ); + + (dataSource.initiateRequest as Mock).mockClear(); + + jest.advanceTimersByTime(MINUTE_IN_MS); + + update({ requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false } }); + + expect(dataSource.initiateRequest).toBeCalledWith(expect.any(Object), [ + { + id: DATA_STREAM_INFO.id, + resolution: DATA_STREAM_INFO.resolution, + // 1 minute time advancement invalidates 3 minutes of cache by default, which is 2 minutes from END_1 + start: new Date(END_1.getTime() - 2 * MINUTE_IN_MS), + end: END_1, + }, + ]); + }); + + it('requests already cached data if custom TTL has expired', async () => { + const customCacheSettings = { + ttlDurationMapping: { + [MINUTE_IN_MS]: 0, + [5 * MINUTE_IN_MS]: 30 * SECOND_IN_MS, + }, + }; + + const dataModule = new IotAppKitDataModule({ cacheSettings: customCacheSettings }); + const dataSource = createMockSiteWiseDataSource([DATA_STREAM]); + dataModule.registerDataSource(dataSource); + + const END_1 = new Date(); + const START_1 = new Date(END_1.getTime() - HOUR_IN_MS); + + const dataStreamCallback = jest.fn(); + const { update } = dataModule.subscribeToDataStreams( + { + query: DATA_STREAM_QUERY, + requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false }, + }, + dataStreamCallback + ); + + (dataSource.initiateRequest as Mock).mockClear(); + + jest.advanceTimersByTime(MINUTE_IN_MS); + + update({ requestInfo: { viewport: { start: START_1, end: END_1 }, onlyFetchLatestValue: false } }); + + expect(dataSource.initiateRequest).toBeCalledWith(expect.any(Object), [ + { + id: DATA_STREAM_INFO.id, + resolution: DATA_STREAM_INFO.resolution, + // 1 minute time advancement invalidates 5 minutes of cache with custom mapping, which is 4 minutes from END_1 + start: new Date(END_1.getTime() - 4 * MINUTE_IN_MS), + end: END_1, + }, + ]); + }); }); describe('request scheduler', () => { @@ -693,4 +778,3 @@ it('requests data range with buffer', () => { unsubscribe(); }); - diff --git a/packages/core/src/data-module/IotAppKitDataModule.ts b/packages/core/src/data-module/IotAppKitDataModule.ts index eaed3d226..5c6e2de43 100644 --- a/packages/core/src/data-module/IotAppKitDataModule.ts +++ b/packages/core/src/data-module/IotAppKitDataModule.ts @@ -10,7 +10,7 @@ import { Subscription, SubscriptionUpdate, } from './types.d'; -import { DataStreamsStore } from './data-cache/types'; +import { DataStreamsStore, CacheSettings } from './data-cache/types'; import DataSourceStore from './data-source-store/dataSourceStore'; import { SubscriptionResponse } from '../data-sources/site-wise/types.d'; import { DataCache } from './data-cache/dataCacheWrapped'; @@ -18,6 +18,20 @@ import { Request } from './data-cache/requestTypes'; import { requestRange } from './data-cache/requestRange'; import { getDateRangesToRequest } from './data-cache/caching/caching'; import { viewportEndDate, viewportStartDate } from '../common/viewport'; +import { MINUTE_IN_MS, SECOND_IN_MS } from '../common/time'; + +export const DEFAULT_CACHE_SETTINGS = { + ttlDurationMapping: { + [1.2 * MINUTE_IN_MS]: 0, + [3 * MINUTE_IN_MS]: 30 * SECOND_IN_MS, + [20 * MINUTE_IN_MS]: 5 * MINUTE_IN_MS, + }, +}; + +interface IotAppKitDataModuleConfiguration { + initialDataCache?: DataStreamsStore; + cacheSettings?: Partial; +} export class IotAppKitDataModule implements DataModule { private dataCache: DataCache; @@ -26,13 +40,21 @@ export class IotAppKitDataModule implements DataModule { private dataSourceStore = new DataSourceStore(); + private cacheSettings: CacheSettings; + /** * Create a new data module, optionally with a pre-hydrated data cache. * */ - constructor(initialDataCache?: DataStreamsStore) { + constructor(configuration: IotAppKitDataModuleConfiguration = {}) { + const { initialDataCache, cacheSettings } = configuration; + this.dataCache = new DataCache(initialDataCache); this.subscriptions = new SubscriptionStore({ dataSourceStore: this.dataSourceStore, dataCache: this.dataCache }); + this.cacheSettings = { + ...DEFAULT_CACHE_SETTINGS, + ...cacheSettings, + }; } public registerDataSource = this.dataSourceStore.registerDataSource; @@ -81,6 +103,7 @@ export class IotAppKitDataModule implements DataModule { end: adjustedEnd, resolution, dataStreamId: id, + cacheSettings: this.cacheSettings, }); return { diff --git a/packages/core/src/data-module/data-cache/caching/caching.spec.ts b/packages/core/src/data-module/data-cache/caching/caching.spec.ts index f0d5bcb43..d35482a04 100755 --- a/packages/core/src/data-module/data-cache/caching/caching.spec.ts +++ b/packages/core/src/data-module/data-cache/caching/caching.spec.ts @@ -7,6 +7,7 @@ import { getDateRangesToRequest, unexpiredCacheIntervals, } from './caching'; +import { DEFAULT_CACHE_SETTINGS } from '../../IotAppKitDataModule'; import { HOUR_IN_MS, MINUTE_IN_MS, SECOND_IN_MS } from '../../../common/time'; import { DataStreamsStore } from '../types'; import { IntervalStructure } from '../../../common/intervalStructure'; @@ -66,6 +67,7 @@ describe('getDateRangesToRequest', () => { dataStreamId: STREAM_ID, start: REQUESTED_START, end: REQUESTED_END, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([ [REQUESTED_START, CACHED_START], @@ -107,6 +109,7 @@ describe('getDateRangesToRequest', () => { dataStreamId: STREAM_ID, start: REQUESTED_START, end: REQUESTED_END, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([ [REQUESTED_START, CACHED_START], @@ -150,6 +153,7 @@ describe('getDateRangesToRequest', () => { dataStreamId: STREAM_ID, start: REQUESTED_START, end: REQUESTED_END, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([[REQUESTED_START, REQUESTED_END]]); }); @@ -165,6 +169,7 @@ describe('getDateRangesToRequest', () => { dataStreamId: STREAM_ID, start: START_DATE, end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([[START_DATE, END_DATE]]); }); @@ -194,6 +199,7 @@ describe('getDateRangesToRequest', () => { dataStreamId: STREAM_ID, start: START_DATE, end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([[START_DATE, END_DATE]]); }); @@ -221,6 +227,7 @@ describe('getDateRangesToRequest', () => { start: new Date(2000, 0, 0), end: new Date(2001, 0, 0), resolution: 0, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([[new Date(2000, 0, 6), new Date(2001, 0, 0)]]); }); @@ -248,6 +255,7 @@ describe('getDateRangesToRequest', () => { start: new Date(2000, 0, 0), end: new Date(2001, 0, 0), resolution: 0, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([]); }); @@ -262,6 +270,7 @@ describe('getDateRangesToRequest', () => { start: START_DATE, end: END_DATE, resolution: 0, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([]); }); @@ -276,6 +285,7 @@ describe('getDateRangesToRequest', () => { start: START_DATE, end: END_DATE, resolution: 0, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toHaveLength(1); }); @@ -290,6 +300,7 @@ describe('getDateRangesToRequest', () => { start: START_DATE, end: END_DATE, resolution: 0, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).not.toEqual([]); }); @@ -319,6 +330,7 @@ describe('getDateRangesToRequest', () => { start: START_DATE, end: END_DATE, resolution: 0, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([]); }); @@ -349,6 +361,7 @@ describe('getDateRangesToRequest', () => { dataStreamId: STREAM_ID, start: START_DATE, end: END_DATE, + cacheSettings: DEFAULT_CACHE_SETTINGS, }) ).toEqual([[START_DATE, END_DATE]]); }); @@ -921,6 +934,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(1621879612500), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeFalse(); }); @@ -971,6 +985,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(1621880261010), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeTrue(); @@ -1022,6 +1037,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(1621880261010), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeFalse(); @@ -1040,6 +1056,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(1621879662000), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeFalse(); @@ -1073,6 +1090,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(1621879661000), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeTrue(); @@ -1110,6 +1128,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(14), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeTrue(); @@ -1147,6 +1166,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(13), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeFalse(); @@ -1184,6 +1204,7 @@ describe('checkCacheForRecentPoint', () => { dataStreamId: STREAM_ID, resolution: RESOLUTION, start: new Date(16), + cacheSettings: DEFAULT_CACHE_SETTINGS, }); expect(presentInCache).toBeFalse(); diff --git a/packages/core/src/data-module/data-cache/caching/caching.ts b/packages/core/src/data-module/data-cache/caching/caching.ts index e9ed6a54e..cb5d7d2f3 100755 --- a/packages/core/src/data-module/data-cache/caching/caching.ts +++ b/packages/core/src/data-module/data-cache/caching/caching.ts @@ -1,5 +1,5 @@ import { DataPoint, Primitive } from '@synchro-charts/core'; -import { MINUTE_IN_MS, SECOND_IN_MS } from '../../../common/time'; +import { MINUTE_IN_MS } from '../../../common/time'; import { getDataStreamStore } from '../getDataStreamStore'; import { addInterval, @@ -9,25 +9,11 @@ import { subtractIntervals, } from '../../../common/intervalStructure'; -import { DataStreamsStore, DataStreamStore, TTLDurationMapping } from '../types'; +import { CacheSettings, DataStreamsStore, DataStreamStore, TTLDurationMapping } from '../types'; import { getExpiredCacheIntervals } from './expiredCacheIntervals'; import { RequestConfig } from '../requestTypes'; import { pointBisector } from '../../../common/dataFilters'; -// given duration specified as a key, it may only be re-requested after the provided TTL value -// has been surpassed. -// If the duration since the last request was longer the any of the provided duration's, then the value never expires. -// INVARIANT: for any two pairs (durationMS, TTL), if a given durationMS is larger than another durationMS, it's TTL -// must also be larger -// i.e. given two pairs, (d1, ttl1) and (d2, ttl2), -// d1 > d2 iff ttl1 > ttl2 - -const TTL_DURATION_CACHE_RULES: TTLDurationMapping = { - [1.2 * MINUTE_IN_MS]: 0, - [3 * MINUTE_IN_MS]: 30 * SECOND_IN_MS, - [20 * MINUTE_IN_MS]: 5 * MINUTE_IN_MS, -}; - export const unexpiredCacheIntervals = ( streamStore: DataStreamStore, ttlDurationMapping: TTLDurationMapping @@ -83,12 +69,14 @@ export const getDateRangesToRequest = ({ start, end, resolution, + cacheSettings, }: { store: DataStreamsStore; dataStreamId: string; start: Date; end: Date; resolution: number; + cacheSettings: CacheSettings; }): [Date, Date][] => { const streamStore = getDataStreamStore(dataStreamId, resolution, store); @@ -103,7 +91,7 @@ export const getDateRangesToRequest = ({ } // NOTE: Use the request cache since we don't want to request intervals that already have been requested. - const cacheIntervals = unexpiredCacheIntervals(streamStore, TTL_DURATION_CACHE_RULES); + const cacheIntervals = unexpiredCacheIntervals(streamStore, cacheSettings.ttlDurationMapping); const millisecondIntervals = subtractIntervals([start.getTime(), end.getTime()], cacheIntervals); return millisecondIntervals @@ -179,17 +167,19 @@ export const checkCacheForRecentPoint = ({ dataStreamId, resolution, start, + cacheSettings, }: { store: DataStreamsStore; dataStreamId: string; resolution: number; start: Date; + cacheSettings: CacheSettings; }) => { const streamStore = getDataStreamStore(dataStreamId, resolution, store); if (streamStore && streamStore.dataCache.intervals.length > 0) { const { dataCache } = streamStore; - const cacheIntervals = unexpiredCacheIntervals(streamStore, TTL_DURATION_CACHE_RULES); + const cacheIntervals = unexpiredCacheIntervals(streamStore, cacheSettings.ttlDurationMapping); const intersectedIntervals = intersect(cacheIntervals, dataCache.intervals); const interval = intersectedIntervals.find((inter) => inter[0] <= start.getTime() && start.getTime() <= inter[1]); diff --git a/packages/core/src/data-module/data-cache/types.ts b/packages/core/src/data-module/data-cache/types.ts index f26c4a490..633e3bc39 100755 --- a/packages/core/src/data-module/data-cache/types.ts +++ b/packages/core/src/data-module/data-cache/types.ts @@ -27,6 +27,7 @@ export type DataStreamStore = { isRefreshing: boolean; error?: string; }; + export type DataStreamsStore = { [dataStreamId: string]: | { @@ -34,3 +35,14 @@ export type DataStreamsStore = { } | undefined; }; + +export type CacheSettings = { + // Mapping of duration to TTL, in MS. + // Cache data is re-requested for data if the duration TTL is surpassed. + // If the duration since the last request was longer the any of the provided duration's, then the value never expires. + // INVARIANT: for any two pairs (durationMS, TTL), if a given durationMS is larger than another durationMS, it's TTL + // must also be larger + // i.e. given two pairs, (d1, ttl1) and (d2, ttl2), + // d1 > d2 iff ttl1 > ttl2 + ttlDurationMapping: TTLDurationMapping; +};