Skip to content

Commit

Permalink
[fix][client] Fix reader message filtering issue during blue-green cl…
Browse files Browse the repository at this point in the history
…uster switch (#23693)

Co-authored-by: ruihongzhou <[email protected]>
(cherry picked from commit 34c2f30)
  • Loading branch information
hrzzzz authored and lhotari committed Dec 21, 2024
1 parent 9a050e2 commit 6afb550
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
Expand Down Expand Up @@ -912,6 +913,116 @@ public void testNamespaceMigration(SubscriptionType subType, boolean isClusterMi
client2.close();
}

public void testMigrationWithReader() throws Exception {
final String topicName = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");

@Cleanup
PulsarClient client1 = PulsarClient.builder()
.serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-1 producer/reader
Producer<byte[]> producer1 = client1.newProducer()
.topic(topicName)
.enableBatching(false)
.producerName("cluster1-1")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Reader<byte[]> reader1 =client1.newReader()
.topic(topicName)
.startMessageId(MessageId.earliest)
.subscriptionRolePrefix("s1")
.create();

AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());

// build backlog
reader1.close();
int n = 8;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}

@Cleanup
PulsarClient client2 = PulsarClient.builder()
.serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-2 producer
Producer<byte[]> producer2 = client2.newProducer()
.topic(topicName)
.enableBatching(false)
.producerName("cluster2-1")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic2.getProducers().isEmpty());
assertTrue(topic2.getSubscriptions().isEmpty());

// migrate topic to cluster-2
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), null);
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl);
retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
topic1.checkClusterMigration().get();

sleep(1000);
producer1.sendAsync("test1".getBytes());

// producer is disconnected from cluster-1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
assertTrue(topic1.getProducers().isEmpty());

// producer is connected with cluster-2
retryStrategically((test) -> topic2.getProducers().size() == 2, 10, 500);
assertEquals(topic2.getProducers().size(), 2);

// try to consume backlog messages from cluster-1
reader1 = client1.newReader()
.topic(topicName)
.startMessageId(MessageId.earliest)
.subscriptionRolePrefix("s1")
.create();
for (int i = 0; i < n; i++) {
Message<byte[]> msg = reader1.readNext();
assertEquals(msg.getData(), "test1".getBytes());
}

// after consuming all messages, reader should have disconnected from cluster-1 and reconnect with cluster-2
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic2.getSubscriptions().isEmpty());
assertTrue(topic1.getSubscriptions().isEmpty());

n = 4;
// publish messages to cluster-2 and consume them
for (int i = 0; i < n; i++) {
producer1.send("test2".getBytes());
}

for (int i = 0; i < n; i++) {
assertEquals(reader1.readNext(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
}

client1.close();
client2.close();
}


@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception {
log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -3088,6 +3089,12 @@ boolean isAckReceiptEnabled() {
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}

@Override
protected void setRedirectedClusterURI(String serviceUrl, String serviceUrlTls) throws URISyntaxException {
super.setRedirectedClusterURI(serviceUrl, serviceUrlTls);
acknowledgmentsGroupingTracker.flushAndClean();
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

@VisibleForTesting
Expand Down

0 comments on commit 6afb550

Please sign in to comment.