-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl #19622
Changes from 3 commits
42c23e6
c856344
02c9274
0d88959
1ac2868
0d245a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.broker.loadbalance.extensions.manager; | ||
|
||
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure; | ||
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeUnit; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; | ||
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; | ||
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter; | ||
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision; | ||
|
||
/** | ||
* Split manager. | ||
*/ | ||
@Slf4j | ||
public class SplitManager implements StateChangeListener { | ||
|
||
record InFlightSplitRequest(SplitDecision splitDecision, CompletableFuture<Void> future) { | ||
} | ||
|
||
private final Map<String, InFlightSplitRequest> inFlightSplitRequests; | ||
|
||
private final SplitCounter counter; | ||
|
||
public SplitManager(SplitCounter splitCounter) { | ||
this.inFlightSplitRequests = new ConcurrentHashMap<>(); | ||
this.counter = splitCounter; | ||
} | ||
|
||
private void complete(String serviceUnit, Throwable ex) { | ||
inFlightSplitRequests.computeIfPresent(serviceUnit, (__, inFlightSplitRequest) -> { | ||
var future = inFlightSplitRequest.future; | ||
if (!future.isDone()) { | ||
if (ex != null) { | ||
counter.update(Failure, Unknown); | ||
future.completeExceptionally(ex); | ||
log.error("Failed the bundle split event: {}", serviceUnit, ex); | ||
} else { | ||
counter.update(inFlightSplitRequest.splitDecision); | ||
future.complete(null); | ||
log.info("Completed the bundle split event: {}", serviceUnit); | ||
} | ||
} | ||
return null; | ||
}); | ||
} | ||
|
||
public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture, | ||
String bundle, | ||
SplitDecision decision, | ||
long timeout, | ||
TimeUnit timeoutUnit) { | ||
return eventPubFuture | ||
.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> { | ||
log.info("Published the bundle split event for bundle:{}. " | ||
+ "Waiting the split event to complete. Timeout: {} {}", | ||
bundle, timeout, timeoutUnit); | ||
CompletableFuture<Void> future = new CompletableFuture<>(); | ||
future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> { | ||
if (ex != null) { | ||
inFlightSplitRequests.remove(bundle); | ||
log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex); | ||
} | ||
}); | ||
return new InFlightSplitRequest(decision, future); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we don't need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Updated. |
||
}).future) | ||
.exceptionally(e -> { | ||
log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle); | ||
counter.update(Failure, Unknown); | ||
return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using exceptionally will cause the returned future to lose the exception info. We should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. |
||
}); | ||
} | ||
|
||
@Override | ||
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { | ||
ServiceUnitState state = ServiceUnitStateData.state(data); | ||
if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) { | ||
this.complete(serviceUnit, t); | ||
return; | ||
} | ||
switch (state) { | ||
case Deleted, Owned, Init -> this.complete(serviceUnit, t); | ||
default -> { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Handling {} for service unit {}", data, serviceUnit); | ||
} | ||
} | ||
} | ||
} | ||
|
||
public void close() { | ||
inFlightSplitRequests.forEach((bundle, inFlightSplitRequest) -> { | ||
if (!inFlightSplitRequest.future.isDone()) { | ||
String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle); | ||
log.warn(msg); | ||
inFlightSplitRequest.future.completeExceptionally(new IllegalStateException(msg)); | ||
} | ||
}); | ||
inFlightSplitRequests.clear(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we update the counter here, when complete exception it will updates twice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest moving the counter update to
waitAsync
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify how we are updating this failure counter twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I see. I think we need to clean this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. Updated.