Skip to content

Commit

Permalink
[improve][broker] PIP-192 Added Split Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Feb 24, 2023
1 parent 389792b commit 73be2ba
Show file tree
Hide file tree
Showing 11 changed files with 854 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2525,6 +2525,30 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
+ "(only used in load balancer extension logics)"
)
private double loadBalancerBundleLoadReportPercentage = 10;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Service units'(bundles) split interval. Broker periodically checks whether "
+ "some service units(e.g. bundles) should split if they become hot-spots. "
+ "(only used in load balancer extension logics)"
)
private int loadBalancerSplitIntervalMinutes = 1;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
doc = "Max number of bundles to split to per cycle. "
+ "(only used in load balancer extension logics)"
)
private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
doc = "Threshold to the consecutive count of fulfilled split conditions. "
+ "If the split scheduler consecutively finds bundles that meet split conditions "
+ "many times bigger than this threshold, the scheduler will trigger splits on the bundles "
+ "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). "
+ "(only used in load balancer extension logics)"
)
private int loadBalancerNamespaceBundleSplitConditionThreshold = 5;

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
Expand Down Expand Up @@ -98,7 +98,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

@Getter
private final List<BrokerFilter> brokerFilterPipeline;

/**
* The load data reporter.
*/
Expand All @@ -108,6 +107,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private ScheduledFuture brokerLoadDataReportTask;
private ScheduledFuture topBundlesLoadDataReportTask;
private SplitScheduler splitScheduler;

private boolean started = false;

Expand Down Expand Up @@ -166,7 +166,6 @@ public void start() throws PulsarServerException {
.brokerLoadDataStore(brokerLoadDataStore)
.topBundleLoadDataStore(topBundlesLoadDataStore).build();


this.brokerLoadDataReporter =
new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);

Expand Down Expand Up @@ -198,9 +197,11 @@ public void start() throws PulsarServerException {
interval,
interval, TimeUnit.MILLISECONDS);

// TODO: Start bundle split scheduler.
this.unloadScheduler = new UnloadScheduler(pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel);
this.unloadScheduler.start();
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitCounter, splitMetrics, context);
this.splitScheduler.start();
this.started = true;
}

Expand Down Expand Up @@ -326,6 +327,7 @@ public void close() throws PulsarServerException {
this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
this.unloadScheduler.close();
this.splitScheduler.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
Expand Down Expand Up @@ -356,13 +358,6 @@ private void updateUnloadMetrics(UnloadDecision decision) {
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
}

private void updateSplitMetrics(List<SplitDecision> decisions) {
for (var decision : decisions) {
splitCounter.update(decision);
}
this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
}

public List<Metrics> getMetrics() {
List<Metrics> metricsCollection = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
return null;
});
}

public void handleMetadataSessionEvent(SessionEvent e) {
if (e == SessionReestablished || e == SessionLost) {
lastMetadataSessionEvent = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.pulsar.broker.loadbalance.extensions.models;

import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
Expand All @@ -32,39 +30,45 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableLong;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.common.stats.Metrics;

/**
* Counts service unit splits (e.g. bundle splits) and keeps per-label, per-reason breakdown counters that are exported as metrics.
*/
public class SplitCounter {

long splitCount = 0;

final Map<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> breakdownCounters;
private long splitCount = 0;
private final Map<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> breakdownCounters;
private volatile long updatedAt = 0;

public SplitCounter() {
breakdownCounters = Map.of(
Success, Map.of(
Topics, new MutableLong(),
Sessions, new MutableLong(),
MsgRate, new MutableLong(),
Bandwidth, new MutableLong(),
Admin, new MutableLong()),
Skip, Map.of(
Balanced, new MutableLong()
),
Topics, new AtomicLong(),
Sessions, new AtomicLong(),
MsgRate, new AtomicLong(),
Bandwidth, new AtomicLong(),
Admin, new AtomicLong()),
Failure, Map.of(
Unknown, new MutableLong())
Unknown, new AtomicLong())
);
}

public void update(SplitDecision decision) {
if (decision.label == Success) {
splitCount++;
}
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
updatedAt = System.currentTimeMillis();
}

/**
 * Records one split outcome given directly as a label and a reason.
 *
 * <p>Increments the total split count when the label is {@code Success}, increments the
 * breakdown counter for the (label, reason) pair, and stamps the last-update time.
 *
 * @param label the outcome label of the split
 * @param reason the reason associated with the outcome
 */
public void update(SplitDecision.Label label, SplitDecision.Reason reason) {
if (label == Success) {
// NOTE(review): splitCount is a plain long, so this increment is not atomic, unlike the
// AtomicLong breakdown counters below -- confirm updates happen on a single thread.
splitCount++;
}
// Only (label, reason) pairs registered in the constructor have a counter; any other
// combination yields a null lookup here and fails with a NullPointerException.
breakdownCounters.get(label).get(reason).incrementAndGet();
updatedAt = System.currentTimeMillis();
}

public List<Metrics> toMetrics(String advertisedBrokerAddress) {
Expand All @@ -77,22 +81,26 @@ public List<Metrics> toMetrics(String advertisedBrokerAddress) {
m.put("brk_lb_bundles_split_total", splitCount);
metrics.add(m);

for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> etr

for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> etr
: breakdownCounters.entrySet()) {
var result = etr.getKey();
for (Map.Entry<SplitDecision.Reason, MutableLong> counter : etr.getValue().entrySet()) {
for (Map.Entry<SplitDecision.Reason, AtomicLong> counter : etr.getValue().entrySet()) {
var reason = counter.getKey();
var count = counter.getValue();
Map<String, String> breakdownDims = new HashMap<>(dimensions);
breakdownDims.put("result", result.toString());
breakdownDims.put("reason", reason.toString());
Metrics breakdownMetric = Metrics.create(breakdownDims);
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count);
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get());
metrics.add(breakdownMetric);
}
}

return metrics;
}

/**
 * Returns the time of the most recent {@code update} call, as recorded from
 * {@link System#currentTimeMillis()}.
 *
 * @return the last update time in epoch milliseconds, or 0 if no update has been recorded
 */
public long updatedAt() {
return updatedAt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.models;

import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
import lombok.Data;

Expand All @@ -36,7 +34,6 @@ public class SplitDecision {

public enum Label {
Success,
Skip,
Failure
}

Expand All @@ -46,7 +43,6 @@ public enum Reason {
MsgRate,
Bandwidth,
Admin,
Balanced,
Unknown
}

Expand All @@ -62,11 +58,6 @@ public void clear() {
reason = null;
}

public void skip() {
label = Skip;
reason = Balanced;
}

public void succeed(Reason reason) {
label = Success;
this.reason = reason;
Expand Down
Loading

0 comments on commit 73be2ba

Please sign in to comment.