Skip to content

Commit

Permalink
[improve][broker] Support namespace-level configuration of migratedCl…
Browse files Browse the repository at this point in the history
…usterUrl in blue-green migration feature
  • Loading branch information
ruihongzhou committed Dec 27, 2024
1 parent 1967a93 commit 572b6f4
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -961,7 +962,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
policies -> new LocalPolicies(policies.bundles,
bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
policies.migrated))
policies.migrated,
policies.migratedClusterUrl))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
bookieAffinityGroup,
null));
Expand Down Expand Up @@ -1781,7 +1783,8 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
antiAffinityGroup,
policies.migrated))
policies.migrated,
policies.migratedClusterUrl))
.orElseGet(() -> new LocalPolicies(defaultBundle(),
null, antiAffinityGroup))
);
Expand Down Expand Up @@ -1819,7 +1822,8 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
null,
policies.migrated));
policies.migrated,
policies.migratedClusterUrl));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e);
Expand Down Expand Up @@ -2772,16 +2776,37 @@ protected void internalEnableMigration(boolean migrated) {
policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
migrated))
migrated,
policies.migratedClusterUrl))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
null, null, migrated)));
null, null, migrated, null)));
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
throw new RestException(e);
}
}

/**
 * Stores the migration flag and the namespace-level migrated cluster URL in the local policies
 * of the current namespace.
 *
 * <p>Super-user access is validated first. When local policies already exist, their bundles,
 * bookie affinity group and anti-affinity group are carried over unchanged; otherwise a new
 * entry is created using the configured default number of namespace bundles.
 *
 * @param migrated   value written to the {@code migrated} field of the local policies
 * @param clusterUrl value written as the migrated cluster URL of the local policies
 * @throws RestException wrapping any exception raised while the local policies are updated
 */
protected void internalUpdateMigrationState(boolean migrated, ClusterUrl clusterUrl) {
    validateSuperUserAccess();
    try {
        getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, existing -> {
            if (existing.isPresent()) {
                // Preserve every existing setting; only the two migration fields are replaced.
                LocalPolicies current = existing.get();
                return new LocalPolicies(current.bundles,
                        current.bookieAffinityGroup,
                        current.namespaceAntiAffinityGroup,
                        migrated,
                        clusterUrl);
            }
            // No local policies stored yet: build them from the default bundle count.
            return new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
                    null, null, migrated, clusterUrl);
        });
        log.info("Successfully updated migration state on namespace {}, migrated={}, clusterUrl={}",
                namespaceName, migrated, clusterUrl);
    } catch (Exception e) {
        log.error("Failed to update migration state on namespace {}, migrated={}, clusterUrl={}",
                namespaceName, migrated, clusterUrl, e);
        throw new RestException(e);
    }
}

protected Policies getDefaultPolicesIfNull(Policies policies) {
if (policies == null) {
policies = new Policies();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -1722,6 +1723,22 @@ public void enableMigration(@PathParam("property") String property,
internalEnableMigration(migrated);
}

/**
 * Updates the migration state of a namespace addressed by property, cluster and namespace.
 *
 * <p>Validates the namespace name built from the three path segments and then delegates to
 * {@code internalUpdateMigrationState(migrated, clusterUrl)}.
 *
 * @param property   value of the {@code property} path segment
 * @param cluster    value of the {@code cluster} path segment
 * @param namespace  value of the {@code namespace} path segment
 * @param migrated   value of the {@code migrated} query parameter
 * @param clusterUrl cluster URL data forwarded unchanged to the internal update; it carries no
 *                   parameter annotation, so it is presumably read from the request body
 *                   (TODO confirm)
 */
@POST
@Path("/{property}/{cluster}/{namespace}/migrationState")
@ApiOperation(hidden = true, value = "Update migration state for a namespace")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void updateMigrationState(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("migrated") boolean migrated,
ClusterUrl clusterUrl) {
validateNamespaceName(property, cluster, namespace);
internalUpdateMigrationState(migrated, clusterUrl);
}

@PUT
@Path("/{property}/{cluster}/{namespace}/policy")
@ApiOperation(value = "Creates a new namespace with the specified policies")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -3019,6 +3020,21 @@ public void enableMigration(@PathParam("tenant") String tenant,
internalEnableMigration(migrated);
}

/**
 * Updates the migration state of a namespace addressed by tenant and namespace.
 *
 * <p>Validates the namespace name built from the two path segments and then delegates to
 * {@code internalUpdateMigrationState(migrated, clusterUrl)}.
 *
 * @param tenant     value of the {@code tenant} path segment
 * @param namespace  value of the {@code namespace} path segment
 * @param migrated   value of the {@code migrated} query parameter
 * @param clusterUrl cluster URL data forwarded unchanged to the internal update; it carries no
 *                   parameter annotation, so it is presumably read from the request body
 *                   (TODO confirm)
 */
@POST
@Path("/{tenant}/{namespace}/migrationState")
@ApiOperation(hidden = true, value = "Update migration state for a namespace")
@ApiResponses(value = {
        @ApiResponse(code = 200, message = "Operation successful"),
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        // This route has only tenant and namespace segments, so the 404 text names those.
        @ApiResponse(code = 404, message = "Tenant or namespace doesn't exist") })
public void updateMigrationState(@PathParam("tenant") String tenant,
                                 @PathParam("namespace") String namespace,
                                 @QueryParam("migrated") boolean migrated,
                                 ClusterUrl clusterUrl) {
    validateNamespaceName(tenant, namespace);
    internalUpdateMigrationState(migrated, clusterUrl);
}

@POST
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1363,33 +1363,35 @@ public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync
CompletableFuture<Optional<ClusterUrl>> result = new CompletableFuture<>();
pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources()
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled) -> {
Optional<ClusterUrl> url = (clusterData.isPresent() && (clusterData.get().isMigrated()
|| isNamespaceMigrationEnabled))
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty();
return url;
.thenCombine(pulsar.getPulsarResources().getLocalPolicies()
.getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject()),
((clusterData, nsLocalPolicies) -> {
boolean isNamespaceMigrated = false;
if (nsLocalPolicies.isPresent() && nsLocalPolicies.get().migrated) {
isNamespaceMigrated = true;
ClusterUrl clusterUrl = nsLocalPolicies.get().migratedClusterUrl;
if (clusterUrl != null && !clusterUrl.isEmpty()) {
return Optional.of(clusterUrl);
}
}
if (clusterData.isPresent() && (clusterData.get().isMigrated() || isNamespaceMigrated)) {
return Optional.ofNullable(clusterData.get().getMigratedClusterUrl());
}
return Optional.empty();
}))
.thenAccept(res -> {
// cluster policies future is completed by metadata-store thread and continuing further
// processing in the same metadata store can cause deadlock while creating topic as
// create topic path may have blocking call on metadata-store. so, complete future on a
// separate thread to avoid deadlock.
pulsar.getExecutor().execute(() -> result.complete(res));
pulsar.getExecutor().execute(() -> result.complete((Optional<ClusterUrl>) res));
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> result.completeExceptionally(ex.getCause()));
return null;
});
return result;
}

/**
 * Looks up the local policies of the namespace that owns {@code topic} and reports whether
 * they are flagged as migrated.
 *
 * @param pulsar broker service used to reach the local-policies resources
 * @param topic  topic name whose namespace is looked up
 * @return future completing with {@code true} only when local policies are present and their
 *         {@code migrated} field is set; {@code false} otherwise
 */
private static CompletableFuture<Boolean> isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
    return pulsar.getPulsarResources().getLocalPolicies()
            .getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject())
            // Absent local policies count as "not migrated".
            .thenApply(localPolicies -> localPolicies.map(lp -> lp.migrated).orElse(false));
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, String topic) {
try {
return getMigratedClusterUrlAsync(pulsar, topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public LocalPolicies toLocalPolicies() {
return new LocalPolicies(this.getBundlesData(),
localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null),
localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null),
localPolicies.map(lp -> lp.getLeft().migrated).orElse(false));
localPolicies.map(lp -> lp.getLeft().migrated).orElse(false),
localPolicies.map(lp -> lp.getLeft().migratedClusterUrl).orElse(null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,111 @@ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subTyp
client3.close();
}

/**
 * Verifies that a namespace-level migrated cluster URL takes precedence over the cluster-level one.
 *
 * <p>Cluster "r1" is marked as migrated to cluster2 while the namespace is marked as migrated to
 * cluster3. The producer and consumer that started on cluster1 are then expected to end up on
 * cluster3, and cluster2 is expected to keep only its own producer.
 */
@Test
public void testNamespaceMigrationWithNamespaceLevelMigratedClusterUrl() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/migrationTopic");

// cluster1 producer/consumer
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());

// build backlog
// The consumer is closed first so the n messages published below stay unconsumed on subscription "s1".
consumer1.close();
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}

// cluster2 producer/consumer
@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic2.getProducers().isEmpty());
assertTrue(topic2.getSubscriptions().isEmpty());

// cluster3 producer/consumer
@Cleanup
PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer3 = client3.newProducer().topic(topicName).enableBatching(false)
.producerName("producer3").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic3 = (AbstractTopic) pulsar3.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic3.getProducers().isEmpty());
assertTrue(topic3.getSubscriptions().isEmpty());

// Set the migratedCluster at both the cluster level and the namespace level. Since the configuration at
// the namespace level has a higher priority, the topic will be migrated to cluster3.
ClusterUrl cluster2MigratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(),
pulsar2.getWebServiceAddressTls(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
ClusterUrl cluster3MigratedUrl = new ClusterUrl(pulsar3.getWebServiceAddress(),
pulsar3.getWebServiceAddressTls(), pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, cluster2MigratedUrl);
admin1.namespaces().updateMigrationState(namespace, true, cluster3MigratedUrl);

// Retry until checkClusterMigration() completes without throwing; exceptions are ignored while retrying.
retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
// Final unguarded call: if the migration check still fails, the exception fails the test here.
topic1.checkClusterMigration().get();

// NOTE(review): fixed pause, presumably to let the migration state take effect before publishing -- confirm.
sleep(1000);
producer1.sendAsync("test1".getBytes());

// producer1 is disconnected from cluster1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
assertTrue(topic1.getProducers().isEmpty());
// producer1 is not connected to cluster2
// (topic2 is expected to still hold exactly one producer)
retryStrategically((test) -> topic2.getProducers().size() == 1, 10, 500);
assertEquals(topic2.getProducers().size(), 1);
// producer1 is connected to cluster3
// (topic3 is expected to hold two producers)
retryStrategically((test) -> topic3.getProducers().size() == 2, 10, 500);
assertEquals(topic3.getProducers().size(), 2);

// try to consume backlog messages from cluster1
consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test1".getBytes());
consumer1.acknowledge(msg);
}
// after consuming all messages, consumer1 should have disconnected from cluster1 and reconnected to cluster3
retryStrategically((test) -> !topic3.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic3.getSubscriptions().isEmpty());

// publish messages to cluster3 and consume them
// Both producers publish n messages each, so 2 * n "test2" messages are expected below.
for (int i = 0; i < n; i++) {
producer1.sendAsync("test2".getBytes());
producer3.sendAsync("test2".getBytes());
}
for (int i = 0; i < n * 2; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test2".getBytes());
consumer1.acknowledge(msg);
}

client1.close();
client2.close();
client3.close();
}

static class TestBroker extends MockedPulsarServiceBaseTest {

private String clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -4676,6 +4677,28 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException;

/**
* Update the migration state of a namespace, supplying cluster url data along with the migrated flag.
* @param namespace
* Namespace name
* @param migrated
* Flag to determine namespace is migrated or not
* @param clusterUrl
* Cluster url data sent together with the migrated flag
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateMigrationState(String namespace, boolean migrated, ClusterUrl clusterUrl) throws PulsarAdminException;

/**
* Update migration state for a namespace asynchronously.
* @param namespace
* Namespace name
* @param migrated
* Flag to determine namespace is migrated or not
* @param clusterUrl
* Cluster url data sent together with the migrated flag
* @return a future tracking completion of the update
*/
CompletableFuture<Void> updateMigrationStateAsync(String namespace, boolean migrated, ClusterUrl clusterUrl);

/**
* Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously.
*/
Expand Down
Loading

0 comments on commit 572b6f4

Please sign in to comment.