
changefeed: Increase maximum message sizes. #76258

Closed
miretskiy opened this issue Feb 8, 2022 · 7 comments · Fixed by #76265
Assignees
Labels
A-cdc Change Data Capture C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-cdc

Comments

@miretskiy
Contributor

miretskiy commented Feb 8, 2022

The Sarama library used to write to Kafka restricts each message to at most
MaxMessageBytes bytes, which is set to 1MB.
In addition, sarama imposes a MaxRequestSize limit of 100MB per produce request.

Those two settings are currently not configurable, and we have customers who are starting to store very large
(5+MB) JSON blobs that can easily exceed MaxMessageBytes.

We need to make changes to support those messages. One way to do this would be to extend
kafka_sink_config to expose those options. But perhaps a better approach is to just make them
"unlimited": we already limit the amount of memory a changefeed can consume, so if we were able
to ingest a message and convert it to the appropriate format (Avro or JSON), then the library itself
should not apply any additional limits, and any failures should come from the downstream server
rejecting those messages.

Epic CRDB-8665
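
For context, here is a minimal, runnable Go sketch showing where those two limits live in the sarama API and their default values; this snippet is purely illustrative and is not part of the changefeed code.

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// Per-message limit the producer enforces locally before sending
	// (defaults to roughly 1MB).
	fmt.Println("Producer.MaxMessageBytes:", cfg.Producer.MaxMessageBytes)

	// Package-level cap on the size of a single produce request
	// (defaults to 100MB).
	fmt.Println("MaxRequestSize:", sarama.MaxRequestSize)
}
```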

@miretskiy miretskiy added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) T-cdc labels Feb 8, 2022
@blathers-crl

blathers-crl bot commented Feb 8, 2022

cc @cockroachdb/cdc

@blathers-crl blathers-crl bot added the A-cdc Change Data Capture label Feb 8, 2022
@daniel-crlabs
Contributor

daniel-crlabs commented Feb 8, 2022

I'm not sure if this is related, but I wanted to point out that I ran into this GitHub issue while troubleshooting the same problem. It's from Shopify/sarama: Producer doesn't respect MaxMessageBytes limit for uncompressed requests

@kmanamcheri

@miretskiy The default value of MaxMessageBytes as documented in sarama/config.go is 1 MB, not 10 MB, which makes it an even tighter limit.

This issue, combined with #65211 (CDC changefeed to Kafka retrying backfill non-stop when Kafka errors out), makes the changefeed system effectively unusable even if there is a single row longer than 1MB. The producer errors out, the CDC job restarts from the beginning, and this ends up in an infinite loop, dumping the same data over and over again into Kafka.

A simple quick fix would be to pass the user's max message bytes setting along into the sarama config. A nicer fix would be to query the Kafka server for its settings and set the limit automatically; a sketch of the pass-through option follows below.
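
For illustration, a hedged sketch of the pass-through fix, assuming the user-supplied value has already been parsed out of the sink options; the helper name and its parameter are hypothetical, while `Producer.MaxMessageBytes` is the real sarama field.

```go
package kafkasink

import "github.com/Shopify/sarama"

// applyUserMessageLimit is a hypothetical helper: it copies a
// user-configured per-message size limit into the sarama producer
// config so that sarama's local check matches what the user expects
// the Kafka brokers to accept.
func applyUserMessageLimit(cfg *sarama.Config, userMaxMessageBytes int) {
	if userMaxMessageBytes > 0 {
		cfg.Producer.MaxMessageBytes = userMaxMessageBytes
	}
}
```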

@miretskiy
Contributor Author

Right; sorry, 1MB.

@miretskiy
Contributor Author

@kmanamcheri Thank you for bringing this up. The problem of looping on the same message is known;
we will address it with some sort of "dead letter queue" solution. This particular issue should deal with the Kafka side:
if a message is rejected, it should be rejected by the remote host, not by the sarama library.

@kmanamcheri

Yes, I like the dead letter queue idea. That makes sense.

I am asking if we can make a small fix for this (enable passing in the max message bytes config) and release it as a patch :)

That's why I brought it up: this is a complete blocker for us since the changefeed is not usable at all, and adding a single flag to pass through the max message bytes seems like a relatively low-risk fix.

@miretskiy
Contributor Author

@amruss -- see the DLQ discussion above.

@miretskiy miretskiy self-assigned this Feb 8, 2022
craig bot pushed a commit that referenced this issue Feb 9, 2022
75519: cli: add tracing to `debug send-kv-batch` r=andreimatei,erikgrinaker a=knz

This extends the `debug send-kv-batch` CLI utility to optionally
collect and print a trace of the remote processing.

(big thanks to @andreimatei for the help)



76213: spanconfig: replace roachpb.SpanConfigEntry in package spanconfig  r=arulajmani a=arulajmani

See individual commits for details. 

Polished up and carved out from #76138. 

In the next patch, we'll modify the existing span configuration RPC arguments to something similar to the draft PR above. Then, we'll make `spanconfig.Target` a union over system targets and spans (instead of an alias for roachpb.Spans, as in this PR). We'll also add all the encoding and decoding methods for system targets that we've seen in prior draft patches.

76254: backupccl: flip `bulkio.backup.split_keys_on_timestamps` to true r=dt a=adityamaru

Release note (sql change): BACKUPs of ranges containing extremely
large numbers of revisions to a single row no longer fail with
errors related to exceeding the size limit.

76265: changefeedccl: Increase message size limits for kafka sink. r=miretskiy a=miretskiy

The Sarama library, used by the kafka sink, limits the maximum message
size locally. When those limits are exceeded, sarama returns a
confusing error message which seems to imply that the remote
kafka server rejected the message, even though the rejection happened
locally:
   `kafka server: Message was too large, server rejected it to avoid allocation error.`

This PR addresses the problem by increasing the sarama limits to 2GB
(max int32).

An alternative approach was to extend `kafka_sink_config` to specify a
maximum message size. However, this alternative is less desirable.
For one, the user-supplied configuration can run afoul of other limits
imposed by the sarama library (e.g. `MaxRequestSize`), so more configuration
options would have to be added. In addition, this exposes very low-level
implementation details of the sarama library -- something that we
probably should not do.

Fixes #76258

Release Notes (enterprise change): Kafka sink supports larger messages,
up to 2GB in size.
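
For reference, a rough Go sketch of what lifting both sarama limits to max int32 (roughly 2GB) might look like; this is an editor's illustration based on the description above, not the actual diff in #76265.

```go
package kafkasink

import (
	"math"

	"github.com/Shopify/sarama"
)

// newSaramaConfig sketches the approach described above: raise sarama's
// local limits high enough that any real size enforcement is left to the
// Kafka brokers.
func newSaramaConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	// Per-message producer limit, raised to max int32 (~2GB).
	cfg.Producer.MaxMessageBytes = math.MaxInt32

	// Package-level cap on a single produce request, raised to match.
	sarama.MaxRequestSize = math.MaxInt32

	return cfg
}
```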

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: arulajmani <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
@craig craig bot closed this as completed in e109b4b Feb 9, 2022
blathers-crl bot pushed a commit that referenced this issue Feb 9, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this issue Feb 9, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this issue Feb 9, 2022
RajivTS pushed a commit to RajivTS/cockroach that referenced this issue Mar 6, 2022