From c3606137e6f2377b9795014350ecc9fabf5a9db2 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Wed, 22 Jun 2022 13:43:39 +0200 Subject: [PATCH 1/2] Fix invalid KafkaMessage type The `size` attribute is only ever present on pre-0.10 `Messages`, and never on the new `Record`. The field was mistakenly defined as being non-optional. Providing a meaningful `size` field for Record is tricky because in RecordBatch all the records are compressed together, rather than individually as is the case with the old Message protocol. Therefore you can only calculate a size for the uncompressed record, which isn't very useful since you most likely care about the size because you want to understand the size over the network. --- types/index.d.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/types/index.d.ts b/types/index.d.ts index ff168b24c..0f90bdeb9 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -631,16 +631,26 @@ export type Broker = { }): Promise } -export type KafkaMessage = { +interface MessageSetEntry { key: Buffer | null value: Buffer | null timestamp: string + attributes: number + offset: string size: number +} + +interface RecordBatchEntry { + key: Buffer | null + value: Buffer | null + timestamp: string attributes: number offset: string - headers?: IHeaders + headers: IHeaders } +export type KafkaMessage = MessageSetEntry | RecordBatchEntry + export interface ProducerRecord { topic: string messages: Message[] From e1b016e171e0246c1115d689d8b7e7c7a28cd068 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 28 Jun 2022 11:16:40 +0200 Subject: [PATCH 2/2] Make it possible to narrow between message types --- types/index.d.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/types/index.d.ts b/types/index.d.ts index 0f90bdeb9..23f33ba38 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -638,6 +638,7 @@ interface MessageSetEntry { attributes: number offset: string size: number + headers?: never } interface RecordBatchEntry { @@ -647,6 +648,7 @@ interface RecordBatchEntry { attributes: number offset: string headers: IHeaders + size?: never } export type KafkaMessage = MessageSetEntry | RecordBatchEntry