Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix ExtensibleLoadManager flaky test #14

Open
wants to merge 13 commits into
base: master
Choose a base branch
from

Conversation

Demogorgon314
Copy link
Owner

@Demogorgon314 Demogorgon314 commented Jun 23, 2023

Run unit test only

1. Fix the deadlock by use thenComposeAsync

"metadata-store-9-1" #26 prio=5 os_prio=0 cpu=227.69ms elapsed=109.96s tid=0x000056447f5a14c0 nid=0xec waiting on condition  [0x00007f9db07d0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for  <0x00001000048e8830> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.parkNanos([email protected]/LockSupport.java:252)
at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1866)
at java.util.concurrent.ForkJoinPool.unmanagedBlock([email protected]/ForkJoinPool.java:3463)
at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3434)
at java.util.concurrent.CompletableFuture.timedGet([email protected]/CompletableFuture.java:1939)
at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:2095)
at org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(LoadManagerShared.java:403)
at org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper.filter(AntiAffinityGroupPolicyHelper.java:46)
at org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter.filter(AntiAffinityGroupPolicyFilter.java:43)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$selectAsync$16(ExtensibleLoadManagerImpl.java:494)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$933/0x000000080187dfb8.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:480)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:473)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$getOwnerAsync$9(ExtensibleLoadManagerImpl.java:396)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$834/0x0000000801738890.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.getOwnerAsync(ExtensibleLoadManagerImpl.java:388)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$7(ExtensibleLoadManagerImpl.java:380)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl$$Lambda$656/0x00000008016a3b20.apply(Unknown Source)
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409)
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.dedupeLookupRequest(ExtensibleLoadManagerImpl.java:461)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:374)
at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66)
at org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$0(NamespaceService.java:198)
at org.apache.pulsar.broker.namespace.NamespaceService$$Lambda$741/0x00000008016fd8d8.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
at org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$1(NamespaceService.java:191)
at org.apache.pulsar.broker.namespace.NamespaceService$$Lambda$740/0x00000008016fd690.apply(Unknown Source)
at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
at org.apache.pulsar.broker.namespace.NamespaceService.getBrokerServiceUrlAsync(NamespaceService.java:189)
at org.apache.pulsar.broker.lookup.TopicLookupBase.lambda$internalLookupTopicAsync$6(TopicLookupBase.java:85)
at org.apache.pulsar.broker.lookup.TopicLookupBase$$Lambda$927/0x000000080187d228.apply(Unknown Source)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire([email protected]/CompletableFuture.java:1150)
at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:510)
at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2147)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$existsFromStore$9(ZKMetadataStore.java:349)
at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$279/0x000000080138a950.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:304)
at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:833)

@@ -320,7 +320,9 @@ public synchronized void start() throws PulsarServerException {
"topicCompactionStrategyClassName",
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
tableview.listen((key, value) -> handle(key, value));
tableview.listen((key, value) -> {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a separate thread to handle the notification.

@Demogorgon314 Demogorgon314 force-pushed the Demogorgon314/fix-extensible-load-manager-flaky-test branch from efa4bae to 09aa452 Compare June 25, 2023 15:03
@Demogorgon314 Demogorgon314 self-assigned this Jun 26, 2023
@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jul 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant