diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index a487291dd..e0dfb449f 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -138,7 +138,6 @@ export default class Indexer { } async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> { - console.error('SHOULD NOT BE CALLED'); if (!isHistorical) { const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight); if (cachedMessage) { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index ac29493d9..cd5bcdbbe 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -33,42 +33,41 @@ void (async function main () { if (streamType === 'real-time') { await handleHistoricalStream(streamKey); } - + // let next = '0'; // while (true) { - // const startTime = performance.now(); // try { + // const startTime = performance.now(); // const streamType = redisClient.getStreamType(streamKey); - // if (streamType === 'real-time') { - // const messages = await redisClient.getNextStreamMessage(streamKey); - // const indexerConfig = await redisClient.getStreamStorage(streamKey); - - // indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; - - // if (messages == null) { - // await sleep(1000); - // continue; - // } - - // const [{ id, message }] = messages; - - // const functions = { - // [indexerName]: { - // account_id: indexerConfig.account_id, - // function_name: indexerConfig.function_name, - // code: indexerConfig.code, - // schema: indexerConfig.schema, - // provisioned: false, - // }, - // }; - // await indexer.runFunctions(Number(message.block_height), functions, false, { - // provision: true, - // }); - - // await redisClient.deleteStreamMessage(streamKey, id); + // const messages = await redisClient.getNextStreamMessage(streamKey, 1, next); + // const indexerConfig = await redisClient.getStreamStorage(streamKey); + + // indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; + + // if (messages == null) { + // await sleep(1000); + // continue; // } - // const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); + // const [{ id, message }] = messages; + + // const functions = { + // [indexerName]: { + // account_id: indexerConfig.account_id, + // function_name: indexerConfig.function_name, + // code: indexerConfig.code, + // schema: indexerConfig.schema, + // provisioned: false, + // }, + // }; + // await indexer.runFunctions(Number(message.block_height), functions, false, { + // provision: true, + // }); + + // // await redisClient.deleteStreamMessage(streamKey, id); + // next = incrementId(id); + + // const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey, next); // parentPort?.postMessage({ // type: 'UNPROCESSED_STREAM_MESSAGES', @@ -91,6 +90,7 @@ void (async function main () { // } })(); +// eslint-disable-next-line @typescript-eslint/no-unused-vars async function handleHistoricalStream (streamKey: string): Promise { void historicalStreamerMessageQueueProducer(queue, streamKey); void historicalStreamerMessageQueueConsumer(queue, streamKey); @@ -150,7 +150,7 @@ async function historicalStreamerMessageQueueConsumer (queue: Array