Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Add metrics for unload operation #19749

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -204,8 +206,8 @@ public void start() throws PulsarServerException {
});
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.unloadManager = new UnloadManager();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter);
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
Expand Down Expand Up @@ -265,7 +267,8 @@ public void start() throws PulsarServerException {
interval, TimeUnit.MILLISECONDS);

this.unloadScheduler = new UnloadScheduler(
pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
pulsar, pulsar.getLoadManagerExecutor(), unloadManager,
context, serviceUnitStateChannel, antiAffinityGroupPolicyHelper, unloadCounter, unloadMetrics);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
Expand Down Expand Up @@ -401,16 +404,21 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
log.warn(msg);
throw new IllegalArgumentException(msg);
}
return unloadAsync(new Unload(sourceBroker, bundle.toString(), destinationBroker),
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker);
UnloadDecision unloadDecision =
new UnloadDecision(unload, Success, Admin);
return unloadAsync(unloadDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
});
}

private CompletableFuture<Void> unloadAsync(Unload unload,
private CompletableFuture<Void> unloadAsync(UnloadDecision unloadDecision,
long timeout,
TimeUnit timeoutUnit) {
Unload unload = unloadDecision.getUnload();
CompletableFuture<Void> future = serviceUnitStateChannel.publishUnloadEventAsync(unload);
return unloadManager.waitAsync(future, unload.serviceUnit(), timeout, timeoutUnit);
return unloadManager.waitAsync(future, unload.serviceUnit(), unloadDecision, timeout, timeoutUnit)
.thenRun(() -> unloadCounter.updateUnloadBrokerCount(1));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;

/**
* Unload manager.
*/
@Slf4j
public class UnloadManager implements StateChangeListener {

private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;

public UnloadManager() {
public UnloadManager(UnloadCounter counter) {
this.counter = counter;
this.inFlightUnloadRequest = new ConcurrentHashMap<>();
}

Expand All @@ -43,14 +49,8 @@ private void complete(String serviceUnit, Throwable ex) {
if (!future.isDone()) {
if (ex != null) {
future.completeExceptionally(ex);
if (log.isDebugEnabled()) {
log.debug("Complete exceptionally unload bundle: {}", serviceUnit, ex);
}
} else {
future.complete(null);
if (log.isDebugEnabled()) {
log.debug("Complete unload bundle: {}", serviceUnit);
}
}
}
return null;
Expand All @@ -59,6 +59,7 @@ private void complete(String serviceUnit, Throwable ex) {

public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
String bundle,
UnloadDecision decision,
long timeout,
TimeUnit timeoutUnit) {

Expand All @@ -74,7 +75,15 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
}
});
return future;
}));
})).whenComplete((__, ex) -> {
if (ex != null) {
counter.update(Failure, Unknown);
log.warn("Failed to unload bundle: {}", bundle, ex);
return;
}
log.info("Complete unload bundle: {}", bundle);
counter.update(decision);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
Expand All @@ -30,11 +31,13 @@
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableLong;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.pulsar.common.stats.Metrics;

/**
Expand All @@ -45,36 +48,63 @@ public class UnloadCounter {
long unloadBrokerCount = 0;
long unloadBundleCount = 0;

final Map<UnloadDecision.Label, Map<UnloadDecision.Reason, MutableLong>> breakdownCounters;
@Getter
@VisibleForTesting
final Map<UnloadDecision.Label, Map<UnloadDecision.Reason, AtomicLong>> breakdownCounters;

@Getter
@VisibleForTesting
double loadAvg;
@Getter
@VisibleForTesting
double loadStd;

private volatile long updatedAt = 0;

public UnloadCounter() {
breakdownCounters = Map.of(
Success, Map.of(
Overloaded, new MutableLong(),
Underloaded, new MutableLong()),
Overloaded, new AtomicLong(),
Underloaded, new AtomicLong(),
Admin, new AtomicLong()),
Skip, Map.of(
Balanced, new MutableLong(),
NoBundles, new MutableLong(),
CoolDown, new MutableLong(),
OutDatedData, new MutableLong(),
NoLoadData, new MutableLong(),
NoBrokers, new MutableLong(),
Unknown, new MutableLong()),
Balanced, new AtomicLong(),
NoBundles, new AtomicLong(),
CoolDown, new AtomicLong(),
OutDatedData, new AtomicLong(),
NoLoadData, new AtomicLong(),
NoBrokers, new AtomicLong(),
Unknown, new AtomicLong()),
Failure, Map.of(
Unknown, new MutableLong())
Unknown, new AtomicLong())
);
}

public void update(UnloadDecision decision) {
var unloads = decision.getUnloads();
unloadBrokerCount += unloads.keySet().size();
unloadBundleCount += unloads.values().size();
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
loadAvg = decision.loadAvg;
loadStd = decision.loadStd;
if (decision.getLabel() == Success) {
unloadBundleCount++;
}
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
updatedAt = System.currentTimeMillis();
}

/**
 * Records a single unload outcome given directly as a label/reason pair.
 *
 * <p>Bumps {@code unloadBundleCount} when the label is {@code Success}, increments the
 * breakdown counter stored under {@code label} and {@code reason}, and refreshes
 * {@code updatedAt} with the current wall-clock time.
 *
 * <p>The counter lookup is not null-checked, so a label/reason pair that is absent from
 * {@code breakdownCounters} ends in a {@link NullPointerException}.
 *
 * @param label  outcome category of the unload attempt
 * @param reason cause attached to that outcome
 */
public void update(UnloadDecision.Label label, UnloadDecision.Reason reason) {
    final boolean succeeded = label == Success;
    if (succeeded) {
        unloadBundleCount += 1;
    }
    final var countersByReason = breakdownCounters.get(label);
    countersByReason.get(reason).incrementAndGet();
    // Stamp the change time last, after the counters have been updated.
    updatedAt = System.currentTimeMillis();
}

/**
 * Stores the given load average and load standard deviation, then refreshes
 * {@code updatedAt} with the current wall-clock time.
 *
 * @param loadAvg load average to store
 * @param loadStd load standard deviation to store
 */
public void updateLoadData(double loadAvg, double loadStd) {
    this.loadAvg = loadAvg;
    this.loadStd = loadStd;
    // The timestamp is written after both values, as in every other update method.
    final long now = System.currentTimeMillis();
    this.updatedAt = now;
}

/**
 * Adds the given amount to the running {@code unloadBrokerCount} total and refreshes
 * {@code updatedAt} with the current wall-clock time.
 *
 * <p>NOTE(review): the addition is a plain read-modify-write on a non-atomic {@code long}
 * field; confirm that callers are serialized, or that lost updates are acceptable here.
 *
 * @param unloadBrokerCount number of brokers to add to the total
 */
public void updateUnloadBrokerCount(int unloadBrokerCount) {
    this.unloadBrokerCount = this.unloadBrokerCount + unloadBrokerCount;
    this.updatedAt = System.currentTimeMillis();
}

public List<Metrics> toMetrics(String advertisedBrokerAddress) {
Expand Down Expand Up @@ -125,4 +155,8 @@ public List<Metrics> toMetrics(String advertisedBrokerAddress) {

return metrics;
}
}

/**
 * Returns the time of the most recent counter update.
 *
 * @return the {@link System#currentTimeMillis()} value captured by the last update,
 *         or {@code 0} if nothing has been recorded yet
 */
public long updatedAt() {
    return this.updatedAt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,21 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.models;

import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Defines the information required to unload or transfer a service unit (e.g. bundle).
*/
@Data
@AllArgsConstructor
public class UnloadDecision {
Multimap<String, Unload> unloads;
Unload unload;
Label label;
Reason reason;
Double loadAvg;
Double loadStd;

public enum Label {
Success,
Skip,
Expand All @@ -55,62 +47,30 @@ public enum Reason {
OutDatedData,
NoLoadData,
NoBrokers,
Admin,
Unknown
}

public UnloadDecision() {
unloads = ArrayListMultimap.create();
unload = null;
label = null;
reason = null;
loadAvg = null;
loadStd = null;
}

public void clear() {
unloads.clear();
unload = null;
label = null;
reason = null;
loadAvg = null;
loadStd = null;
}

public void skip(int numOfOverloadedBrokers,
int numOfUnderloadedBrokers,
int numOfBrokersWithEmptyLoadData,
int numOfBrokersWithFewBundles) {
label = Skip;
if (numOfOverloadedBrokers == 0 && numOfUnderloadedBrokers == 0) {
reason = Balanced;
} else if (numOfBrokersWithEmptyLoadData > 0) {
reason = NoLoadData;
} else if (numOfBrokersWithFewBundles > 0) {
reason = NoBundles;
} else {
reason = Unknown;
}
}

/**
 * Marks this decision as skipped.
 *
 * <p>Sets the label to {@code Skip} and stores the supplied reason.
 *
 * @param reason why the unload was skipped
 */
public void skip(Reason reason) {
    this.label = Skip;
    this.reason = reason;
}

public void succeed(
int numOfOverloadedBrokers,
int numOfUnderloadedBrokers) {

label = Success;
if (numOfOverloadedBrokers > numOfUnderloadedBrokers) {
reason = Overloaded;
} else {
reason = Underloaded;
}
}


public void fail() {
label = Failure;
reason = Unknown;
/**
 * Marks this decision as successful.
 *
 * <p>Sets the label to {@code Success} and stores the supplied reason.
 *
 * @param reason why the unload was decided
 */
public void succeed(Reason reason) {
    label = Success;
    this.reason = reason;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ public boolean canUnload(
String bundle,
String srcBroker,
Optional<String> dstBroker) {



try {
var antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;

import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;

Expand All @@ -38,8 +39,8 @@ public interface NamespaceUnloadStrategy {
* @param recentlyUnloadedBrokers The recently unloaded brokers.
* @return the set of unload decisions, each identifying a bundle that should be unloaded.
*/
UnloadDecision findBundlesForUnloading(LoadManagerContext context,
Map<String, Long> recentlyUnloadedBundles,
Map<String, Long> recentlyUnloadedBrokers);
Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
Map<String, Long> recentlyUnloadedBundles,
Map<String, Long> recentlyUnloadedBrokers);

}
Loading