From 69231727d745fa6008099c48812281c7e3a79f26 Mon Sep 17 00:00:00 2001
From: Tzach Shabtay
Date: Mon, 26 Apr 2021 23:50:25 -0400
Subject: [PATCH] performance improvement

---
 package-lock.json                             |  33 ++--
 package.json                                  |   1 +
 src/client/kafka/messages/fetcher.tsx         |  16 +-
 .../kafka/messages/multi_topics_input.tsx     |   2 +-
 .../kafka/messages/single_topic_input.tsx     |   2 +-
 src/server/server.ts                          | 180 ++++++++++++------
 6 files changed, 160 insertions(+), 74 deletions(-)

diff --git a/package-lock.json b/package-lock.json
index 68fdce6..cb22753 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -143,7 +143,6 @@
       "version": "1.19.0",
       "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.0.tgz",
       "integrity": "sha1-BoWzxH6zAG/+0RfN1VFkth+AU48=",
-      "dev": true,
       "requires": {
         "@types/connect": "*",
         "@types/node": "*"
@@ -153,7 +152,6 @@
       "version": "3.4.34",
       "resolved": "https://registry.npmjs.org/@types/connect/-/connect-3.4.34.tgz",
       "integrity": "sha1-FwpAIjptZmAG2TyhKK8r6x2bGQE=",
-      "dev": true,
       "requires": {
         "@types/node": "*"
       }
@@ -188,7 +186,6 @@
       "version": "4.17.11",
       "resolved": "https://registry.npmjs.org/@types/express/-/express-4.17.11.tgz",
       "integrity": "sha1-3r48qm+OX82pa0e9VOL0DE7llUU=",
-      "dev": true,
       "requires": {
         "@types/body-parser": "*",
         "@types/express-serve-static-core": "^4.17.18",
@@ -200,7 +197,6 @@
       "version": "4.17.19",
       "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.19.tgz",
       "integrity": "sha1-AKz8FjLnKaysTxUw6eFvbdFQih0=",
-      "dev": true,
       "requires": {
         "@types/node": "*",
         "@types/qs": "*",
@@ -227,14 +223,12 @@
     "@types/mime": {
       "version": "1.3.2",
       "resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.2.tgz",
-      "integrity": "sha1-k+Jb+e51/g/YC1lLxP6w6GIRG1o=",
-      "dev": true
+      "integrity": "sha1-k+Jb+e51/g/YC1lLxP6w6GIRG1o="
     },
     "@types/node": {
       "version": "12.20.7",
       "resolved": "https://registry.npmjs.org/@types/node/-/node-12.20.7.tgz",
-      "integrity": "sha1-HLYf0MhcuH5yjEMQe1/YK2m8nvg=",
-      "dev": true
+      "integrity": "sha1-HLYf0MhcuH5yjEMQe1/YK2m8nvg="
     },
     "@types/prop-types": {
       "version": "15.7.3",
@@ -244,14 +238,12 @@
     "@types/qs": {
       "version": "6.9.6",
       "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.6.tgz",
-      "integrity": "sha1-35w8izGiR+wxXmmWVmvjFx30s7E=",
-      "dev": true
+      "integrity": "sha1-35w8izGiR+wxXmmWVmvjFx30s7E="
     },
     "@types/range-parser": {
       "version": "1.2.3",
       "resolved": "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.3.tgz",
-      "integrity": "sha1-fuMwunyq+5gJC+zoal7kQRWQTCw=",
-      "dev": true
+      "integrity": "sha1-fuMwunyq+5gJC+zoal7kQRWQTCw="
     },
     "@types/react": {
       "version": "17.0.3",
@@ -317,7 +309,6 @@
       "version": "1.13.9",
       "resolved": "https://registry.npmjs.org/@types/serve-static/-/serve-static-1.13.9.tgz",
       "integrity": "sha1-qs8oqFoF7imhH7fD6tk1rFbzPk4=",
-      "dev": true,
       "requires": {
         "@types/mime": "^1",
         "@types/node": "*"
@@ -1889,6 +1880,11 @@
         "ee-first": "1.1.1"
       }
     },
+    "on-headers": {
+      "version": "1.0.2",
+      "resolved": "https://registry.npmjs.org/on-headers/-/on-headers-1.0.2.tgz",
+      "integrity": "sha1-dysK5qqlJcOZ5Imt+tkMQD6zwo8="
+    },
     "once": {
       "version": "1.4.0",
       "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
@@ -2436,6 +2432,17 @@
         "send": "0.17.1"
       }
     },
+    "server-timing": {
+      "version": "3.3.1",
+      "resolved": "https://registry.npmjs.org/server-timing/-/server-timing-3.3.1.tgz",
+      "integrity": "sha1-4hxe8058LJHSCv31JZFUQpMHN4E=",
+      "requires": {
+        "@types/express": "^4.17.3",
+        "@types/node": "^12.12.30",
+        "minimist": "^1.2.5",
"^1.2.5", + "on-headers": "^1.0.2" + } + }, "setimmediate": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/setimmediate/-/setimmediate-1.0.5.tgz", diff --git a/package.json b/package.json index b717fa1..7b7a06d 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "react-dom": "^17.0.2", "react-json-view": "^1.21.1", "react-router-dom": "^5.2.0", + "server-timing": "^3.3.1", "uuid": "^8.1.0" } } diff --git a/src/client/kafka/messages/fetcher.tsx b/src/client/kafka/messages/fetcher.tsx index c083ffb..5413c84 100644 --- a/src/client/kafka/messages/fetcher.tsx +++ b/src/client/kafka/messages/fetcher.tsx @@ -157,7 +157,11 @@ export class Fetcher extends React.Component { await this.updateSearch() if (cancelToken.Aborted) return this.props.url.Subscribe(this.onUrlChanged) - await this.fetchMessages(10000, cancelToken) + let timeout = 10000 + if (this.state.limit && this.state.limit > 100) { + timeout = 20000 + } + await this.fetchMessages(timeout, cancelToken) } onFetchMessagesClicked = async () => { @@ -238,7 +242,11 @@ export class Fetcher extends React.Component { if (offsets === undefined) { return } - await this.setStateAsync({offset: parseInt(offsets.high) - this.state.limit}) + let offset = parseInt(offsets.high) - this.state.limit + if (offset < parseInt(offsets.low)) { + offset = parseInt(offsets.low) + } + await this.setStateAsync({offset}) out = await this.fetchMessagesForPartition(topic, timeout, partition, out, cancelToken) } } @@ -321,8 +329,8 @@ export class Fetcher extends React.Component { } const max = cursor + this.state.limit let limit = this.state.limit - if (limit > 1000) { - limit = 1000 + if (limit > 10000) { + limit = 10000 } if (cursor >= max) { this.props.onDataFetched(out) diff --git a/src/client/kafka/messages/multi_topics_input.tsx b/src/client/kafka/messages/multi_topics_input.tsx index 08e28c6..4374b8b 100644 --- a/src/client/kafka/messages/multi_topics_input.tsx +++ b/src/client/kafka/messages/multi_topics_input.tsx @@ -115,7 +115,7 @@ export class MultiTopicsInput extends React.Component { toTime={this.props.toTime} searchBy={this.props.searchBy} onDataFetched={this.props.onDataFetched} - onDataFetchStarted={() => { this.setState({loadingMessages: true}); this.props.onDataFetchStarted} } + onDataFetchStarted={() => { this.setState({loadingMessages: true}); this.props.onDataFetchStarted(AllPartitions)} } onDataFetchCompleted={() => this.setState({loadingMessages: false})} loadingMessages={this.state.loadingMessages} error={this.state.error ?? ""} diff --git a/src/client/kafka/messages/single_topic_input.tsx b/src/client/kafka/messages/single_topic_input.tsx index 4c9359f..578e636 100644 --- a/src/client/kafka/messages/single_topic_input.tsx +++ b/src/client/kafka/messages/single_topic_input.tsx @@ -134,7 +134,7 @@ export class SingleTopicInput extends React.Component { toTime={this.props.toTime} searchBy={this.props.searchBy} onDataFetched={this.props.onDataFetched} - onDataFetchStarted={() => { this.setState({loadingMessages: true}); this.props.onDataFetchStarted} } + onDataFetchStarted={() => { this.setState({loadingMessages: true}); this.props.onDataFetchStarted(this.state.partition)} } onDataFetchCompleted={() => this.setState({loadingMessages: false})} loadingMessages={this.state.loadingMessages} error={this.state.error ?? 
""} diff --git a/src/server/server.ts b/src/server/server.ts index c13db0f..1312c5b 100644 --- a/src/server/server.ts +++ b/src/server/server.ts @@ -1,7 +1,8 @@ import express from "express"; +const serverTiming = require('server-timing'); import http from "http"; import path from "path"; -import { Kafka, ResourceTypes, DescribeConfigResponse, Consumer, Admin, GroupDescriptions } from "kafkajs"; +import { Kafka, ConfigResourceTypes, DescribeConfigResponse, Consumer, Admin, GroupDescriptions, EachBatchPayload } from "kafkajs"; import { SchemaRegistry, SchemaVersion } from '@ovotech/avro-kafkajs'; import { Schema, Type } from "avsc"; import { v4 as uuidv4 } from 'uuid'; @@ -10,7 +11,11 @@ import { GetTopicsResult, GetTopicResult, TopicsOffsets, ConsumerOffsets, TopicC import { SearchStyle, Includes } from "../shared/search"; const fetch = require("node-fetch"); -type TopicQueryInput = { topic: string, partition: number, limit: number, offset: number, search: string, timeout?: number, searchStyle: SearchStyle} +console.log(`KAFKA_URLS=${KAFKA_URLS}`) +console.log(`SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL}`) +console.log(`KAFKA_CONNECT_URL=${KAFKA_CONNECT_URL}`) + +type TopicQueryInput = { topic: string, partition: number, limit: number, offset: number, search: string, timeout?: number, searchStyle: SearchStyle, trace: boolean} const schemaRegistry = new SchemaRegistry({ uri: SCHEMA_REGISTRY_URL }); @@ -44,6 +49,8 @@ const kafka = new KafkaClient(); const app = express(); +app.use(serverTiming()); + app.set("view engine", "ejs"); app.set("views", "public"); @@ -215,6 +222,7 @@ app.get("/api/messages/:topic/:partition", async (req, res) => { const search = req.query.search ? req.query.search as string : "" const searchStyle = req.query.search_style ? req.query.search_style as SearchStyle : "" const timeout = req.query.timeout ? 
+    const trace = req.query.trace === `true`
     const partition = parseInt(req.params.partition)
     const partitions = await withRetry("fetchTopicOffsets", () => kafka.Admin.fetchTopicOffsets(topic))
     for (const partitionOffsets of partitions) {
@@ -233,7 +241,7 @@ app.get("/api/messages/:topic/:partition", async (req, res) => {
             res.status(200).json({messages: []})
             return
         }
-        const messages: GetTopicMessagesResult = await getMessages({topic, partition, limit, offset, search, searchStyle, timeout})
+        const messages: GetTopicMessagesResult = await getMessages({topic, partition, limit, offset, search, searchStyle, timeout, trace}, res)
         res.status(200).json(messages)
         return
     }
@@ -371,7 +379,7 @@ const getTopicConfig = async (topic: string): Promise => {
         includeSynonyms: false,
         resources: [
             {
-                type: ResourceTypes.TOPIC,
+                type: ConfigResourceTypes.TOPIC,
                 name: topic
             }
         ]
@@ -395,15 +403,26 @@ const getTopicConsumerGroups = async (topic: string): Promise => {
 
-const getMessages = async (input: TopicQueryInput): Promise => {
-    const groupId = `krowser-${Date.now()}=${uuidv4()}`
-    const consumer = kafka.Kafka.consumer({ groupId })
+const endBatch = (res: any, batchIdx: number) => {
+    res.endTime(`batch_${batchIdx}`);
+    res.startTime(`time_to_batch${batchIdx+1}`, `time to batch ${batchIdx+1}`)
+}
+
+const getMessages = async (input: TopicQueryInput, res: any): Promise => {
+    const groupId = `krowser-${Date.now()}-${uuidv4()}`
+    res.startTime('connect_kafka', 'connect kafka');
+    const consumer = kafka.Kafka.consumer({ groupId, allowAutoTopicCreation: false })
     await consumer.connect()
+    res.endTime('connect_kafka');
     const messages: TopicMessage[] = []
     let numConsumed = 0
+    let batchIdx = 0
+    const endOffset = input.offset + input.limit - 1
     console.log(`Querying topic ${input.topic} (partition ${input.partition}) at offset=${input.offset}, limit=${input.limit}`)
-    consumer.subscribe({ topic: input.topic, fromBeginning: true })
+    res.startTime('subscribe', 'subscribe');
+    consumer.subscribe({ topic: input.topic, fromBeginning: false })
+    res.endTime('subscribe');
     const consumed: Set = new Set()
     let hasTimeout = false
     const p = new Promise(async (resolve, reject) => {
@@ -411,78 +430,129 @@ const getMessages = async (input: TopicQueryInput): Promise => {
             hasTimeout = true
             resolve()
         }, input.timeout || 20000);
+        res.startTime(`time_to_batch${batchIdx+1}`, `time to batch ${batchIdx+1}`)
         await consumer.run({
             autoCommit: false,
-            eachMessage: async ({ topic, partition, message }) => {
-                if (partition !== input.partition) {
-                    console.log(`ignoreing message from partition ${partition} (offset ${message.offset}), expecting partition ${input.partition}`)
+            eachBatchAutoResolve: true,
+            eachBatch: async (payload: EachBatchPayload) => {
+                batchIdx += 1
+                res.endTime(`time_to_batch${batchIdx}`)
+                if (payload.batch.partition !== input.partition) {
+                    console.log(`Ignoring batch from partition ${payload.batch.partition}, expecting partition ${input.partition}`)
+                    endBatch(res, batchIdx)
                     return
                 }
-                console.log(`---MESSAGE: ${message.offset}---`)
-                let schemaType : Type | undefined = undefined;
-                if (message.value === null) {
-                    console.log(`Message value is null`)
-                } else {
-                    try {
-                        const { type, value } = await schemaRegistry.decodeWithType(message.value);
-                        message.value = value;
-                        schemaType = type;
-                    } catch (error) {
-                        console.log(`Not an avro message? error: ${error}`);
-                    }
+                if (payload.batch.topic !== input.topic) {
+                    console.log(`Ignoring batch from a different topic: ${payload.batch.topic} (expecting ${input.topic})`)
+                    endBatch(res, batchIdx)
+                    return
                 }
-                const value = message.value ? message.value.toString() : "";
-                const key = message.key ? message.key.toString() : "";
-                console.log({
-                    partition,
-                    offset: message.offset,
-                    value: value,
-                    schemaType: schemaType,
-                    key: key,
-                })
-
-                if (topic !== input.topic) {
-                    console.log(`Ignoring message from a different topic: ${topic} (expecting ${input.topic})`)
+                const firstOffset = payload.batch.firstOffset()
+                if (firstOffset === null) {
+                    console.log(`Ignoring batch with no first offset`)
+                    endBatch(res, batchIdx)
                     return
                 }
-
-                if (consumed.has(message.offset)) {
-                    console.log(`Ignoring duplicate message from offset ${message.offset}`)
+                const low = parseInt(firstOffset)
+                if (low > endOffset) {
+                    console.log(`Ignoring batch with a too high offset ${low} (expecting ${input.offset}-${endOffset})`)
+                    endBatch(res, batchIdx)
                     return
                 }
-                consumed.add(message.offset)
-
-                const offset = parseInt(message.offset)
-                if (offset < input.offset) {
-                    console.log(`Ignoring message from an old offset: ${offset} (expecting at least ${input.offset})`)
+                const lastOffset = payload.batch.lastOffset()
+                if (lastOffset === null) {
+                    console.log(`Ignoring batch with no last offset`)
+                    endBatch(res, batchIdx)
                     return
                 }
-                numConsumed++
-                let filteredOut = false
-                if (input.search) {
-                    const text: string = `${value},${key},${schemaType?.name ?? ""}`
-                    if (!Includes(text, input.search, input.searchStyle)) {
-                        filteredOut = true
-                        console.log(`Ignoring message from offset ${message.offset}, filtered out by search`)
-                    }
+                const high = parseInt(lastOffset)
+                if (high < input.offset) {
+                    console.log(`Ignoring batch with a too low offset ${high} (expecting ${input.offset}-${endOffset})`)
+                    endBatch(res, batchIdx)
+                    return
                 }
-                if (!filteredOut) {
-                    messages.push({ topic, partition, message, key, value, schemaType })
+                console.log(`---Batch: ${payload.batch.firstOffset()} - ${payload.batch.lastOffset()} (len: ${payload.batch.messages.length})`)
+                res.startTime(`batch_${batchIdx}`, `batch ${batchIdx}`);
+                for (const message of payload.batch.messages) {
+                    if (input.trace) {
+                        console.log(`---MESSAGE: ${message.offset}---`)
+                    }
+                    let schemaType : Type | undefined = undefined;
+                    if (message.value === null) {
+                        console.log(`Message value is null`)
+                    } else {
+                        try {
+                            const { type, value } = await schemaRegistry.decodeWithType(message.value);
+                            message.value = value;
+                            schemaType = type;
+                        } catch (error) {
+                            if (input.trace) {
+                                console.log(`Not an avro message? error: ${error}`);
+                            }
+                        }
+                    }
+                    const value = message.value ? message.value.toString() : "";
+                    const key = message.key ? message.key.toString() : "";
+                    if (input.trace) {
+                        console.log({
+                            partition: payload.batch.partition,
+                            offset: message.offset,
+                            value: value,
+                            schemaType: schemaType,
+                            key: key,
+                        })
+                    }
+
+                    if (consumed.has(message.offset)) {
+                        console.log(`Ignoring duplicate message from offset ${message.offset}`)
+                        continue
+                    }
+                    consumed.add(message.offset)
+
+                    const offset = parseInt(message.offset)
+                    if (offset < input.offset) {
+                        console.log(`Ignoring message from an old offset: ${offset} (expecting at least ${input.offset})`)
+                        continue
+                    }
+                    numConsumed++
+                    let filteredOut = false
+                    if (input.search) {
+                        const text: string = `${value},${key},${schemaType?.name ?? ""}`
""}` + if (!Includes(text, input.search, input.searchStyle)) { + filteredOut = true + if (input.trace) { + console.log(`Ignoring message from offset ${message.offset}, filtered out by search`) + } + } + } + if (!filteredOut) { + messages.push({ topic: payload.batch.topic, partition: payload.batch.partition, message, key, value, schemaType }) + } + if (numConsumed >= input.limit || offset >= endOffset) { + break + } } - if (numConsumed >= input.limit) { + if (numConsumed >= input.limit || high >= endOffset) { resolve() } - }, + endBatch(res, batchIdx) + } }) }) + res.startTime('seek', 'seek'); consumer.seek({ topic: input.topic, partition: input.partition, offset: input.offset.toString() }) + res.endTime('seek'); try { + res.startTime('consume', 'consume'); await p; + res.endTime('consume'); return { messages, hasTimeout } } finally { + res.startTime('cleanupConsumer', 'cleanup consumer'); cleanupConsumer(consumer, groupId) //not awaiting this as we don't want to block the response + res.endTime('cleanupConsumer'); } }