Skip to content

Commit

Permalink
[fix][broker] Fix bug causing loss of migrated information when setti…
Browse files Browse the repository at this point in the history
…ng other localPolicies in namespace (#23764)

Co-authored-by: ruihongzhou <[email protected]>
(cherry picked from commit bbe2cab)
  • Loading branch information
hrzzzz authored and lhotari committed Dec 21, 2024
1 parent 2117be7 commit 0f15a36
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,8 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
LocalPolicies localPolicies = oldPolicies.map(
policies -> new LocalPolicies(policies.bundles,
bookieAffinityGroup,
policies.namespaceAntiAffinityGroup))
policies.namespaceAntiAffinityGroup,
policies.migrated))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
bookieAffinityGroup,
null));
Expand Down Expand Up @@ -1779,7 +1780,8 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
antiAffinityGroup))
antiAffinityGroup,
policies.migrated))
.orElseGet(() -> new LocalPolicies(defaultBundle(),
null, antiAffinityGroup))
);
Expand Down Expand Up @@ -1816,7 +1818,8 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
null));
null,
policies.migrated));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e);
Expand Down Expand Up @@ -2765,10 +2768,13 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
protected void internalEnableMigration(boolean migrated) {
validateSuperUserAccess();
try {
getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> {
policies.migrated = migrated;
return policies;
});
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> oldPolicies.map(
policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
policies.namespaceAntiAffinityGroup,
migrated))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
null, null, migrated)));
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public BundlesData getBundlesData() {
public LocalPolicies toLocalPolicies() {
return new LocalPolicies(this.getBundlesData(),
localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null),
localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null));
localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null),
localPolicies.map(lp -> lp.getLeft().migrated).orElse(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
Expand Down Expand Up @@ -2195,4 +2196,30 @@ public void testDispatcherPauseOnAckStatePersistent() throws Exception {

admin.namespaces().deleteNamespace(namespace);
}

public void testMigratedInfoIsNotLostDuringOtherLocalPoliciesUpdate() throws Exception {
String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");
admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));

admin.namespaces().updateMigrationState(namespace, true);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);

String bookieAffinityGroupPrimary = "group1";
admin.namespaces().setBookieAffinityGroup(namespace,
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(bookieAffinityGroupPrimary).build());
assertEquals(admin.namespaces().getBookieAffinityGroup(namespace).getBookkeeperAffinityGroupPrimary(),
bookieAffinityGroupPrimary);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);

String namespaceAntiAffinityGroup = "group2";
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), namespaceAntiAffinityGroup);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);

admin.namespaces().deleteBookieAffinityGroup(namespace);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);

admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,29 @@ public class LocalPolicies {
public final BookieAffinityGroupData bookieAffinityGroup;
// namespace anti-affinity-group
public final String namespaceAntiAffinityGroup;
public boolean migrated;
public final boolean migrated;

public LocalPolicies() {
bundles = defaultBundle();
bookieAffinityGroup = null;
namespaceAntiAffinityGroup = null;
migrated = false;
}

public LocalPolicies(BundlesData data,
BookieAffinityGroupData bookieAffinityGroup,
String namespaceAntiAffinityGroup) {
this(data, bookieAffinityGroup, namespaceAntiAffinityGroup, false);
}

public LocalPolicies(BundlesData data,
BookieAffinityGroupData bookieAffinityGroup,
String namespaceAntiAffinityGroup,
boolean migrated) {
bundles = data;
this.bookieAffinityGroup = bookieAffinityGroup;
this.namespaceAntiAffinityGroup = namespaceAntiAffinityGroup;
this.migrated = migrated;
}

}

0 comments on commit 0f15a36

Please sign in to comment.