Cannot produce big messages #181

Closed
turtledev1 opened this issue Nov 22, 2022 · 9 comments
Labels
❓ Question Further information is requested

Comments

@turtledev1

I'm trying to produce a big Kafka message in my tests, but I get a really weird error:

... 57 55 2 18 115 101 99 116 105 111 110 52 57 12 115 101 97 116 49 55 24 115 101 97 116 82 101 102 52 57 57 57 56 2 18 115 101 99 116 105 111 110 52 57 12 115 101 97 116 49 56 24 115 101 97 116 82 101 102 52 57 57 57 57 2 18 115 101 99 116 105 111 110 52 57 12 115 101 97 116 49 57 24 115 101 97 116 82 101 102 53 48 48 48 48 2 18 115 101 99 116 105 111 110 52 57 0 0 0] [] {0 0 <nil>}} []})
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at insertion_simulation (file:///Users/marc-andre.bouvrette/Documents/Entry/k6-performance-tests/build/insertion-simulation.js:198:17(39))
        at native  executor=per-vu-iterations scenario=default source=stacktrace

Note: I didn't paste the whole error because it's too big; it's basically a long list of numbers like the ones above.

Here is my code:

import { Connection, SchemaRegistry, SCHEMA_TYPE_AVRO, Writer } from 'k6/x/kafka';
import { config } from './config/config';
import { generateData } from './data';

const brokers = config.kafka.brokerList;
const schemaRegistry = new SchemaRegistry({
    url: config.kafka.schemaRegistryUrl
});
const topic = config.kafka.topic;

const writer = new Writer({
    brokers: brokers,
    topic: topic,
    compression: 'zstd',
    autoCreateTopic: true,
});

const connection = new Connection({
    address: brokers[0],
});

const valueSchemaObject = schemaRegistry.getSchema({
    subject: `${topic}-value`,
    schemaType: SCHEMA_TYPE_AVRO,
});

export function teardown() {
    writer.close();
    connection.close();
}

export default function () {
    for (let index = 0; index < 1; index++) {
        const messages = [
            {
                value: schemaRegistry.serialize({
                    data: generateData(), // a big JSON (around 30k items in an array); works in my Java project, and also works here with 27k items
                    schema: valueSchemaObject,
                    schemaType: SCHEMA_TYPE_AVRO,
                }),
            },
        ];
        writer.produce({ messages: messages });
    }
}
@mostafa
Owner

mostafa commented Nov 22, 2022

Hey @turtledev1,
Since you're using zstd compression, you need to set the compression type in your topic config, given that you're creating the topic in your script, like this:

connection.createTopic({
    topic: topic,
    configEntries: [
        {
            configName: "compression.type",
            configValue: CODEC_ZSTD,
        },
    ],
});

Also, you need to import CODEC_ZSTD and pass it to the writer's compression option in place of the "zstd" string.
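
A minimal sketch of how the writer config might look with the constant (assuming the same brokers and topic variables as in your script):

import { Writer, CODEC_ZSTD } from "k6/x/kafka";

const writer = new Writer({
    brokers: brokers,
    topic: topic,
    compression: CODEC_ZSTD, // constant from k6/x/kafka instead of the "zstd" string
    autoCreateTopic: true,
});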

Let me know if you aren't creating the topic in the test, so I can dig deeper into this.

@turtledev1
Author

My topic already exists, so I'm not creating it. Also, the zstd compression was my first attempt at fixing this because I thought the messages were too big; I have the same issue without compression.

I know my setup works because if I generate a smaller message, everything works as expected and my consumer correctly receives the message.

@mostafa
Owner

mostafa commented Nov 23, 2022

@turtledev1

  1. Did you set zstd compression in the topic options when the topic was first created?
  2. Can you share a sample of the generateData function, so I can reproduce this locally?

@turtledev1
Author

turtledev1 commented Nov 23, 2022

Let's forget about the compression; I have the same result without it.

Here is my complete code:

// test.ts

import { Connection, SchemaRegistry, SCHEMA_TYPE_AVRO, Writer } from 'k6/x/kafka';
import { config } from './config/config';
import { generateData } from './data';

const brokers = config.kafka.brokerList;
const schemaRegistry = new SchemaRegistry({
    url: config.kafka.schemaRegistryUrl
});
const topic = config.kafka.topic;

const writer = new Writer({
    brokers: brokers,
    topic: topic,
    autoCreateTopic: true,
});

const connection = new Connection({
    address: brokers[0],
});

const valueSchemaObject = schemaRegistry.getSchema({
    subject: `${topic}-value`,
    schemaType: SCHEMA_TYPE_AVRO,
});

export function teardown() {
    writer.close();
    connection.close();
}

export default function () {
    for (let index = 0; index < 1; index++) {
        const messages = [
            {
                value: schemaRegistry.serialize({
                    data: generateData(50, 50, 20),
                    schema: valueSchemaObject,
                    schemaType: SCHEMA_TYPE_AVRO,
                }),
            },
        ];
        writer.produce({ messages: messages });
    }
}
// data.ts

export function generateData(sectionNumber: number, rowNumber: number, seatNumber: number) {
    let seatRef = 1;
    const sections = [];
    for (let sectionId = 0; sectionId < sectionNumber; sectionId++) {
        const sectionName = `section${sectionId}`;
        const rows = [];
        for (let rowId = 0; rowId < rowNumber; rowId++) {
            const rowName = `row${rowId}`;
            const seats = [];
            for (let seatId = 0; seatId < seatNumber; seatId++) {
                seats.push({
                    name: `seat${seatId}`,
                    group: {
                        'string': sectionName,
                    },
                    ref: `seatRef${seatRef}`,
                });
                seatRef++;
            }
            rows.push({
                name: rowName,
                seats: seats,
            });
        }
        sections.push({
            name: sectionName,
            rows: rows,
        });
    }
    return {
        id: '11ead359-db45-dbaa-9137-3b6a288b93c6',
        sections: sections
    };
}

And here is the schema in the registry:

{
  "fields": [
    {
      "name": "id",
      "type": {
        "logicalType": "uuid",
        "type": "string"
      }
    },
    {
      "default": [],
      "name": "sections",
      "type": {
        "items": {
          "fields": [
            {
              "name": "name",
              "type": {
                "avro.java.string": "String",
                "type": "string"
              }
            },
            {
              "default": [],
              "name": "rows",
              "type": {
                "items": {
                  "fields": [
                    {
                      "name": "name",
                      "type": {
                        "avro.java.string": "String",
                        "type": "string"
                      }
                    },
                    {
                      "default": [],
                      "name": "seats",
                      "type": {
                        "items": {
                          "fields": [
                            {
                              "name": "name",
                              "type": {
                                "avro.java.string": "String",
                                "type": "string"
                              }
                            },
                            {
                              "name": "ref",
                              "type": {
                                "avro.java.string": "String",
                                "type": "string"
                              }
                            },
                            {
                              "default": null,
                              "name": "group",
                              "type": [
                                "null",
                                {
                                  "avro.java.string": "String",
                                  "type": "string"
                                }
                              ]
                            }
                          ],
                          "name": "Seat",
                          "type": "record"
                        },
                        "type": "array"
                      }
                    }
                  ],
                  "name": "Row",
                  "type": "record"
                },
                "type": "array"
              }
            }
          ],
          "name": "Section",
          "type": "record"
        },
        "type": "array"
      }
    }
  ],
  "name": "Manifest",
  "namespace": "venue.avro.manifest",
  "type": "record"
}

With generateData(50, 50, 20) I have the issue; with generateData(50, 50, 10) everything works as expected.

@mostafa
Owner

mostafa commented Nov 23, 2022

@turtledev1

I slightly modified your script and found out that you're producing Kafka messages that are bigger than Kafka's default 1 MB limit. As you can see below, calling generateData with (50, 50, 20) produces a 1.45 MB message. You need to increase message.max.bytes for Kafka to accept bigger messages.
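
For reference, here is a sketch of how the limit could be raised at the topic level when the topic is created from the script (max.message.bytes is the per-topic counterpart of the broker's message.max.bytes; for a pre-existing topic you'd apply the same setting with your usual admin tooling):

connection.createTopic({
    topic: topic,
    configEntries: [
        {
            configName: "max.message.bytes",
            // 2 MB, enough to cover the 1.45 MB message measured below
            configValue: String(1024 * 1024 * 2),
        },
    ],
});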

Script
import { SchemaRegistry, SCHEMA_TYPE_AVRO } from "k6/x/kafka";

const schemaRegistry = new SchemaRegistry();
const valueSchemaObject = JSON.stringify({
    type: "record",
    name: "Value",
    namespace: "dev.mostafa.xk6.kafka",
    fields: [
        {
            name: "id",
            type: {
                logicalType: "uuid",
                type: "string",
            },
        },
        {
            default: [],
            name: "sections",
            type: {
                items: {
                    fields: [
                        {
                            name: "name",
                            type: {
                                "avro.java.string": "String",
                                type: "string",
                            },
                        },
                        {
                            default: [],
                            name: "rows",
                            type: {
                                items: {
                                    fields: [
                                        {
                                            name: "name",
                                            type: {
                                                "avro.java.string": "String",
                                                type: "string",
                                            },
                                        },
                                        {
                                            default: [],
                                            name: "seats",
                                            type: {
                                                items: {
                                                    fields: [
                                                        {
                                                            name: "name",
                                                            type: {
                                                                "avro.java.string": "String",
                                                                type: "string",
                                                            },
                                                        },
                                                        {
                                                            name: "ref",
                                                            type: {
                                                                "avro.java.string": "String",
                                                                type: "string",
                                                            },
                                                        },
                                                        {
                                                            default: null,
                                                            name: "group",
                                                            type: [
                                                                "null",
                                                                {
                                                                    "avro.java.string": "String",
                                                                    type: "string",
                                                                },
                                                            ],
                                                        },
                                                    ],
                                                    name: "Seat",
                                                    type: "record",
                                                },
                                                type: "array",
                                            },
                                        },
                                    ],
                                    name: "Row",
                                    type: "record",
                                },
                                type: "array",
                            },
                        },
                    ],
                    name: "Section",
                    type: "record",
                },
                type: "array",
            },
        },
    ],
    name: "Manifest",
    namespace: "venue.avro.manifest",
    type: "record",
});

// https://stackoverflow.com/a/71209062/6999563
function bytesForHuman(bytes, decimals = 2) {
    let units = ["B", "KB", "MB", "GB", "TB", "PB"];

    let i = 0;
    for (i; bytes > 1024; i++) {
        bytes /= 1024;
    }

    return parseFloat(bytes.toFixed(decimals)) + " " + units[i];
}

function generateData(sectionNumber, rowNumber, seatNumber) {
    let seatRef = 1;
    const sections = [];
    for (let sectionId = 0; sectionId < sectionNumber; sectionId++) {
        const sectionName = `section${sectionId}`;
        const rows = [];
        for (let rowId = 0; rowId < rowNumber; rowId++) {
            const rowName = `row${rowId}`;
            const seats = [];
            for (let seatId = 0; seatId < seatNumber; seatId++) {
                seats.push({
                    name: `seat${seatId}`,
                    group: {
                        string: sectionName,
                    },
                    ref: `seatRef${seatRef}`,
                });
                seatRef++;
            }
            rows.push({
                name: rowName,
                seats: seats,
            });
        }
        sections.push({
            name: sectionName,
            rows: rows,
        });
    }
    return {
        id: "11ead359-db45-dbaa-9137-3b6a288b93c6",
        sections: sections,
    };
}

export default function () {
    const smallMsg = schemaRegistry.serialize({
        data: generateData(50, 50, 10),
        schema: { schema: valueSchemaObject },
        schemaType: SCHEMA_TYPE_AVRO,
    });
    const bigMsg = schemaRegistry.serialize({
        data: generateData(50, 50, 20),
        schema: { schema: valueSchemaObject },
        schemaType: SCHEMA_TYPE_AVRO,
    });
    console.log("Big message size: ", bigMsg.length, " bytes");
    console.log("Big message size (humanized): ", bytesForHuman(bigMsg.length));

    console.log("Small message size: ", smallMsg.length, " bytes");
    console.log("Small message size (humanized): ", bytesForHuman(smallMsg.length));

    // https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html
    console.log("message.max.bytes: ", bytesForHuman(1048588));
}
Console output
$ ./k6 run test.ts

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: test.ts
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] Big message size:  1524028  bytes             source=console
INFO[0000] Big message size (humanized):  1.45 MB        source=console
INFO[0000] Small message size:  754028  bytes            source=console
INFO[0000] Small message size (humanized):  736.36 KB    source=console
INFO[0000] message.max.bytes:  1 MB                      source=console

running (00m00.6s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.6s/10m0s  1/1 iters, 1 per VU

     data_received........: 0 B 0 B/s
     data_sent............: 0 B 0 B/s
     iteration_duration...: avg=600.34ms min=600.34ms med=600.34ms max=600.34ms p(90)=600.34ms p(95)=600.34ms
     iterations...........: 1   1.662717/s

I consider this ticket closed.

@mostafa mostafa closed this as completed Nov 23, 2022
@mostafa mostafa added the ❓ Question Further information is requested label Nov 23, 2022
@turtledev1
Author

turtledev1 commented Nov 23, 2022

Sorry to add to this again, but even with compression I get the same error, even though the compressed message is way smaller.

There also seems to be a misconception on your side that the compression type needs to be set at the topic level for the writer's compression to work, but that's not true if the topic has the 'producer' compression type, which ours has:
[screenshot: topic configuration showing compression.type=producer]

(see https://kafka.apache.org/documentation/#topicconfigs_compression.type).

This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.

Also, as proof of this, we currently produce and consume messages that are at least twice the size of the messages that fail in my example.

Maybe the compression is not used correctly in the writer?

@mostafa
Owner

mostafa commented Nov 23, 2022

@turtledev1

  1. True. You need to set the compression type on the writer with the compression key, as you did in your original message. Otherwise, the writer cannot figure out the compression type by itself.
  2. I will double-check to see if I can produce a large compressed message that doesn't exceed 1 MB.

In the meantime, can you check the output size of the messages produced by the writer in your own application with the same amount of raw data (1.45 MB)?

@mostafa
Owner

mostafa commented Nov 23, 2022

@turtledev1
I ran some tests and found out that the compression works correctly when the message is produced to Kafka. However, you need to increase batchBytes in your writer config to a value greater than or equal to your largest possible uncompressed message size. I set the value to 1024 * 1024 * 2 (2 MB) and the test ran successfully. Needless to say, if you are producing multiple messages per batch in a loop, you should set batchBytes to at least the number of messages multiplied by the size of each message's key/value.
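
Something like this worked for me (a sketch, assuming the brokers and topic variables from your original script):

import { Writer, CODEC_ZSTD } from "k6/x/kafka";

const writer = new Writer({
    brokers: brokers,
    topic: topic,
    compression: CODEC_ZSTD,
    autoCreateTopic: true,
    // Must be >= the largest uncompressed message (~1.45 MB here), so 2 MB.
    batchBytes: 1024 * 1024 * 2,
});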

Also, the error you received in the terminal is a MessageTooLargeError from the kafka-go library, complaining that the data is bigger than it expects.

@turtledev1
Author

Omg thank you so much! I confirm that this works! Really appreciate the time you took to help me!
