diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index e6a7d049366e4..167c154c1fd88 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; @@ -911,6 +912,116 @@ public void testNamespaceMigration(SubscriptionType subType, boolean isClusterMi client2.close(); } + public void testMigrationWithReader() throws Exception { + final String topicName = BrokerTestUtil + .newUniqueName("persistent://" + namespace + "/migrationTopic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder() + .serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-1 producer/reader + Producer producer1 = client1.newProducer() + .topic(topicName) + .enableBatching(false) + .producerName("cluster1-1") + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Reader reader1 =client1.newReader() + .topic(topicName) + .startMessageId(MessageId.earliest) + .subscriptionRolePrefix("s1") + .create(); + + AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(topic1.getProducers().isEmpty()); + assertFalse(topic1.getSubscriptions().isEmpty()); + + // build backlog + reader1.close(); + int n = 8; + for (int i = 0; i < n; i++) { + producer1.send("test1".getBytes()); + } + + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + // cluster-2 producer + Producer producer2 = client2.newProducer() + .topic(topicName) + .enableBatching(false) + .producerName("cluster2-1") + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + assertFalse(topic2.getProducers().isEmpty()); + assertTrue(topic2.getSubscriptions().isEmpty()); + + // migrate topic to cluster-2 + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), + pulsar2.getBrokerServiceUrl(), null); + admin1.clusters().updateClusterMigration("r1", true, migratedUrl); + assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl); + retryStrategically((test) -> { + try { + topic1.checkClusterMigration().get(); + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + topic1.checkClusterMigration().get(); + + sleep(1000); + producer1.sendAsync("test1".getBytes()); + + // producer is disconnected from cluster-1 + retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500); + assertTrue(topic1.getProducers().isEmpty()); + + // producer is connected with cluster-2 + retryStrategically((test) -> topic2.getProducers().size() == 2, 10, 500); + assertEquals(topic2.getProducers().size(), 2); + + // try to consume backlog messages from cluster-1 + reader1 = client1.newReader() + .topic(topicName) + .startMessageId(MessageId.earliest) + .subscriptionRolePrefix("s1") + .create(); + for (int i = 0; i < n; i++) { + Message msg = reader1.readNext(); + assertEquals(msg.getData(), "test1".getBytes()); + } + + // after consuming all messages, reader should have disconnected from cluster-1 and reconnect with cluster-2 + retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); + assertFalse(topic2.getSubscriptions().isEmpty()); + assertTrue(topic1.getSubscriptions().isEmpty()); + + n = 4; + // publish messages to cluster-2 and consume them + for (int i = 0; i < n; i++) { + producer1.send("test2".getBytes()); + } + + for (int i = 0; i < n; i++) { + assertEquals(reader1.readNext(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); + } + + client1.close(); + client2.close(); + } + + @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes") public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e01c6d4643b08..4d1b51e34db73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -37,6 +37,7 @@ import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -3107,6 +3108,12 @@ boolean isAckReceiptEnabled() { && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion()); } + @Override + protected void setRedirectedClusterURI(String serviceUrl, String serviceUrlTls) throws URISyntaxException { + super.setRedirectedClusterURI(serviceUrl, serviceUrlTls); + acknowledgmentsGroupingTracker.flushAndClean(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); @VisibleForTesting