Skip to content

Commit

Permalink
remove unnecessary changes
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Sep 19, 2022
1 parent 7a7d1c8 commit bf7c69e
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private MetricsGenerator metricsGenerator;

private TransactionMetadataStoreProvider transactionMetadataStoreProvider;
private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
private TransactionBufferClient transactionBufferClient;
Expand Down Expand Up @@ -460,13 +459,11 @@ public CompletableFuture<Void> closeAsync() {
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
CompletableFuture<Void> brokerCloseFuture = this.brokerService.closeAsync();
if (this.transactionMetadataStoreService != null || transactionPendingAckStoreProvider != null) {
if (this.transactionMetadataStoreService != null) {
asyncCloseFutures.add(brokerCloseFuture.whenComplete((__, ___) -> {
// close transactionMetadataStoreService after the broker has been closed
if (transactionMetadataStoreService != null) {
this.transactionMetadataStoreService.close();
this.transactionMetadataStoreService = null;
}
this.transactionMetadataStoreService.close();
this.transactionMetadataStoreService = null;
}));
} else {
asyncCloseFutures.add(brokerCloseFuture);
Expand Down Expand Up @@ -835,10 +832,10 @@ public void start() throws PulsarServerException {
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
config.getTransactionBufferClientMaxConcurrentRequests(),
config.getTransactionBufferClientOperationTimeoutInMills());
transactionMetadataStoreProvider = TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName());
transactionMetadataStoreService = new TransactionMetadataStoreService(transactionMetadataStoreProvider,
this, transactionBufferClient, transactionTimer);

transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
transactionBufferClient, transactionTimer);

transactionBufferProvider = TransactionBufferProvider
.newProvider(config.getTransactionBufferProviderClassName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,4 @@ static TransactionPendingAckStoreProvider newProvider(String providerClassName)
* if the operation succeeds.
*/
CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.Beta;
import io.netty.util.Timer;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand Down Expand Up @@ -73,5 +72,4 @@ CompletableFuture<TransactionMetadataStore> openStore(
ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator,
TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
// MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
return txnLog.initialize().thenCompose(__ ->
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator)
.init(recoverTracker));
mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker));
}

private static class MLTransactionMetadataStoreBufferedWriterMetrics extends TxnLogBufferedWriterMetricsStats {
Expand Down

0 comments on commit bf7c69e

Please sign in to comment.