-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat][broker] PIP-180 Part VI: Add ShadowManagedLedgerImpl #18265
Conversation
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
Show resolved
Hide resolved
/pulsarbot run-failure-checks |
Codecov Report
@@ Coverage Diff @@
## master #18265 +/- ##
============================================
- Coverage 47.51% 47.06% -0.46%
+ Complexity 10520 10457 -63
============================================
Files 698 698
Lines 68079 68323 +244
Branches 7280 7332 +52
============================================
- Hits 32351 32157 -194
- Misses 32151 32565 +414
- Partials 3577 3601 +24
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
Outdated
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
Show resolved
Hide resolved
LGTM with minor comments |
9295d45
to
eb300b8
Compare
|
||
public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, | ||
MetaStore store, ManagedLedgerConfig config, | ||
OrderedScheduler scheduledExecutor, | ||
String name, final Supplier<Boolean> mlOwnershipChecker) { | ||
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); | ||
this.shadowSource = TopicName.get(config.getShadowSource()); | ||
this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); | ||
if (config.getTopicName().isPartitioned() && TopicName.getPartitionIndex(config.getShadowSource()) == -1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the name
is not able to be used? It's a little confusing that we need to introduce the topic name in ManagedLedgerConfig
Or maybe we can introduce a properties
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the
name
is not able to be used? It's a little confusing that we need to introduce the topic name inManagedLedgerConfig
Good point.
- We need to know the
TopicName
of the source topic in the shadow topic. But thename
here is in the format ofPersistenceNamingEncoding
. - I get that we'd better not introduce the concept of "topic" into managed-ledger. I will move this logic to the "broker" layer. So that the source topic name is in
properties
and the managedLedgerName of the source topic is stored in the new fieldManagedLedgerConfig.shadowSourceName
.
sourceLedgersStat, stat); | ||
} | ||
|
||
sourceLedgersStat = stat; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check the data version?
Maybe the watcher gets a notification first, and then we get the response from get managed ledger info operation. Will we add the deleted ledger back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. Added a version check.
sourceLedgersStat = stat; | ||
|
||
if (mlInfo.hasTerminatedPosition()) { | ||
state = State.Terminated; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need Terminated state for a shadow ledger? If the source topic changes to the terminated state, the shadow topic will not get any new messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed actually, just keeping the original logic at first.
After another thought, we should remove this, because the source topic can be terminated but the replication to the shadow topic should go on.
So comes these updates:
- Removed Terminated state from ShadowManagedLedger.
- Overrides
asyncTerminate
and return fail directly.
if (state == State.Terminated) { | ||
addOperation.failed(new ManagedLedgerException.ManagedLedgerTerminatedException( | ||
"Managed ledger was already terminated")); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we don't need to check if the topic is terminated.
And if the broker receives the zookeeper notification first, then the shadow topic changes to the terminated state. But the shadow replicator hasn't reached the end of the source topic. In this case, the consumer connected to the shadow topic will have a different view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly.
currentLedgerEntries = position.getEntryId(); | ||
currentLedgerSize += addOperation.data.readableBytes(); | ||
addOperation.initiateShadowWrite(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about the position's ledger ID that is not equal to the current ledger ID? It looks like the addOperation
will never be complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, and add ut in ShadowManagedLedgerImplTest
@codelipenghui Comments all addressed. PTAL. |
- finish processSourceManagedLedgerInfo - finish internalAsyncAddEntry - ShadowTopicTest pass - ShadowReplicatorTest pass Add test cases
69b8222
to
536f10a
Compare
Hi @Jason918, do you have any updates for documenting this feature? |
@momo-jun I have communicated with @Jason918 , I'll help him update the documentation |
Thanks @Jason918 @StevenLuMT. Feel free to ping me if you need any assistance with docs. |
Thanks @momo-jun ,my WeChat ID is 863199780, add me, keep fast communication |
Master Issue: #16153
Motivation
After this PR, the basic function of
PIP-180 shadow topic
will be ready.Modifications
ShadowManagedLedgerImpl
Verifying this change
This change added tests and can be verified as follows:
ShadowTopicTest.java
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: Jason918#12