[improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy #19045
Codecov Report
@@              Coverage Diff               @@
##             master   #19045       +/-   ##
=============================================
+ Coverage     47.20%   60.71%   +13.51%
- Complexity    10645    26081    +15436
=============================================
  Files           709     1884     +1175
  Lines         69421   137392    +67971
  Branches       7449    15106     +7657
=============================================
+ Hits          32769    83424    +50655
- Misses        32984    46262    +13278
- Partials       3668     7706     +4038
@Technoboy- Hi, can you take a look again?
    private boolean checkBrokers = true;

    public ServiceUnitStateCompactionStrategy() {
        schema = JSONSchema.of(ServiceUnitStateData.class);
Please use Schema.JSON().
Updated
@@ -385,6 +385,7 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
                promise.completeExceptionally(e);
                return;
            }
            outstanding.release(MAX_OUTSTANDING);
Should we move this after promise.complete(null)?
No, I found a deadlock if we don't release this semaphore before completing the future.
Deadlock example:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.junit.Test;

public class SemaphoreTest {
    @Test
    public void deadlockTest() {
        Semaphore sm = new Semaphore(2);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture.supplyAsync(() -> {
            System.out.println("starting");
            try {
                sm.acquire(2);
                System.out.println("acquired 2");
            } catch (InterruptedException e) {
                // ignored for brevity
            }
            try {
                // Give the main thread time to register the thenCompose callback.
                Thread.sleep(1000 * 5);
            } catch (InterruptedException e) {
                // ignored for brevity
            }
            System.out.println("return!");
            // The callback below runs synchronously on this thread, inside complete().
            future.complete(""); // hangs here.
            System.out.println("proceeding");
            sm.release(2);
            System.out.println("released 2");
            return "";
        }, executor);
        future.thenCompose(x -> {
            try {
                System.out.println("acquiring one!");
                // Both permits are still held by the completing thread.
                sm.acquire(); // hangs here.
                System.out.println("acquired 1");
                sm.release();
            } catch (InterruptedException e) {
                // ignored for brevity
            }
            return CompletableFuture.completedFuture("");
        }).join();
    }
}
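For contrast, here is a minimal sketch of the ordering the author describes: release the permits first, then complete the future. It is not the compactor code from this PR. The class name `ReleaseBeforeComplete` and its `run()` helper are hypothetical, and a timeout is added only so the sketch cannot hang if the ordering is changed back.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the Pulsar code base.
public class ReleaseBeforeComplete {

    // Returns "acquired 1" when the dependent stage manages to take a permit.
    public static String run() {
        Semaphore sm = new Semaphore(2);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            // Register the dependent stage before the future is completed,
            // so it runs on the completing thread, as in the deadlock example.
            CompletableFuture<String> dependent = future.thenCompose(x -> {
                try {
                    sm.acquire();
                    sm.release();
                    return CompletableFuture.completedFuture("acquired 1");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return CompletableFuture.completedFuture("interrupted");
                }
            });
            CompletableFuture.runAsync(() -> {
                sm.acquireUninterruptibly(2);
                // Release first: the callback triggered by complete() needs a permit.
                sm.release(2);
                future.complete("");
            }, executor);
            // The timeout is a safety net for the sketch only.
            return dependent.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "acquired 1"
    }
}
```

Swapping the `sm.release(2)` and `future.complete("")` lines reproduces the hang from the example above, surfaced here as a timeout after ten seconds.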
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
schema = JSONSchema.of(ServiceUnitStateData.class);
Schema.JSON
updated.
        log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
                serviceUnit, stateData);
        serviceUnitTombstoneErrorCnt.incrementAndGet();
    }
});
serviceUnitTombstoneCnt.incrementAndGet();
Could you help explain why we need to modify this?
We update serviceUnitTombstoneErrorCnt and serviceUnitTombstoneCnt in the async method, so both values could still be 0 and let line 643 pass.
It relies on producer.flush() to persist all outstanding messages before this call returns.
Yes, because these values could be 0, I updated this code path here. I also cleaned up the metrics computation code.
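The concern in this thread can be sketched in isolation. The example below is an assumption-laden illustration, not the ServiceUnitStateChannelImpl code: `AsyncCounterSketch`, `countInCallback`, and `countAtCallSite` are hypothetical names, and a never-completed `CompletableFuture` stands in for an async publish that has not finished yet.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class; not part of the Pulsar code base.
public class AsyncCounterSketch {

    // The counter is bumped only inside the async callback, so a caller that
    // reads it before the operation completes still sees 0.
    public static long countInCallback(CompletableFuture<Void> pending) {
        AtomicLong cnt = new AtomicLong();
        pending.whenComplete((v, e) -> cnt.incrementAndGet());
        return cnt.get();
    }

    // The attempt counter is bumped at the call site, right after the async
    // operation is issued; only the error counter stays in the callback.
    public static long countAtCallSite(CompletableFuture<Void> pending) {
        AtomicLong cnt = new AtomicLong();
        AtomicLong errCnt = new AtomicLong();
        pending.whenComplete((v, e) -> {
            if (e != null) {
                errCnt.incrementAndGet();
            }
        });
        cnt.incrementAndGet();
        return cnt.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>(); // never completed
        System.out.println(countInCallback(pending)); // prints 0
        System.out.println(countAtCallSite(pending)); // prints 1
    }
}
```

The second variant matches the shape of the hunk above, where `serviceUnitTombstoneCnt.incrementAndGet()` sits after the callback block and only the error counter is updated inside it.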
@Technoboy- ping
LGTM
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1563,6 +1571,8 @@ public void checkCompaction() {
            }

            if (backlogEstimate > compactionThreshold) {
                log.info("topic:{} backlogEstimate:{} is bigger than compactionThreshold:{}. Triggering compaction",
It should be a debug-level log.
I thought this topic compaction log is useful. Could you share your concerns about this?
Suppose you have many topics, like 50k topics per broker. By default, this would produce 50k log lines per minute, and we already log once the compaction task has started.
OK, I will make this a debug-level log.
Updated.
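The volume argument in this thread can be illustrated with a small sketch. Pulsar logs through SLF4J; to stay self-contained this example uses java.util.logging instead, with `Level.FINE` standing in for debug and a logger threshold of `INFO` standing in for a typical production setting. The class `LogVolumeSketch` and its `emitted` helper are hypothetical, and the 50,000 figure is the reviewer's example topic count for one check interval.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical illustration class; not part of the Pulsar code base.
public class LogVolumeSketch {

    // Logs one message per topic at callLevel and returns how many records
    // actually reached the handler when the logger threshold is INFO.
    public static int emitted(Level callLevel, int topics) {
        Logger log = Logger.getLogger("compaction-sketch-" + callLevel);
        log.setUseParentHandlers(false); // keep the console quiet
        log.setLevel(Level.INFO);
        AtomicInteger published = new AtomicInteger();
        Handler counter = new Handler() {
            @Override public void publish(LogRecord record) { published.incrementAndGet(); }
            @Override public void flush() { }
            @Override public void close() { }
        };
        log.addHandler(counter);
        try {
            for (int i = 0; i < topics; i++) {
                log.log(callLevel, "topic:{0} backlog is above the compaction threshold", i);
            }
        } finally {
            log.removeHandler(counter);
        }
        return published.get();
    }

    public static void main(String[] args) {
        System.out.println(emitted(Level.INFO, 50_000)); // prints 50000
        System.out.println(emitted(Level.FINE, 50_000)); // prints 0
    }
}
```

At the lower level the per-topic message is filtered out before it reaches any handler, which is the effect the reviewer asks for.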
Master Issue: #16691
Motivation
This PR adds ServiceUnitStateCompactionStrategy.
Modifications
For PIP-192, this PR adds the ServiceUnitStateCompactionStrategy and its unit tests.
Also, this PR updates the related classes to enable ServiceUnitStateCompactionStrategy:
- strategicCompactor member in PulsarService.
- ServiceUnitStateCompactionStrategy for the tableview member in ServiceUnitStateChannelImpl.
- strategicCompactionMap member in PersistentTopic.
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
We will have separate PRs to update the Doc later.
Matching PR in forked repository
PR in forked repository: heesung-sn#18