From ef576d019b18dad5f0afa5a87dd8188632b7b681 Mon Sep 17 00:00:00 2001 From: esara Date: Thu, 6 Jun 2024 01:02:28 -0400 Subject: [PATCH] check kafka pkt length before parsing clientid --- .../ebpf/common/kafka_detect_transform.go | 99 ++++++++++++------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/pkg/internal/ebpf/common/kafka_detect_transform.go b/pkg/internal/ebpf/common/kafka_detect_transform.go index 154bc880a..8b4de35b9 100644 --- a/pkg/internal/ebpf/common/kafka_detect_transform.go +++ b/pkg/internal/ebpf/common/kafka_detect_transform.go @@ -42,7 +42,7 @@ func (k Operation) String() string { } } -const KafaMinLength = 14 +const KafkaMinLength = 14 // ProcessKafkaRequest processes a TCP packet and returns error if the packet is not a valid Kafka request. // Otherwise, return kafka.Info with the processed data. @@ -57,49 +57,29 @@ func ProcessPossibleKafkaEvent(pkt []byte, rpkt []byte) (*KafkaInfo, error) { func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) { k := &KafkaInfo{} - if len(pkt) < KafaMinLength { + if len(pkt) < KafkaMinLength { return k, errors.New("packet too short") } - header := &Header{ - MessageSize: int32(binary.BigEndian.Uint32(pkt[0:4])), - APIKey: int16(binary.BigEndian.Uint16(pkt[4:6])), - APIVersion: int16(binary.BigEndian.Uint16(pkt[6:8])), - CorrelationID: int32(binary.BigEndian.Uint32(pkt[8:12])), - ClientIDSize: int16(binary.BigEndian.Uint16(pkt[12:14])), + header, err := parseKafkaHeader(pkt) + if err != nil { + return k, err } - if !isValidKafkaHeader(header) { - return k, errors.New("invalid Kafka request header") + if len(pkt) < KafkaMinLength+int(header.ClientIDSize) { + return k, errors.New("packet too short") } - offset := KafaMinLength - if header.ClientIDSize > 0 { - clientID := pkt[offset : offset+int(header.ClientIDSize)] - if !isValidClientID(clientID, int(header.ClientIDSize)) { - return k, errors.New("invalid client ID") - } - offset += int(header.ClientIDSize) - k.ClientID = string(clientID) - } else if header.ClientIDSize < -1 { - return k, errors.New("invalid client ID size") + offset, err := processClientID(header, pkt, k) + if err != nil { + return k, err } - switch Operation(header.APIKey) { - case Produce: - ok, err := getTopicOffsetFromProduceOperation(header, pkt, &offset) - if !ok || err != nil { - return k, err - } - k.Operation = Produce - k.TopicOffset = offset - case Fetch: - offset += getTopicOffsetFromFetchOperation(header) - k.Operation = Fetch - k.TopicOffset = offset - default: - return k, errors.New("invalid Kafka operation") + err = processKafkaOperation(header, pkt, k, &offset) + if err != nil { + return k, err } + topic, err := getTopicName(pkt, offset) if err != nil { return k, err @@ -108,8 +88,23 @@ func ProcessKafkaRequest(pkt []byte) (*KafkaInfo, error) { return k, nil } +func parseKafkaHeader(pkt []byte) (*Header, error) { + header := &Header{ + MessageSize: int32(binary.BigEndian.Uint32(pkt[0:4])), + APIKey: int16(binary.BigEndian.Uint16(pkt[4:6])), + APIVersion: int16(binary.BigEndian.Uint16(pkt[6:8])), + CorrelationID: int32(binary.BigEndian.Uint32(pkt[8:12])), + ClientIDSize: int16(binary.BigEndian.Uint16(pkt[12:14])), + } + + if !isValidKafkaHeader(header) { + return nil, errors.New("invalid Kafka request header") + } + return header, nil +} + func isValidKafkaHeader(header *Header) bool { - if header.MessageSize < int32(KafaMinLength) || header.APIVersion < 0 { + if header.MessageSize < int32(KafkaMinLength) || header.APIVersion < 0 { return false } switch Operation(header.APIKey) { @@ -130,6 +125,40 @@ func isValidKafkaHeader(header *Header) bool { return header.ClientIDSize >= -1 } +func processClientID(header *Header, pkt []byte, k *KafkaInfo) (int, error) { + offset := KafkaMinLength + if header.ClientIDSize > 0 { + clientID := pkt[offset : offset+int(header.ClientIDSize)] + if !isValidClientID(clientID, int(header.ClientIDSize)) { + return 0, errors.New("invalid client ID") + } + offset += int(header.ClientIDSize) + k.ClientID = string(clientID) + } else if header.ClientIDSize < -1 { + return 0, errors.New("invalid client ID size") + } + return offset, nil +} + +func processKafkaOperation(header *Header, pkt []byte, k *KafkaInfo, offset *int) error { + switch Operation(header.APIKey) { + case Produce: + ok, err := getTopicOffsetFromProduceOperation(header, pkt, offset) + if !ok || err != nil { + return err + } + k.Operation = Produce + k.TopicOffset = *offset + case Fetch: + *offset += getTopicOffsetFromFetchOperation(header) + k.Operation = Fetch + k.TopicOffset = *offset + default: + return errors.New("invalid Kafka operation") + } + return nil +} + // nolint:cyclop func isValidKafkaString(buffer []byte, maxBufferSize, realSize int, printableOk bool) bool { for j := 0; j < maxBufferSize; j++ {