From 3d54c1536752dc741ebf9217f57e6b47169cec51 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 30 Dec 2022 11:19:35 +0800 Subject: [PATCH] Update --- .../apache/pulsar/broker/PulsarService.java | 12 ++++--- .../broker/loadbalance/LoadManager.java | 7 ++-- .../extensions/ExtensibleLoadManagerImpl.java | 26 ++++++++------- .../ExtensibleLoadManagerWrapper.java | 13 +++++--- .../channel/ServiceUnitStateChannel.java | 4 +-- .../channel/ServiceUnitStateChannelImpl.java | 22 +++++++------ .../broker/namespace/NamespaceService.java | 20 +++++------- .../ExtensibleLoadManagerImplTest.java | 4 +-- .../channel/ServiceUnitStateChannelTest.java | 32 +++++++------------ 9 files changed, 70 insertions(+), 70 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 520a76e32e9892..824641860ac11c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -89,7 +89,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; -import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; @@ -802,7 +802,7 @@ public void start() throws PulsarServerException { } brokerService.start(); - if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { // Init system namespace for extensible load manager this.createNamespaceIfNotExists(this.getConfiguration().getClusterName(), SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE); @@ -842,6 +842,9 @@ public void start() throws PulsarServerException { // By starting the Load manager service, the broker will also become visible // to the rest of the broker by creating the registration z-node. This needs // to be done only when the broker is fully operative. + // + // The load manager service and its service unit state channel need to be initialized first + // (namespace service depends on load manager) this.startLoadManagementService(); // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. @@ -1129,7 +1132,8 @@ protected void closeLocalMetadataStore() throws Exception { } protected void startLeaderElectionService() { - if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { + LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService."); return; } this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(), @@ -1236,7 +1240,7 @@ protected void startLoadManagementService() throws PulsarServerException { LOG.info("Starting load management service ..."); this.loadManager.get().start(); - if (config.isLoadBalancerEnabled()) { + if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { LOG.info("Starting load balancer"); if (this.loadReportTask == null) { long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 230d36cf938a42..f03b34337934d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -25,6 +25,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; @@ -63,11 +64,11 @@ public interface LoadManager { default CompletableFuture> findBrokerServiceUrl( Optional topic, ServiceUnitId bundle) { - return null; + throw new UnsupportedOperationException(); } default CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundle) { - return null; + throw new UnsupportedOperationException(); } /** @@ -155,7 +156,7 @@ static LoadManager create(final PulsarService pulsar) { final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance); casted.initialize(pulsar); return casted; - } else if (loadManagerInstance instanceof ExtensibleLoadManagerImpl) { + } else if (loadManagerInstance instanceof ExtensibleLoadManager) { final LoadManager casted = new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance); casted.initialize(pulsar); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 3a6251639583c9..14b0d57fd51b5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarServerException; @@ -61,7 +60,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { @Getter private List brokerFilterPipeline; - private final AtomicBoolean started = new AtomicBoolean(false); + private volatile boolean started = false; private final ConcurrentOpenHashMap>> lookupRequests = ConcurrentOpenHashMap.(); + // TODO: Make brokerSelectionStrategy configurable. this.brokerSelectionStrategy = (brokers, bundle, context) -> { if (brokers.isEmpty()) { return Optional.empty(); @@ -81,9 +81,13 @@ public ExtensibleLoadManagerImpl() { }; } + public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) { + return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName()); + } + @Override - public void start() throws PulsarServerException { - if (this.started.get()) { + public synchronized void start() throws PulsarServerException { + if (this.started) { return; } this.brokerRegistry = new BrokerRegistryImpl(pulsar); @@ -101,8 +105,7 @@ public void start() throws PulsarServerException { // TODO: Start load data reporter. // TODO: Start unload scheduler and bundle split scheduler - - this.started.set(true); + this.started = true; } @Override @@ -129,10 +132,11 @@ public CompletableFuture> assign(Optional { if (brokerOpt.isPresent()) { log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle); - return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()); + return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get()) + .thenApply(Optional::of); } else { throw new IllegalStateException( - "Failed to discover(select) the new owner broker for bundle: " + bundle); + "Failed to select the new owner broker for bundle: " + bundle); } }); } @@ -204,8 +208,8 @@ public CompletableFuture checkOwnershipAsync(Optional to } @Override - public void close() throws PulsarServerException { - if (!this.started.get()) { + public synchronized void close() throws PulsarServerException { + if (!this.started) { return; } try { @@ -214,7 +218,7 @@ public void close() throws PulsarServerException { throw new PulsarServerException(e); } this.serviceUnitStateChannel.close(); - this.started.set(false); + this.started = false; } private boolean isInternalTopic(String topic) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 54c035f92766c2..87f6c7f63f447c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -96,42 +96,45 @@ public void stop() throws PulsarServerException { @Override public Optional getLeastLoaded(ServiceUnitId su) throws Exception { - return Optional.empty(); + throw new UnsupportedOperationException(); } @Override public LoadManagerReport generateLoadReport() { - return null; + throw new UnsupportedOperationException(); } @Override public void setLoadReportForceUpdateFlag() { - // No-op. + throw new UnsupportedOperationException(); } @Override public void writeLoadReportOnZookeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. + throw new UnsupportedOperationException(); } @Override public void writeResourceQuotasToZooKeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. + throw new UnsupportedOperationException(); } @Override public List getLoadBalancingMetrics() { + // TODO: Add metrics. return null; } @Override public void doLoadShedding() { - // No-op. + throw new UnsupportedOperationException(); } @Override public void doNamespaceBundleSplit() { - // No-op. + throw new UnsupportedOperationException(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 0e28a31fa54b27..fece425e75fd92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -116,7 +116,7 @@ public interface ServiceUnitStateChannel extends Closeable { * the future object will complete and return the owner broker. * Sub-case2: If the assigned broker does not take the ownership in time, * the future object will time out. - * Case 3: If none of them, it returns null. + * Case 3: If none of them, it returns Optional.empty(). */ CompletableFuture> getOwnerAsync(String serviceUnit); @@ -132,7 +132,7 @@ public interface ServiceUnitStateChannel extends Closeable { * case 2: If the assigned broker does not take the ownership in time, * the future object will time out. */ - CompletableFuture> publishAssignEventAsync(String serviceUnit, String broker); + CompletableFuture publishAssignEventAsync(String serviceUnit, String broker); /** * Asynchronously publishes the service unit unload event to the system topic in this channel. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 81bfc02ed79d6f..f9a10ae591da10 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -84,7 +84,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500; private final PulsarService pulsar; private final Schema schema; - private final ConcurrentOpenHashMap>> getOwnerRequests; + private final ConcurrentOpenHashMap> getOwnerRequests; private final String lookupServiceAddress; // TODO: define BrokerRegistry private final ConcurrentOpenHashMap> cleanupJobs; @@ -130,13 +130,13 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) { this.lookupServiceAddress = pulsar.getLookupServiceAddress(); this.schema = Schema.JSON(ServiceUnitStateData.class); this.getOwnerRequests = ConcurrentOpenHashMap.>>newBuilder().build(); + CompletableFuture>newBuilder().build(); this.cleanupJobs = ConcurrentOpenHashMap.>newBuilder().build(); this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS; this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS; this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS; this.leaderElectionService = new LeaderElectionService( - pulsar.getCoordinationService(), pulsar.getLookupServiceAddress(), + pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), state -> { if (state == LeaderElectionState.Leading) { log.debug("This broker:{} is the leader now.", lookupServiceAddress); @@ -242,7 +242,9 @@ public CompletableFuture> getChannelOwnerAsync() { // TODO: discard this protocol prefix removal // by a util func that returns lookupServiceAddress(serviceUrl) if (leader.isPresent()) { - return Optional.of(leader.get().getServiceUrl()); + String broker = leader.get().getServiceUrl(); + broker = broker.substring(broker.lastIndexOf('/') + 1); + return Optional.of(broker); } else { // When leader is empty, we should throw exception to notify is failed. String msg = "There is no channel owner now."; @@ -287,7 +289,7 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { return CompletableFuture.completedFuture(Optional.of(data.broker())); } case Assigned, Released -> { - return deferGetOwnerRequest(serviceUnit); + return deferGetOwnerRequest(serviceUnit).thenApply(Optional::of); } default -> { String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data); @@ -299,8 +301,8 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { } } - public CompletableFuture> publishAssignEventAsync(String serviceUnit, String broker) { - CompletableFuture> getOwnerRequest = deferGetOwnerRequest(serviceUnit); + public CompletableFuture publishAssignEventAsync(String serviceUnit, String broker) { + CompletableFuture getOwnerRequest = deferGetOwnerRequest(serviceUnit); pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker)) .whenComplete((__, ex) -> { if (ex != null) { @@ -385,7 +387,7 @@ lookupServiceAddress, getLogEventTag(data), serviceUnit, private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { var getOwnerRequest = getOwnerRequests.remove(serviceUnit); if (getOwnerRequest != null) { - getOwnerRequest.complete(Optional.of(data.broker())); + getOwnerRequest.complete(data.broker()); } if (isTargetBroker(data.broker())) { log(null, serviceUnit, data, null); @@ -470,10 +472,10 @@ private NamespaceBundle getNamespaceBundle(String bundle) { return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange); } - private CompletableFuture> deferGetOwnerRequest(String serviceUnit) { + private CompletableFuture deferGetOwnerRequest(String serviceUnit) { return getOwnerRequests .computeIfAbsent(serviceUnit, k -> { - CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS) .whenComplete((v, e) -> { if (e != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 3c67216e85b7de..7038cc5e927e29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -55,7 +55,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.ResourceUnit; -import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -172,16 +172,12 @@ public void initialize() { } } - public boolean isExtensibleLoadManager(){ - return loadManager.get() instanceof ExtensibleLoadManagerWrapper; - } - public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) { long startTime = System.nanoTime(); CompletableFuture> future = getBundleAsync(topic) .thenCompose(bundle -> { - if (isExtensibleLoadManager()) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle); } else { return findBrokerServiceUrl(bundle, options); @@ -274,7 +270,7 @@ private CompletableFuture> internalGetWebServiceUrl(Optional { if (lookupResult.isPresent()) { @@ -1040,7 +1036,7 @@ public CompletableFuture isServiceUnitOwnedAsync(ServiceUnitId suName) } if (suName instanceof NamespaceBundle) { - if (isExtensibleLoadManager()) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { return loadManager.get().checkOwnershipAsync(Optional.empty(), suName); } return CompletableFuture.completedFuture( @@ -1065,7 +1061,7 @@ public boolean isServiceUnitActive(TopicName topicName) { } public CompletableFuture isServiceUnitActiveAsync(TopicName topicName) { - if (isExtensibleLoadManager()) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { return getBundleAsync(topicName) .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); } @@ -1082,7 +1078,7 @@ private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception { } private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { - if (isExtensibleLoadManager()) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { return getFullBundleAsync(fqnn) .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle)); } @@ -1091,7 +1087,7 @@ private CompletableFuture isNamespaceOwnedAsync(NamespaceName fqnn) { } private CompletableFuture isTopicOwnedAsync(TopicName topic) { - if (isExtensibleLoadManager()) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { return getBundleAsync(topic) .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle)); } @@ -1099,7 +1095,7 @@ private CompletableFuture isTopicOwnedAsync(TopicName topic) { } public CompletableFuture checkTopicOwnership(TopicName topicName) { - if (isExtensibleLoadManager()) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { return getBundleAsync(topicName) .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 45365dbfbaa754..13f4e21e95b6b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -130,9 +130,7 @@ public void testAssignInternalTopic() throws Exception { FieldUtils.readField(channel1, "leaderElectionService", true); Optional currentLeader = leaderElectionService.getCurrentLeader(); assertTrue(currentLeader.isPresent()); - String webServiceUrl = brokerLookupData1.get().getWebServiceUrl(); - assertEquals(webServiceUrl.substring(webServiceUrl.lastIndexOf('/') + 1), - currentLeader.get().getServiceUrl()); + assertEquals(brokerLookupData1.get().getWebServiceUrl(), currentLeader.get().getServiceUrl()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 12e90300f21ac6..a212652c692ccf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -146,7 +146,7 @@ public void channelOwnerTest() throws Exception { assertEquals(newChannelOwner1, newChannelOwner2); assertNotEquals(channelOwner1, newChannelOwner1); - if (newChannelOwner1.equals(lookupServiceAddress1)) { + if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) { assertTrue(channel1.isChannelOwnerAsync().get(2, TimeUnit.SECONDS)); assertFalse(channel2.isChannelOwnerAsync().get(2, TimeUnit.SECONDS)); } else { @@ -269,7 +269,7 @@ public void assignmentTest() var owner2 = channel2.getOwnerAsync(bundle); assertTrue(owner1.get().isEmpty()); - assertTrue(owner1.get().isEmpty()); + assertTrue(owner2.get().isEmpty()); var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); @@ -277,12 +277,8 @@ public void assignmentTest() assertNotNull(assigned2); waitUntilOwnerChanges(channel1, bundle, null); waitUntilOwnerChanges(channel2, bundle, null); - Optional assignedAddrOpt1 = assigned1.get(5, TimeUnit.SECONDS); - Optional assignedAddrOpt2 = assigned2.get(5, TimeUnit.SECONDS); - assertTrue(assignedAddrOpt1.isPresent()); - assertTrue(assignedAddrOpt2.isPresent()); - String assignedAddr1 = assignedAddrOpt1.get(); - String assignedAddr2 = assignedAddrOpt2.get(); + String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS); + String assignedAddr2 = assigned2.get(5, TimeUnit.SECONDS); assertEquals(assignedAddr1, assignedAddr2); assertTrue(assignedAddr1.equals(lookupServiceAddress1) @@ -324,14 +320,12 @@ public void assignmentTestWhenOneAssignmentFails() assertTrue(owner1.get().isEmpty()); assertTrue(owner2.get().isEmpty()); - owner1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); - owner2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); - assertTrue(owner1.isCompletedExceptionally()); - assertNotNull(owner2); - Optional ownerAddrOpt2 = owner2.get(5, TimeUnit.SECONDS); - assertTrue(ownerAddrOpt2.isPresent()); - String ownerAddr2 = ownerAddrOpt2.get(); - assertEquals(ownerAddr2, lookupServiceAddress2); + var owner3 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); + var owner4 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); + assertTrue(owner3.isCompletedExceptionally()); + assertNotNull(owner4); + String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS); + assertEquals(ownerAddrOpt2, lookupServiceAddress2); waitUntilNewOwner(channel1, bundle, lookupServiceAddress2); assertEquals(0, getOwnerRequests1.size()); assertEquals(0, getOwnerRequests2.size()); @@ -725,10 +719,8 @@ private static void waitUntilNewChannelOwner(ServiceUnitStateChannel channel, St if (!owner.isDone()) { return false; } - if (oldOwner == null) { - return owner != null; - } - return !oldOwner.equals(owner); + + return !StringUtils.equals(oldOwner, owner.get().orElse(null)); }); }