
Commit timeout #1106

Closed
ouertani opened this issue Nov 15, 2023 · 14 comments

Comments

@ouertani
Contributor

We started getting zio.kafka.consumer.Consumer$CommitTimeout$: Commit timeout after upgrading zio-kafka from 2.2 to 2.6.0.

@erikvanoosten
Collaborator

@ouertani What are your consumer settings?

@ouertani
Contributor Author

@ouertani What are your consumer settings?

consumer {
    security.protocol="SASL_SSL"
    sasl.mechanism="PLAIN"
    key.deserializer="org.apache.kafka.common.serialization.StringDeserializer"
    value.deserializer="org.apache.kafka.common.serialization.StringDeserializer"
    group.id="XYZdev"
    auto.offset.reset="earliest"
    max.poll.records="20"
}
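
For context, a minimal sketch of how these properties might be expressed as a zio-kafka ConsumerSettings value (the broker address is the redacted one from the ConsumerConfig log further down; the hidden sasl.jaas.config would still have to be supplied from the environment):

import zio.kafka.consumer.ConsumerSettings

// Sketch only. Per the ConsumerConfig log below, zio-kafka's underlying consumer
// uses ByteArrayDeserializer itself and applies Serdes on top, so the
// key/value.deserializer properties from the HOCON config are not needed here.
val settings: ConsumerSettings =
  ConsumerSettings(List("xyz.servicebus.windows.net:9093"))
    .withGroupId("XYZdev")
    .withProperty("security.protocol", "SASL_SSL")
    .withProperty("sasl.mechanism", "PLAIN")
    .withProperty("auto.offset.reset", "earliest")
    .withProperty("max.poll.records", "20")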

@ouertani
Contributor Author

@erikvanoosten This behavior is reproducible with versions 2.3.0, 2.3.1, 2.3.2, and 2.5.0.

@erikvanoosten
Collaborator

Sorry, I was referring to the zio-kafka ConsumerSettings. In particular, what is the pollInterval?

Ah, in 2.3 also. Let's see what could have caused that...

@erikvanoosten
Collaborator

@ouertani I have no idea. Can you create a minimal reproducer please?

@ouertani
Contributor Author

Here are more details:

 INFO  o.a.k.c.consumer.ConsumerConfig ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.include.jmx.reporter = true
	auto.offset.reset = earliest
	bootstrap.servers = [xyz.servicebus.windows.net:9093]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = xyz
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = xyzdev
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 20
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null

@ouertani
Contributor Author

ouertani commented Nov 15, 2023

@erikvanoosten The consumer runs against Event Hubs. What is important is that the messages are large and that there are 10+ parallel consumers/pods. I found this in the release notes:

Because of these changes, the broker will be polled more often which can cause higher CPU and memory consumption. You may need to re-tune the consumer settings. For example, you can lower max.poll.records when there is high memory pressure, and you can increase pollTimeout when you observe high CPU usage and latency is not a concern.
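
A hedged sketch of what that re-tuning could look like in code (the setter names follow zio-kafka's ConsumerSettings conventions, and the concrete values are arbitrary examples, not recommendations from this issue):

import zio._
import zio.kafka.consumer.ConsumerSettings

// `base` stands for whatever ConsumerSettings the application already builds.
def retuned(base: ConsumerSettings): ConsumerSettings =
  base
    .withProperty("max.poll.records", "10") // fewer records per poll -> less memory pressure
    .withPollTimeout(250.millis)            // longer poll timeout -> less CPU spent polling, at some latency cost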

@erikvanoosten
Collaborator

How large are the messages, roughly?
Did you already try increasing the commit timeout via ConsumerSettings.commitTimeout (since zio-kafka 2.4)? The default is 15s.
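
For what it's worth, a minimal sketch of raising that timeout, assuming the usual with-prefixed setter on ConsumerSettings (30 seconds is an arbitrary example value):

import zio._
import zio.kafka.consumer.ConsumerSettings

// Raise the commit timeout from the 15s default to 30s.
def withLongerCommitTimeout(base: ConsumerSettings): ConsumerSettings =
  base.withCommitTimeout(30.seconds)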

@ouertani
Contributor Author

The messages are less than 1 MB, roughly around 600 kB. I will try to increase the timeout, although it is weird that we see the same behavior with 2.3.0.

@ouertani
Contributor Author

@erikvanoosten Indeed, a java.lang.OutOfMemoryError: Java heap space is raised instead.

@erikvanoosten
Collaborator

erikvanoosten commented Nov 15, 2023

With such large messages it doesn't make sense to prefetch; you'll go OOM quickly.
As described in #1091, you can use ConsumerSettings.partitionPreFetchBufferLimit(1) with zio-kafka 2.6.0, or ConsumerSettings.withoutPartitionPreFetching with zio-kafka 2.6.1 (not yet released).

Alternatively, you can write your own FetchStrategy.
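
A sketch of both options, using the with-prefixed setters that ConsumerSettings conventionally exposes (exact names and availability should be checked against the 2.6.x API):

import zio.kafka.consumer.ConsumerSettings

// zio-kafka 2.6.0: cap the per-partition prefetch buffer as low as possible.
def cappedPrefetch(base: ConsumerSettings): ConsumerSettings =
  base.withPartitionPreFetchBufferLimit(1)

// zio-kafka 2.6.1+: turn prefetching off entirely.
def noPrefetch(base: ConsumerSettings): ConsumerSettings =
  base.withoutPartitionPreFetching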

@ouertani
Contributor Author

What was weird for me is that it was working for months with version 2.2 without any issues.

@erikvanoosten
Collaborator

What was weird for me is that it was working for months with version 2.2 without any issues.

I am happy that disabling prefetch works!

After 2.2 we made a performance optimization by pre-fetching sooner and more aggressively. Unfortunately, this doesn't work well for some less common use cases, such as large messages.

@ouertani
Contributor Author

Thank you @erikvanoosten
