Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscribeWithPuplishes.subscribeSIngleFuture interferes when further processing publishes #544

Open
DerSchwilk opened this issue Sep 9, 2022 · 1 comment

Comments

@DerSchwilk
Copy link

When calling subscribeSingleFuture() on a FlowableWithSingle<MqttXPublish, MqttXSubAck> i would expect that the original flowable behaves the same as if I would process it with the subscribeSingleFuture() call. If I run the subscribeSingleFuture() the flow should still process all MqttPublishes received by the subscription.

We publish >20 message to the client. When running subscribeSingleFuture() on the FlowableWithSingle and manually acking the publishes on the original flowable, the original flowable only emits 20 elements, capped by the max in-flight messages of mosquitto.

To Reproduce

In our scenario we want to evaluate the SubAck prior to processing and consuming the publishes.

  1. Publish n messages with QoS > 0 to a mosquitto instance with max in-flight configured to < n
  2. Consume the messages by an RXClient with subscribePublishes(Mqtt5Subscribe, true)
  3. Store the returned FlowableWithSingle in a variable
  4. Run subscribeSingleFuture() on it
  5. Acknowledge the messages in a mapping stage on the stored FlowableWithSingle
  6. Consume the stored FlowableWithSingle

`final var flowableWithSingle =
client.subscribePublishes(Mqtt5Subscribe.builder().topicFilter(TOPIC).qos(MqttQos.EXACTLY_ONCE).build(),
true);

    flowableWithSingle.subscribeSingleFuture();

    flowableWithSingle.doOnNext(n -> {
        LOGGER.info("Got next");
        n.acknowledge();
    }).ignoreElements().blockingAwait();`

Without calling the subscribeSingleFuture() the messages are acked correctly and every message is received.

Details

  • Affected HiveMQ MQTT Client version(s): 1.3.0
  • Used JVM version: Eclipse Temurin 17.0.3
  • Used OS (name and version): macOs Monterey 12.5.1 (21G83)
  • Used MQTT version: 3 and 5
  • Used MQTT broker (name and version): eclipse-mosquitto:1.6.15
@thjaeckle
Copy link

@DerSchwilk and I encountered that behaviour while including the reactive API of HiveMQ client in Eclipse Ditto.
We run our system-tests with Eclipse Mosquito, that's where we noticed the strange behaviour change when using FlowableWithSingle.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants