diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3bb30357930f68..84a1db26f33467 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -244,7 +244,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { private MetricsGenerator metricsGenerator; - private TransactionMetadataStoreProvider transactionMetadataStoreProvider; private TransactionMetadataStoreService transactionMetadataStoreService; private TransactionBufferProvider transactionBufferProvider; private TransactionBufferClient transactionBufferClient; @@ -460,13 +459,11 @@ public CompletableFuture closeAsync() { List> asyncCloseFutures = new ArrayList<>(); if (this.brokerService != null) { CompletableFuture brokerCloseFuture = this.brokerService.closeAsync(); - if (this.transactionMetadataStoreService != null || transactionPendingAckStoreProvider != null) { + if (this.transactionMetadataStoreService != null) { asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> { // close transactionMetadataStoreService after the broker has been closed - if (transactionMetadataStoreService != null) { - this.transactionMetadataStoreService.close(); - this.transactionMetadataStoreService = null; - } + this.transactionMetadataStoreService.close(); + this.transactionMetadataStoreService = null; })); } else { asyncCloseFutures.add(brokerCloseFuture); @@ -835,10 +832,10 @@ public void start() throws PulsarServerException { transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer, config.getTransactionBufferClientMaxConcurrentRequests(), config.getTransactionBufferClientOperationTimeoutInMills()); - transactionMetadataStoreProvider = TransactionMetadataStoreProvider - .newProvider(config.getTransactionMetadataStoreProviderClassName()); - transactionMetadataStoreService = new TransactionMetadataStoreService(transactionMetadataStoreProvider, - this, transactionBufferClient, transactionTimer); + + transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider + .newProvider(config.getTransactionMetadataStoreProviderClassName()), this, + transactionBufferClient, transactionTimer); transactionBufferProvider = TransactionBufferProvider .newProvider(config.getTransactionBufferProviderClassName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java index 2adbee3bbdec90..fae38ee6a8e89f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java @@ -64,5 +64,4 @@ static TransactionPendingAckStoreProvider newProvider(String providerClassName) * if the operation succeeds. */ CompletableFuture checkInitializedBefore(PersistentSubscription subscription); - } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java index 721c8173514e22..f67d2f3c90a836 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.Beta; import io.netty.util.Timer; -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -73,5 +72,4 @@ CompletableFuture openStore( ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer); - } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index e3e1929f6699d9..80f8927b682327 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -70,8 +70,7 @@ public CompletableFuture openStore(TransactionCoordina // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties. return txnLog.initialize().thenCompose(__ -> new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, - mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator) - .init(recoverTracker)); + mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker)); } private static class MLTransactionMetadataStoreBufferedWriterMetrics extends TxnLogBufferedWriterMetricsStats {