feat: Add Metrics for Memory Statistics #554
Conversation
@@ -61,12 +73,22 @@ export default class StreamHandler {
      indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => {
        console.log(`Failed to set status STOPPED for stream: ${this.streamKey}`, e);
      });
      indexer.writeLog(functionName, this.executorContext.block_height, `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`).catch((e) => {
It is possible for an indexer to crash before it even gets a block to process, at which point a V1 indexer will end up logging the failure under block height 0. A V2 indexer would log it under the start block height, which would be accurate.
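A minimal sketch of that fallback idea, assuming hypothetical field names (`lastProcessedBlockHeight` and `startBlockHeight` are illustrative, not the actual executor context fields):

```typescript
// Illustrative only: attribute crash logs to the configured start block height
// when no block has been processed yet, rather than defaulting to 0.
function blockHeightForCrashLog (
  lastProcessedBlockHeight: number | undefined, // undefined until a block arrives
  startBlockHeight: number
): number {
  return lastProcessedBlockHeight ?? startBlockHeight;
}
```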
        await sleep(100);
        continue;
      }
      try {
Added a try/catch just in case. Redis hasn't failed us yet, but if it ever does time out, I would like this code to retry rather than crash.
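Roughly the pattern being described, as a hedged sketch (`redisClient.xRead` and the surrounding names are placeholders, not the actual runner internals): if the Redis read throws, log it, back off briefly, and retry instead of letting the worker die.

```typescript
// Sketch only: retry a Redis stream read on failure instead of crashing the worker.
async function readStreamWithRetry (
  redisClient: { xRead: (key: string) => Promise<unknown> }, // placeholder client shape
  streamKey: string
): Promise<unknown> {
  while (true) {
    try {
      return await redisClient.xRead(streamKey);
    } catch (error) {
      console.error(`Failed to read stream: ${streamKey}, retrying`, error);
      await new Promise((resolve) => setTimeout(resolve, 100)); // brief backoff before retrying
    }
  }
}
```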
  }
}

async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise<void> {
  const indexer = new Indexer();
  const isHistorical = workerContext.streamType === 'historical';
  let streamMessageId = '';
-  let indexerName = '';
+  let indexerName = streamKey.split(':')[0];
This was sneaky. We set indexerName only after reading the indexer config for the first time, which itself only happens once a block is ready to be processed. So the metrics written under finally were labelled with an empty indexerName for the short period before a block is ready (during a fresh start of Runner). Setting an initial value from the streamKey works, since it's the same value. Plus, we'll get to refactor this to just take in the input config afterwards.
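A simplified sketch of the shape described above (not the actual runner code; `recordQueueMetric` is a hypothetical stand-in for the real metrics call): seeding indexerName from the stream key up front means the metric written in `finally` is labelled correctly even if no block was ever processed.

```typescript
// Sketch only: indexerName is seeded immediately instead of starting as ''.
async function blockQueueConsumerSketch (streamKey: string): Promise<void> {
  const indexerName = streamKey.split(':')[0]; // same value the config would later provide
  try {
    // ... read config (previously the first place indexerName was set),
    // fetch blocks, run functions ...
  } finally {
    recordQueueMetric(indexerName); // labelled correctly even before the first block
  }
}

// Hypothetical helper standing in for the real metrics call.
function recordQueueMetric (indexerName: string): void {
  console.log(`metric labelled with indexerName=${indexerName}`);
}
```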
@@ -114,6 +119,8 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
      }
      const block = queueMessage.block;
      currBlockHeight = block.blockHeight;
+     const blockHeightMessage: WorkerMessage = { type: WorkerMessageType.BLOCK_HEIGHT, data: currBlockHeight };
+     parentPort?.postMessage(blockHeightMessage);
I wanted to put this under finally, but decided to post it before runFunctions, as soon as we get the block height, so that if runFunctions somehow crashes the executor we attribute the error to the correct block height.
This could be me overcorrecting, though. We could also put it in finally and change the structure of WorkerMessage to carry both a block height AND metric data.
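One possible shape for that alternative, purely as a sketch (this is not the actual WorkerMessage definition; the type name and fields are invented for illustration):

```typescript
// Hypothetical combined message: posted once from `finally`, carrying both the
// last block height and the metric samples gathered by the worker.
interface CombinedWorkerMessage {
  type: 'BLOCK_HEIGHT_AND_METRICS';
  data: {
    blockHeight: number;
    metrics: Record<string, number>;
  };
}

const example: CombinedWorkerMessage = {
  type: 'BLOCK_HEIGHT_AND_METRICS',
  data: {
    blockHeight: 110000000,
    metrics: { heapUsedBytes: 123456789, prefetchQueueSize: 10 },
  },
};
```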
Hey @morgsmccauley, I need to get these in today to test fixes for the memory errors, since they keep breaking prod. I'm gonna merge them since the changes are minor. Please look over it again whenever you can!
Runner has previously crashed due to Out of Memory (OOM) errors. It's unclear which settings need to be increased to resolve the error, and whether the adjustments properly address the underlying problems. To help understand the limits of the Runner service, I've added metrics to capture individual worker heap sizes, which contribute largely to any OOM-related problems. In addition to getting each worker's usage, we can also sum them to get overall heap allocation and utilization. I've also logged metrics for the prefetch queue size so that we can understand the relationship between prefetching and the memory footprint.
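As a rough sketch of how a worker could sample its heap and report it to the parent thread (the message shape and metric names here are assumptions for illustration, not the exact implementation in this PR), summing these per-worker samples on the parent side yields the overall heap allocation and utilization:

```typescript
import { parentPort } from 'worker_threads';
import { getHeapStatistics } from 'v8';

// Sketch only: sample this worker's V8 heap plus the current prefetch queue
// size and post them to the parent, which can aggregate across workers.
function reportWorkerMetrics (prefetchQueueSize: number): void {
  const heap = getHeapStatistics();
  parentPort?.postMessage({
    type: 'METRICS', // hypothetical message type
    data: {
      heapUsedBytes: heap.used_heap_size,
      heapTotalBytes: heap.total_heap_size,
      prefetchQueueSize,
    },
  });
}
```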
In addition, some smaller tasks were addressed as well: