From eb300b854bd07b0ddf45157ca07854ec4335176a Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 18 Nov 2022 11:11:00 +0800 Subject: [PATCH] address comments --- .../bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java | 8 +++----- .../java/org/apache/pulsar/broker/service/Producer.java | 3 +++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 28a427677581c6..ab8815264373a6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -93,13 +93,10 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); } sourceLedgersStat = stat; - // Fails if init with empty ledger. Very small chance here, since shadow topic is - // created when source topic exists. if (mlInfo.getLedgerInfoCount() == 0) { + // Small chance here, since shadow topic is created after source topic exists. log.warn("[{}] Source topic ledger list is empty! source={},mlInfo={},stat={}", name, sourceMLName, mlInfo, stat); -// callback.initializeFailed(new ManagedLedgerException.ManagedLedgerSourceNotReadyException( -// "Source managed ledger " + sourceMLName + " is not ready yet.")); ShadowManagedLedgerImpl.super.initialize(callback, ctx); return; } @@ -281,7 +278,8 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] new SourceManagedLedgerInfo:{}", name, sourceMLName, mlInfo); + log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, prevStat={},stat={}", name, sourceMLName, mlInfo, + sourceLedgersStat, stat); } sourceLedgersStat = stat; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index e6957e86e76719..fe2249679a84cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -348,6 +348,9 @@ public TransportCnx getCnx() { return this.cnx; } + /** + * MessagePublishContext implements Position because that ShadowManagedLedger need to know the source position info. + */ private static final class MessagePublishContext implements PublishContext, Runnable, Position { /* * To store context information built by message payload