Skip to content

Commit

Permalink
expose batchDuration as a configurable setting on sitewise source
Browse files Browse the repository at this point in the history
  • Loading branch information
boweihan committed Jun 17, 2022
1 parent c6383d0 commit 7cd7e5f
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 24 deletions.
13 changes: 8 additions & 5 deletions packages/source-iotsitewise/src/initialize.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SiteWiseTimeSeriesDataProvider } from './time-series-data/provider';
import { IotAppKitDataModule, TreeQuery, TimeQuery, TimeSeriesData, TimeSeriesDataRequest } from '@iot-app-kit/core';
import { SiteWiseAssetQuery } from './time-series-data/types';
import { SiteWiseAssetQuery, SiteWiseDataSourceSettings } from './time-series-data/types';
import {
BranchReference,
RootedSiteWiseAssetTreeQueryArguments,
Expand All @@ -18,7 +18,7 @@ import { Credentials, Provider as AWSCredentialsProvider } from '@aws-sdk/types'
import { IoTSiteWiseClient } from '@aws-sdk/client-iotsitewise';
import { assetSession } from './sessions';

type IoTAppKitInitInputs =
type SiteWiseDataSourceInitInputs = (
| {
registerDataSources?: boolean;
iotSiteWiseClient: IoTSiteWiseClient;
Expand All @@ -27,7 +27,10 @@ type IoTAppKitInitInputs =
registerDataSources?: boolean;
awsCredentials: Credentials | AWSCredentialsProvider<Credentials>;
awsRegion: string;
};
}
) & {
settings?: SiteWiseDataSourceSettings;
};

export type SiteWiseQuery = {
timeSeriesData: (query: SiteWiseAssetQuery) => TimeQuery<TimeSeriesData[], TimeSeriesDataRequest>;
Expand All @@ -43,7 +46,7 @@ export type SiteWiseQuery = {
* @param awsCredentials - https://www.npmjs.com/package/@aws-sdk/credential-providers
* @param awsRegion - Region for AWS based data sources to point towards, i.e. us-east-1
*/
export const initialize = (input: IoTAppKitInitInputs) => {
export const initialize = (input: SiteWiseDataSourceInitInputs) => {
const siteWiseTimeSeriesModule = new IotAppKitDataModule();
const siteWiseSdk =
'iotSiteWiseClient' in input ? input.iotSiteWiseClient : sitewiseSdk(input.awsCredentials, input.awsRegion);
Expand All @@ -53,7 +56,7 @@ export const initialize = (input: IoTAppKitInitInputs) => {

if (input.registerDataSources !== false) {
/** Automatically registered data sources */
siteWiseTimeSeriesModule.registerDataSource(createDataSource(siteWiseSdk));
siteWiseTimeSeriesModule.registerDataSource(createDataSource(siteWiseSdk, input.settings));
}

return {
Expand Down
143 changes: 143 additions & 0 deletions packages/source-iotsitewise/src/time-series-data/client/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,146 @@ describe('getAggregatedPropertyDataPoints', () => {
expect(onSuccess.mock.calls).toEqual([onSuccessParams1, onSuccessParams2, onSuccessParams1, onSuccessParams2]);
});
});

describe('batch duration', () => {
beforeAll(() => {
jest.useFakeTimers('modern');
});

afterAll(() => {
jest.useRealTimers();
});

it('batches requests over a single frame', async () => {
const batchGetAssetPropertyValue = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_DOUBLE_VALUE);
const assetId = 'some-asset-id';
const propertyId = 'some-property-id';

const onSuccess = jest.fn();
const onError = jest.fn();

const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }));

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const resolution = '0';

const requestInformation = {
id: toId({ assetId, propertyId }),
start: startDate,
end: endDate,
resolution,
fetchMostRecentBeforeEnd: true,
};

// single frame
client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});
client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises(); // clear promise queue
jest.advanceTimersByTime(0); // ensure latest requests are enqueued

// process the batch
expect(batchGetAssetPropertyValue).toBeCalledTimes(1);

// now split into two frames
batchGetAssetPropertyValue.mockClear();

client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises(); // clear promise queue
jest.advanceTimersByTime(0); // ensure latest requests are enqueued

client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises(); // clear promise queue
jest.advanceTimersByTime(0); // ensure latest requests are enqueued

expect(batchGetAssetPropertyValue).toBeCalledTimes(2);
});

it('batches requests over a specified duration', async () => {
const batchGetAssetPropertyValue = jest.fn().mockResolvedValue(BATCH_ASSET_PROPERTY_DOUBLE_VALUE);
const assetId = 'some-asset-id';
const propertyId = 'some-property-id';

const onSuccess = jest.fn();
const onError = jest.fn();

const client = new SiteWiseClient(createMockSiteWiseSDK({ batchGetAssetPropertyValue }), { batchDuration: 100 });

const startDate = new Date(2000, 0, 0);
const endDate = new Date(2001, 0, 0);
const resolution = '0';

const requestInformation = {
id: toId({ assetId, propertyId }),
start: startDate,
end: endDate,
resolution,
fetchMostRecentBeforeEnd: true,
};

client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises(); // clear promise queue
jest.advanceTimersByTime(50); // ensure latest requests are enqueued but not outside of batch window

client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises(); // clear promise queue
jest.advanceTimersByTime(100); // ensure latest requests are enqueued and outside of batch window

await flushPromises();

// process the batch and paginate once
expect(batchGetAssetPropertyValue).toBeCalledTimes(1);

// now split into two separate batch windows
batchGetAssetPropertyValue.mockClear();

client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises();
jest.advanceTimersByTime(150); // ensure latest requests are enqueued and outside of batch window

client.getLatestPropertyDataPoint({
requestInformations: [requestInformation],
onSuccess,
onError,
});

await flushPromises();
jest.advanceTimersByTime(150); // ensure latest requests are enqueued and outside of batch window

expect(batchGetAssetPropertyValue).toBeCalledTimes(2);
});
});
53 changes: 37 additions & 16 deletions packages/source-iotsitewise/src/time-series-data/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { batchGetHistoricalPropertyDataPoints } from './batchGetHistoricalProper
import { OnSuccessCallback, ErrorCallback, RequestInformationAndRange } from '@iot-app-kit/core';
import { batchGetAggregatedPropertyDataPoints } from './batchGetAggregatedPropertyDataPoints';
import { batchGetLatestPropertyDataPoints } from './batchGetLatestPropertyDataPoints';
import { SiteWiseDataSourceSettings } from '../types';

export type LatestPropertyParams = {
requestInformations: RequestInformationAndRange[];
Expand All @@ -28,39 +29,59 @@ export type AggregatedPropertyParams = {

export class SiteWiseClient {
private siteWiseSdk: IoTSiteWiseClient;
private settings: SiteWiseDataSourceSettings;

private latestPropertyDataLoader: DataLoader<LatestPropertyParams, void>;
private historicalPropertyDataLoader: DataLoader<HistoricalPropertyParams, void>;
private aggregatedPropertyDataLoader: DataLoader<AggregatedPropertyParams, void>;

constructor(siteWiseSdk: IoTSiteWiseClient) {
constructor(siteWiseSdk: IoTSiteWiseClient, settings: SiteWiseDataSourceSettings = {}) {
this.siteWiseSdk = siteWiseSdk;
this.settings = settings;
this.instantiateDataLoaders();
}

/**
* Instantiate batch data loaders for latest, historical, and aggregated data.
* by default, data loaders will schedule batches for each frame of execution which ensures
* no additional latency when capturing many related requests in a single batch.
*
* @todo: adjust batch frequency for optimal sitewise request batching (latency vs. #requests)
* @todo: switch out existing APIs for batch APIs
*/
private instantiateDataLoaders() {
this.latestPropertyDataLoader = new DataLoader<LatestPropertyParams, void>(async (keys) => {
batchGetLatestPropertyDataPoints({ params: keys.flat(), client: this.siteWiseSdk });
return keys.map(() => undefined); // values are updated in data cache and don't need to be rebroadcast
});
this.latestPropertyDataLoader = new DataLoader<LatestPropertyParams, void>(
async (keys) => {
batchGetLatestPropertyDataPoints({ params: keys.flat(), client: this.siteWiseSdk });
return keys.map(() => undefined); // values are updated in data cache and don't need to be rebroadcast
},
{
batchScheduleFn: this.settings.batchDuration
? (callback) => setTimeout(callback, this.settings.batchDuration)
: undefined,
}
);

this.historicalPropertyDataLoader = new DataLoader<HistoricalPropertyParams, void>(async (keys) => {
batchGetHistoricalPropertyDataPoints({ params: keys.flat(), client: this.siteWiseSdk });
return keys.map(() => undefined);
});
this.historicalPropertyDataLoader = new DataLoader<HistoricalPropertyParams, void>(
async (keys) => {
batchGetHistoricalPropertyDataPoints({ params: keys.flat(), client: this.siteWiseSdk });
return keys.map(() => undefined);
},
{
batchScheduleFn: this.settings.batchDuration
? (callback) => setTimeout(callback, this.settings.batchDuration)
: undefined,
}
);

this.aggregatedPropertyDataLoader = new DataLoader<AggregatedPropertyParams, void>(async (keys) => {
batchGetAggregatedPropertyDataPoints({ params: keys.flat(), client: this.siteWiseSdk });
return keys.map(() => undefined);
});
this.aggregatedPropertyDataLoader = new DataLoader<AggregatedPropertyParams, void>(
async (keys) => {
batchGetAggregatedPropertyDataPoints({ params: keys.flat(), client: this.siteWiseSdk });
return keys.map(() => undefined);
},
{
batchScheduleFn: this.settings.batchDuration
? (callback) => setTimeout(callback, this.settings.batchDuration)
: undefined,
}
);
}

getLatestPropertyDataPoint(params: LatestPropertyParams): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IoTSiteWiseClient, AggregateType } from '@aws-sdk/client-iotsitewise';
import { SiteWiseDataStreamQuery } from './types';
import { SiteWiseDataSourceSettings, SiteWiseDataStreamQuery } from './types';
import { SiteWiseClient } from './client/client';
import { toId } from './util/dataStreamId';
import {
Expand Down Expand Up @@ -60,8 +60,11 @@ export const determineResolution = ({
}
};

export const createDataSource = (siteWise: IoTSiteWiseClient): DataSource<SiteWiseDataStreamQuery> => {
const client = new SiteWiseClient(siteWise);
export const createDataSource = (
siteWise: IoTSiteWiseClient,
settings?: SiteWiseDataSourceSettings
): DataSource<SiteWiseDataStreamQuery> => {
const client = new SiteWiseClient(siteWise, settings);
return {
name: SITEWISE_DATA_SOURCE,
initiateRequest: ({ onSuccess, onError }, requestInformations) =>
Expand Down
4 changes: 4 additions & 0 deletions packages/source-iotsitewise/src/time-series-data/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ export type SiteWiseAssetQuery = {
export type SiteWiseAssetDataStreamQuery = DataStreamQuery & SiteWiseAssetQuery;

export type SiteWiseDataStreamQuery = SiteWiseAssetDataStreamQuery;

export type SiteWiseDataSourceSettings = {
batchDuration?: number;
};

0 comments on commit 7cd7e5f

Please sign in to comment.