
Kafka scaler: excludePersistentLag not working for partitions with invalid offset (that is -1) #5274

Open
rakesh-ism opened this issue Dec 11, 2023 · 14 comments
Labels
bug Something isn't working

Comments

@rakesh-ism

Report

When I set the excludePersistentLag flag to true, it does not exclude persistent lag for partitions with an invalid offset (that is, -1).

Expected Behavior

If the lag for a partition with an invalid offset is persistent, it should be excluded from the custom metric.

Actual Behavior

In my test setup, this resulted in the number of Kafka consumer pods being scaled to the maximum.

Steps to Reproduce the Problem

  1. In the setup, you should have some partitions that have never been consumed (invalid offset).
  2. No new messages should be arriving on those partitions.
  3. The old messages are no longer in the Kafka log because they have already expired.

Use the Kafka scaler to scale the consumer deployment.

I know this might not be a realistic production setup; I had it in my sandbox landscape, and it took me a long time to find out what the problem was.

Logs from KEDA operator

example

KEDA Version

2.12.1

Kubernetes Version

None

Platform

Other

Scaler Details

Kafka

Anything else?

No response

@rakesh-ism rakesh-ism added the bug Something isn't working label Dec 11, 2023
@zroubalik
Member

@dttung2905 What do you think about this?

@dttung2905
Contributor

@zroubalik I think it's a good point that @rakesh-ism brings up, and IMO we can implement the logic. I do agree that we sometimes get a negative offset lag for a Kafka consumer (quite common in my experience managing Kafka, actually 😞). I will create a PR for this tomorrow morning (it's late in my timezone now haha).

@dttung2905
Contributor

dttung2905 commented Dec 13, 2023

@rakesh-ism could you help send over a few things?

  • the Kafka scaler config that you use
  • logs from the KEDA operator

From my quick look at the code, if your consumer offset for a partition is -1, it should already be caught within this code block 🤔
    consumerOffset := block.Offset
    if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
        retVal := int64(1)
        if s.metadata.scaleToZeroOnInvalidOffset {
            retVal = 0
        }
        msg := fmt.Sprintf(
            "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
            topic, s.metadata.group, partitionID, retVal)
        s.logger.V(1).Info(msg)
        return retVal, retVal, nil
    }
    if _, found := topicPartitionOffsets[topic]; !found {
        return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic)
    }
    latestOffset := topicPartitionOffsets[topic][partitionID]
    if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
        return latestOffset, latestOffset, nil
    }
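
Note that the early return in the earliest branch skips any persistent-lag check that runs later in the function. Below is a minimal standalone sketch of that flow (simplified names and state, keyed by partition only; this is illustrative, not the actual KEDA source):

    // Illustrative sketch (not the actual KEDA source): a persistent-lag
    // exclusion of this kind is applied AFTER the invalid-offset handling,
    // so a partition that returns early with the full latestOffset as lag
    // never reaches it.
    package main

    import "fmt"

    const invalidOffset = int64(-1)

    // previousOffsets remembers the consumer offset seen in the previous
    // polling cycle per partition (hypothetical simplified state).
    var previousOffsets = map[int32]int64{}

    func lagForPartition(partitionID int32, consumerOffset, latestOffset int64, offsetResetPolicy string, excludePersistentLag bool) int64 {
        if consumerOffset == invalidOffset && offsetResetPolicy == "earliest" {
            // Early return: the whole log is counted as lag and the
            // persistent-lag check below is skipped entirely.
            return latestOffset
        }
        if excludePersistentLag {
            previous, found := previousOffsets[partitionID]
            switch {
            case !found:
                previousOffsets[partitionID] = consumerOffset // first observation
            case previous == consumerOffset:
                return 0 // offset has not moved since the last poll: treat the lag as persistent
            default:
                previousOffsets[partitionID] = consumerOffset // consumer made progress
            }
        }
        return latestOffset - consumerOffset
    }

    func main() {
        // A partition with an invalid (-1) offset reports full lag on every
        // poll, even with excludePersistentLag enabled.
        fmt.Println(lagForPartition(1, invalidOffset, 60, "earliest", true)) // 60
        fmt.Println(lagForPartition(1, invalidOffset, 60, "earliest", true)) // still 60
    }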

@rakesh-ism
Author

rakesh-ism commented Jan 3, 2024

@dttung2905, Thanks for taking it up.
Attached is the keda config I used.
keda.txt

In my case, offsetResetPolicy is "earliest", so the above code is bypassed and the code branch at line 684 is executed.

@dttung2905
Contributor

Thanks for sharing the config file with us.
Hmm, this is weird! Could you share any logs from the controller too? If you can share the steps/setup to replicate the problem, that would be great for us 🙏


stale bot commented Mar 4, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale All issues that are marked as stale due to inactivity label Mar 4, 2024
@rakesh-ism
Author

Got busy. Need some more time to reproduce.

@stale stale bot removed the stale All issues that are marked as stale due to inactivity label Mar 6, 2024
@dttung2905
Contributor

@rakesh-ism sure. Let me know if you manage to reproduce it 🙏


stale bot commented May 9, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale All issues that are marked as stale due to inactivity label May 9, 2024

stale bot commented May 16, 2024

This issue has been automatically closed due to inactivity.

@stale stale bot closed this as completed May 16, 2024
@rakesh-ism
Author

rakesh-ism commented Jun 24, 2024

Hi
The issue was reproduced in my system again. I did some analysis by adding debug logging to the Kafka scaler and found that we have a topic in the consumer group where the lag looks like the output below from kafka-consumer-groups.sh:

    GROUP   TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                  HOST            CLIENT-ID
    group1  topic1  0          80              80              0    rdkafka-xxx-xxx-xxx-xxx-xxx  /xx.xx.xx.xx    rdkafka
    group1  topic1  1          -               60              -    rdkafka-xxx-xxx-xxx-xxx-xxy  /100.100.17.69  rdkafka
    group1  topic1  2          -               96              -    rdkafka-xxx-xxx-xxx-xxx-xxz  /100.100.14.23  rdkafka

Because of this, the total lag comes out as 156.
In our case, topic1 receives messages only once every 2-3 months or more. So if we create a new consumer (a new group), the lag will be '-' on all partitions for a while; then suddenly one message arrives, the lag becomes 0 on that partition while staying '-' on the other partitions, and from then on the lag for the other partitions starts being counted.
Can you please check?
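
To make the arithmetic concrete, here is a minimal sketch (an assumed simplification, not KEDA code) of how the partitions above add up to a lag of 156 when an invalid offset is counted as the full log-end offset:

    // Sketch: with offsetResetPolicy "earliest", a partition whose committed
    // offset is "-" (invalid, i.e. -1) contributes its full log-end offset as
    // lag, so the kafka-consumer-groups.sh output above sums to 156.
    package main

    import "fmt"

    type partition struct {
        currentOffset int64 // -1 means no committed offset ("-" in the CLI output)
        logEndOffset  int64
    }

    func lag(p partition) int64 {
        if p.currentOffset == -1 {
            return p.logEndOffset // invalid offset: whole log counted as lag
        }
        return p.logEndOffset - p.currentOffset
    }

    func main() {
        partitions := []partition{
            {currentOffset: 80, logEndOffset: 80}, // partition 0: lag 0
            {currentOffset: -1, logEndOffset: 60}, // partition 1: lag 60
            {currentOffset: -1, logEndOffset: 96}, // partition 2: lag 96
        }
        total := int64(0)
        for _, p := range partitions {
            total += lag(p)
        }
        fmt.Println(total) // 156
    }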

@zroubalik zroubalik reopened this Jun 25, 2024
@stale stale bot removed the stale All issues that are marked as stale due to inactivity label Jun 25, 2024
@zroubalik
Member

@dttung2905 PTAL 🙏 :)

@rakesh-ism
Author

rakesh-ism commented Aug 17, 2024

Any updates please?

@danielmotaleite

TL;DR: update to at least KEDA 2.15.x, enable scaleToZeroOnInvalidOffset: "true", and add a cron trigger to start the consumer at least once a day/hour (depending on your requirements). A longer-term fix, described below, is to force one message into every partition.

I also have this problem and found that this option fixes it:

scaleToZeroOnInvalidOffset: "true"

This option treats the lag as zero when it is below zero or non-existent, which fixes the problem.
It may cause another problem, though: if you scale to zero and a message arrives on a partition that has no lag metric yet, there is no consumer, so the message will be neither consumed nor detected (since no consumer lag exists for it yet). If a new message arrives on a partition that already has a lag metric, everything works correctly, so the danger is only on the partitions without lag.

So this option is safe if your minimum replica count is at least one; but if you scale to zero, you may need to start the consumer (for example via a cron trigger) every few hours/days to make sure you don't have any stale messages sitting in the partitions without lag.

Once all partitions have a lag metric, you can disable this option.
A better long-term solution is probably to use kcat to force-produce a message to every partition, so the consumer works at least once on all of them. Of course, either prepare your consumer for an (invalid) test message or make sure you inject a valid message, or else you may lock your consumer on a broken message that never gets consumed.

A negative Kafka lag is usually a brief corner case on an active consumer; it is usually not a major issue, and the default scaleToZeroOnInvalidOffset: "false" already takes care of it. Please make sure you use the latest KEDA version, as older versions may not have this option, and version 2.15.0 adds a fix for offsetResetPolicy: "earliest" not applying the scaleToZeroOnInvalidOffset workaround.
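
Roughly, the kind of check the 2.15.0 fix applies in the "earliest" path looks like the sketch below (illustrative only, not the exact KEDA code):

    // Sketch: with offsetResetPolicy "earliest" and an invalid (-1) consumer
    // offset, scaleToZeroOnInvalidOffset decides whether the partition reports
    // zero lag or the full log-end offset.
    package main

    import "fmt"

    const invalidOffset = int64(-1)

    func lagForInvalidOffset(consumerOffset, latestOffset int64, offsetResetPolicy string, scaleToZeroOnInvalidOffset bool) int64 {
        if consumerOffset == invalidOffset && offsetResetPolicy == "earliest" {
            if scaleToZeroOnInvalidOffset {
                return 0 // do not scale on partitions that have never been consumed
            }
            return latestOffset // old behaviour: full log counted as lag
        }
        return latestOffset - consumerOffset
    }

    func main() {
        fmt.Println(lagForInvalidOffset(invalidOffset, 96, "earliest", true))  // 0
        fmt.Println(lagForInvalidOffset(invalidOffset, 96, "earliest", false)) // 96
    }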

So on my side, I updated KEDA, enabled that option, and it solved my issue. I then enabled a cron trigger to start the consumer once a day, just in case, for the topics with very little activity.
I'm also waiting for the devs to add discard processing for messages of type { "type": "debug" } (payload dependent on your consumer) so that all partitions can be manually initialized.
