When maxBatch, maxBytes, maxExpires are exceeded there is only a timeout but no status message #1194

Open
MauriceVanVeen opened this issue Aug 9, 2024 · 1 comment · May be fixed by #1195
Labels
defect Suspected defect such as a bug or regression

Comments

@MauriceVanVeen
Member

Observed behavior

When maxBatch, maxBytes or maxExpires is set on the ConsumerConfiguration and a pull request exceeds that limit, the caller is not informed. The fetch simply waits until the timeout set by expiresIn elapses.

Expected behavior

The developer is made aware that one of these limits was exceeded, which prompts them to fix the code and pull fewer messages (or use a smaller expiresIn in the case of maxExpires). For example, by logging the status message:

Aug 09, 2024 12:07:58 PM io.nats.client.impl.ErrorListenerLoggerImpl pullStatusWarning
WARNING: pullStatusWarning, Connection: 13, Subscription: 631365415, Consumer Name: 8V2ygPms7b, Status:Status{code=409, message='Exceeded MaxRequestBatch of 1'}
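
To illustrate what "being informed" could look like on the caller's side, here is a minimal stdlib-only sketch that reads a status code and message like the one in the log line above and names the limit that was exceeded. `PullStatusCheck` and `exceededLimit` are hypothetical names and not part of jnats. The only message format taken from this issue is `Exceeded MaxRequestBatch of 1`; treating the messages for the other limits as having the same `Exceeded <limit> of <value>` shape is an assumption.

```java
import java.util.Optional;

// Hypothetical helper, not part of jnats: inspects a status code and message
// like the one in the log line above and names the limit that was exceeded.
final class PullStatusCheck {
    private static final String PREFIX = "Exceeded ";

    // Returns the limit name (e.g. "MaxRequestBatch") for a 409 status whose
    // message has the form "Exceeded <limit> of <value>", otherwise empty.
    // Assumption: all limit-related messages share this shape.
    static Optional<String> exceededLimit(int code, String message) {
        if (code != 409 || message == null || !message.startsWith(PREFIX)) {
            return Optional.empty();
        }
        int end = message.indexOf(" of ", PREFIX.length());
        if (end < 0) {
            return Optional.empty();
        }
        return Optional.of(message.substring(PREFIX.length(), end));
    }
}
```

A caller that has access to the status could then log or throw when `exceededLimit(...)` returns a value, instead of silently waiting for the timeout.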

Server and client version

server 2.10.18
jnats main/2.20.0

Host environment

No response

Steps to reproduce

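import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStreamManagement;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;

public class Test {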
    public static void main(String[] args) throws Exception {
        try (Connection nc = Nats.connect()) {
            JetStreamManagement jsm = nc.jetStreamManagement();

            String streamName = "EVENTS";
            jsm.addStream(StreamConfiguration.builder()
                    .name(streamName)
                    .subjects("event.>")
                    .build());

            StreamContext streamContext = nc.getStreamContext(streamName);
            
            // Setting maxBatch=1, so we shouldn't allow fetching more messages at once.
            ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder().maxBatch(1).build();
            ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(consumerConfig);

            int count = 0;
            
            // Fetching a batch of 100 messages is not allowed, so we rightfully don't get any messages and wait for timeout.
            // But we don't get informed about the status message.
            try (FetchConsumer fetchConsumer = consumerContext.fetchMessages(100)) {
                Message msg;
                while ((msg = fetchConsumer.nextMessage()) != null) {
                    msg.ack();
                    count++;
                }
            }

            System.out.printf("Received %d messages.", count);
        }
    }
}

Prints only the following and does not inform of any exceeded maximums:

Received 0 messages.
@scottf
Contributor

scottf commented Aug 13, 2024
