Skip to content

Commit

Permalink
chore: add memoryUsage code for investigation for memory leaks (#1230)
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta authored Apr 16, 2024
1 parent 663125e commit c343ec4
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 30 deletions.
2 changes: 1 addition & 1 deletion examples/nodejs/get-set-batch-perf-test/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"scripts": {
"prebuild": "eslint . --ext .ts",
"build": "tsc",
"start-test": "tsc && node dist/perf-test.js",
"start-test": "tsc && node --heapsnapshot-signal=SIGUSR2 dist/perf-test.js",
"test": "jest",
"lint": "eslint . --ext .ts",
"format": "eslint . --ext .ts --fix"
Expand Down
151 changes: 122 additions & 29 deletions examples/nodejs/get-set-batch-perf-test/perf-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import {
CacheSet,
DefaultMomentoLoggerFactory,
DefaultMomentoLoggerLevel,
GetBatch,
MomentoLogger,
MomentoLoggerFactory,
SetBatch,
} from '@gomomento/sdk';
import {createCache, flushCache, getCacheClient} from './utils/cache';
import {
Expand Down Expand Up @@ -32,6 +34,14 @@ class PerfTest {
this.testConfiguration = testConfiguration;
}

/**
 * Starts a background timer that logs process memory usage every 5 seconds,
 * to help investigate memory leaks during long perf runs.
 *
 * Bug fix: the interval body was entirely commented out, so the timer fired
 * every 5 seconds doing nothing (pure overhead, no data). The log line is
 * restored so the memory figures are actually emitted.
 *
 * @returns the interval handle; the caller must clearInterval() it when the
 *          run finishes, or the process will be kept alive.
 */
private logMemoryUsage(): NodeJS.Timer {
  return setInterval(() => {
    // process.memoryUsage() reports rss/heapTotal/heapUsed/external/... in bytes.
    for (const [key, value] of Object.entries(process.memoryUsage())) {
      // Divide by 1e6 to report megabytes.
      this.logger.info(`Memory usage by ${key}, ${value / 1000000}MB `);
    }
  }, 5000); // Log memory usage every 5 seconds
}

async run(): Promise<void> {
const momento = await getCacheClient(
this.options.loggerFactory,
Expand All @@ -40,63 +50,134 @@ class PerfTest {
);
await createCache(momento, this.cacheName, this.logger);

this.logger.info('Starting async set requests');
await this.runAsyncSetRequests(momento);
const memoryUsageLogger = this.logMemoryUsage();

try {
this.logger.info('Starting async set requests');
await this.runAsyncSetRequests(momento);

this.logger.info('Starting async get requests');
await this.runAsyncGetRequests(momento);
this.logger.info('Starting async get requests');
await this.runAsyncGetRequests(momento);

this.logger.info('Starting set batch requests');
await this.runSetBatchTests(momento);
this.logger.info('Starting set batch requests');
await this.runSetBatchTests(momento);

this.logger.info('Starting get batch requests');
await this.runGetBatchTests(momento);
this.logger.info('Starting get batch requests');
await this.runGetBatchTests(momento);

// flush the cache
await flushCache(momento, this.cacheName, this.logger);
// flush the cache
await flushCache(momento, this.cacheName, this.logger);
} finally {
clearInterval(memoryUsageLogger);
}
}

async runAsyncSetRequests(momento: CacheClient): Promise<void> {
for (const setConfig of this.testConfiguration.sets) {
this.logger.info(
`Beginning run for ASYNC_SETS, batch size ${setConfig.batchSize}, item size ${setConfig.itemSizeBytes}`
);
let numLoops = 0;
const context = initiatePerfTestContext();
while (getElapsedMillis(context.startTime) < this.testConfiguration.minimumRunDurationSecondsForTests * 1000) {
numLoops++;
await this.sendAsyncSetRequests(momento, context, setConfig);
}
calculateSummary(context, setConfig.batchSize, setConfig.itemSizeBytes, RequestType.ASYNC_SETS, this.logger);
this.logger.info(
`Completed run for ASYNC_SETS, batch size ${setConfig.batchSize}, item size ${
setConfig.itemSizeBytes
}; num loops: ${numLoops}, elapsed duration: ${getElapsedMillis(context.startTime)}ms`
);
}
}

async runAsyncGetRequests(momento: CacheClient): Promise<void> {
for (const getConfig of this.testConfiguration.gets) {
this.logger.info(
`Populating cache for ASYNC_GETS, batch size ${getConfig.batchSize}, item size ${getConfig.itemSizeBytes}`
);
const cachePopulationStartTime = process.hrtime();
// ensure that the cache is populated with the keys
await this.ensureCacheIsPopulated(momento, getConfig);
this.logger.info(
`Populated cache for ASYNC_GETS, batch size ${getConfig.batchSize}, item size ${
getConfig.itemSizeBytes
} in ${getElapsedMillis(cachePopulationStartTime)}ms`
);
this.logger.info(
`Beginning run for ASYNC_GETS, batch size ${getConfig.batchSize}, item size ${getConfig.itemSizeBytes}`
);
let numLoops = 0;
const context = initiatePerfTestContext();
while (getElapsedMillis(context.startTime) < this.testConfiguration.minimumRunDurationSecondsForTests * 1000) {
// this.logger.info(`Looping; elapsed millis: ${getElapsedMillis(context.startTime)}`);
numLoops++;
await this.sendAsyncGetRequests(momento, context, getConfig);
}
calculateSummary(context, getConfig.batchSize, getConfig.itemSizeBytes, RequestType.ASYNC_GETS, this.logger);
this.logger.info(
`Completed run for ASYNC_GETS, batch size ${getConfig.batchSize}, item size ${
getConfig.itemSizeBytes
}; num loops: ${numLoops}, elapsed duration: ${getElapsedMillis(context.startTime)}ms`
);
}
}

async runSetBatchTests(momento: CacheClient): Promise<void> {
for (const setConfig of this.testConfiguration.sets) {
if (setConfig.batchSize * setConfig.itemSizeBytes >= 5 * 1024 * 1024) {
this.logger.info(
`Skipping run for SET_BATCH, batch size ${setConfig.batchSize}, item size ${setConfig.itemSizeBytes} would exceed max request size of 5MB`
);
continue;
}
this.logger.info(
`Beginning run for SET_BATCH, batch size ${setConfig.batchSize}, item size ${setConfig.itemSizeBytes}`
);
let numLoops = 0;
const context = initiatePerfTestContext();
while (getElapsedMillis(context.startTime) < this.testConfiguration.minimumRunDurationSecondsForTests * 1000) {
numLoops++;
await this.sendSetBatchRequests(momento, context, setConfig);
}
calculateSummary(context, setConfig.batchSize, setConfig.itemSizeBytes, RequestType.SET_BATCH, this.logger);
this.logger.info(
`Completed run for SET_BATCH, batch size ${setConfig.batchSize}, item size ${
setConfig.itemSizeBytes
}; num loops: ${numLoops}, elapsed duration: ${getElapsedMillis(context.startTime)}ms`
);
}
}

async runGetBatchTests(momento: CacheClient): Promise<void> {
for (const getConfig of this.testConfiguration.gets) {
this.logger.info(
`Populating cache for GET_BATCH, batch size ${getConfig.batchSize}, item size ${getConfig.itemSizeBytes}`
);
const cachePopulationStartTime = process.hrtime();
// ensure that the cache is populated with the keys
await this.ensureCacheIsPopulated(momento, getConfig);
this.logger.info(
`Populated cache for GET_BATCH, batch size ${getConfig.batchSize}, item size ${
getConfig.itemSizeBytes
} in ${getElapsedMillis(cachePopulationStartTime)}ms`
);
this.logger.info(
`Beginning run for GET_BATCH, batch size ${getConfig.batchSize}, item size ${getConfig.itemSizeBytes}`
);
let numLoops = 0;
const context = initiatePerfTestContext();
while (getElapsedMillis(context.startTime) < this.testConfiguration.minimumRunDurationSecondsForTests * 1000) {
numLoops++;
await this.sendGetBatchRequests(momento, context, getConfig);
}
calculateSummary(context, getConfig.batchSize, getConfig.itemSizeBytes, RequestType.GET_BATCH, this.logger);
this.logger.info(
`Completed run for GET_BATCH, batch size ${getConfig.batchSize}, item size ${
getConfig.itemSizeBytes
}; num loops: ${numLoops}, elapsed duration: ${getElapsedMillis(context.startTime)}ms`
);
}
}

Expand All @@ -114,10 +195,13 @@ class PerfTest {
setPromises.push(setPromise);
context.totalItemSizeBytes += setConfig.itemSizeBytes;
}
await Promise.all(setPromises).then(() => {
const setDuration = getElapsedMillis(setStartTime);
context.asyncSetLatencies.recordValue(setDuration);
});
const setResponses = await Promise.all(setPromises);
const setDuration = getElapsedMillis(setStartTime);
const error = setResponses.find(response => response instanceof CacheSet.Error);
if (error !== undefined) {
throw new Error(`Error in async sets: ${error.toString()}`);
}
context.asyncSetLatencies.recordValue(setDuration);
}

// Issues one wave of concurrent async get() requests and records the wave's
// elapsed time into the async-get latency histogram.
// NOTE(review): this region is diff residue — the signature/loop head is
// elided (the "Expand All" marker below), and both the pre-change .then()
// block and the post-change await block appear together.
private async sendAsyncGetRequests(
Expand All @@ -133,10 +217,13 @@ class PerfTest {
getPromises.push(getPromise);
// Track total bytes moved so the summary can report throughput.
context.totalItemSizeBytes += getConfig.itemSizeBytes;
}
await Promise.all(getPromises).then(() => {
const setDuration = getElapsedMillis(getStartTime);
context.asyncGetLatencies.recordValue(setDuration);
});
const getResponses = await Promise.all(getPromises);
const getDuration = getElapsedMillis(getStartTime);
// NOTE(review): checking `instanceof CacheSet.Error` in the *get* path looks
// like a copy-paste from the set path — presumably `CacheGet.Error` was
// intended; confirm against the SDK's get() response types.
const error = getResponses.find(response => response instanceof CacheSet.Error);
if (error !== undefined) {
throw new Error(`Error in async gets: ${error.toString()}`);
}
// Record latency only after confirming no request in the wave failed.
context.asyncGetLatencies.recordValue(getDuration);
}

private async sendSetBatchRequests(
Expand All @@ -153,12 +240,13 @@ class PerfTest {
const setBatchStartTime = process.hrtime();
const setBatchPromise = momento.setBatch(this.cacheName, items);

void setBatchPromise.then(() => {
const setBatchDuration = getElapsedMillis(setBatchStartTime);
context.setBatchLatencies.recordValue(setBatchDuration);
});
context.totalItemSizeBytes += setConfig.batchSize * setConfig.itemSizeBytes;
await setBatchPromise;
const setBatchResponse = await setBatchPromise;
if (setBatchResponse instanceof SetBatch.Error) {
throw new Error(`Error setting batch: ${setBatchResponse.toString()}`);
}
const setBatchDuration = getElapsedMillis(setBatchStartTime);
context.setBatchLatencies.recordValue(setBatchDuration);
}

private async sendGetBatchRequests(
Expand All @@ -169,12 +257,13 @@ class PerfTest {
const keys = Array.from({length: getConfig.batchSize}, (_, i) => `key-${i}`);
const getBatchStartTime = process.hrtime();
const getBatchPromise = momento.getBatch(this.cacheName, keys);
void getBatchPromise.then(() => {
const getBatchDuration = getElapsedMillis(getBatchStartTime);
context.getBatchLatencies.recordValue(getBatchDuration);
});
context.totalItemSizeBytes += getConfig.batchSize * getConfig.itemSizeBytes;
await getBatchPromise;
const getBatchResponse = await getBatchPromise;
if (getBatchResponse instanceof GetBatch.Error) {
throw new Error(`Error getting batch: ${getBatchResponse.toString()}`);
}
const getBatchDuration = getElapsedMillis(getBatchStartTime);
context.getBatchLatencies.recordValue(getBatchDuration);
}

private async ensureCacheIsPopulated(momento: CacheClient, getConfig: GetSetConfig) {
Expand Down Expand Up @@ -205,7 +294,11 @@ function generateConfigurations(batchSizes: number[], itemSizes: number[]): GetS
const configurations: GetSetConfig[] = [];
for (const batchSize of batchSizes) {
for (const itemSize of itemSizes) {
configurations.push({batchSize, itemSizeBytes: itemSize});
// exclude permutations where the total payload is greater than 1GB, they are not realistic and
// will cause memory issues
if (batchSize * itemSize < 1024 * 1024 * 1024) {
configurations.push({batchSize, itemSizeBytes: itemSize});
}
}
}
return configurations;
Expand Down

0 comments on commit c343ec4

Please sign in to comment.