Client request hangs when producing a message of 10 MB size #763

Open
1 of 7 tasks
jaime0815 opened this issue Apr 19, 2022 · 5 comments

Comments

@jaime0815

jaime0815 commented Apr 19, 2022

Description

I tested producing messages of different sizes; the request hangs when the message size reaches 10 MB.

How to reproduce

package main

import (
        "fmt"
        "github.com/confluentinc/confluent-kafka-go/kafka"
        "math/rand"
        "time"
)

const (
        bootstrapServers = "pkc-xxxxx.ap-southeast-1.aws.confluent.cloud:9092"
        ccloudAPIKey     = ""
        ccloudAPISecret  = ""
)

func getBytes(size int) []byte {
        token := make([]byte, size * 1024 * 1024)
        rand.Read(token)
        return token
}

func main() {
        rand.Seed(time.Now().UnixNano())
        topic := "test"
        config := &kafka.ConfigMap{
                "bootstrap.servers":       bootstrapServers,
                "sasl.mechanisms":         "PLAIN",
                "security.protocol":       "SASL_SSL",
                "message.max.bytes":        10485760,
                "sasl.username":           ccloudAPIKey,
                //"debug":                 "all",
                "sasl.password":           ccloudAPISecret}


        p, err := kafka.NewProducer(config)
        if err != nil {
                panic(err)
        }
        defer p.Close()


        deliveryChan := make(chan kafka.Event, 1)
        for _, size := range []int{1, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 20, 50, 80, 100} {
                word1bm := getBytes(size)
                fmt.Printf("start send  %dM size: %d\n", size, len(word1bm))
                p.Produce(&kafka.Message{
                        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
                        Value:          word1bm,
                }, deliveryChan)

                e := <- deliveryChan
                switch ev := e.(type) {
                case *kafka.Message:
                        if ev.TopicPartition.Error != nil {
                                fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                        } else {
                                fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                        }
                }

                p.Flush(50000)
                fmt.Printf("finish send  %dM size\n", size)
        }
}

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion(v1.8.2)):
  • Apache Kafka broker version: confluent cloud
  • Client configuration: ConfigMap{...}
  • Operating system: CentOS 7
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@mhowlett
Contributor

The maximum message size on Confluent Cloud for basic and standard clusters is 8 MB; for dedicated clusters it is 20 MB. https://docs.confluent.io/cloud/current/clusters/cluster-types.html#cloud-cluster-types

However, the client should not hang; you should get an error. Feel free to paste debug logs, that would be interesting.

@jaime0815
Author

This is the client.log from a run against the dedicated cluster.

@jaime0815
Author

It no longer hangs after setting message.max.bytes to 104857600, so the root cause seems to be that the client-side message.max.bytes is smaller than the limit on the server side.

@mhowlett
Contributor

Try setting message.max.bytes to 20000000 (the default is 10000000).
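For illustration, a minimal sketch of that change against the ConfigMap in the reproduction above (20000000 is just the value suggested here; use your cluster's actual limit):

// Sketch: raise the client-side limit before creating the producer.
// The value must be at least as large as the biggest message you produce.
if err := config.SetKey("message.max.bytes", 20000000); err != nil {
        panic(err)
}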

p.Produce returns an error value, which you need to check, in addition to possibly returning an error via the delivery channel. Since librdkafka can determine the message-too-large error immediately, it is exposed via the return value, not the delivery channel. (IMO an improved API would use only one mechanism to indicate all errors.)
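For example, a minimal sketch of checking that return value inside the reproduction loop above (variable names taken from that snippet; the exact error text may vary):

// Sketch: handle the error returned synchronously by Produce in addition
// to any error delivered on deliveryChan. A "message size too large"
// error is reported here, before the message is enqueued.
err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
        Value:          word1bm,
}, deliveryChan)
if err != nil {
        fmt.Printf("Produce failed immediately: %v\n", err)
        continue // no delivery report will arrive for this message
}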

relevant... golang/go#20803

Marking this as an enhancement, as the produce example does not use the error value returned and may exhibit the same issue.

@jaime0815
Author

@mhowlett it works well now; I get a "Message size too large" error when checking the p.Produce return value.

@mhowlett mhowlett added the HIGH label Nov 4, 2022