Skip to content

Commit

Permalink
[Improve][txn] extend admin tools for transaction component. (#15675)
Browse files Browse the repository at this point in the history
* [Improve][txn] extend admin tools for transaction buffer.
### Motivation
In order to better monitor the usage of transactions. And more convenient positioning problem, so we hope to get more information when getting TB, TP, TC status.

### Modification
1. Add lowWaterMarks (optional) in stats of TB, TP.
2. Add brokerOwnerUrl in stats of TB, TP
3. Add ongoingtxns in stats of TB, TP, TC.
  • Loading branch information
liangyepianzhou authored Jun 22, 2022
1 parent 33cf2d0 commit c505659
Show file tree
Hide file tree
Showing 22 changed files with 203 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,16 @@ protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBu
.thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
}

protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative,
boolean lowWaterMarks) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionBufferStats());
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks));
}

protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
boolean authoritative, String subName) {
boolean authoritative, String subName, boolean lowWaterMarks) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionPendingAckStats(subName));
.thenApply(topic -> topic.getTransactionPendingAckStats(subName, lowWaterMarks));
}

protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
@DefaultValue("false") boolean authoritative,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("lowWaterMarks") @DefaultValue("false")
boolean lowWaterMarks) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative)
internalGetTransactionBufferStats(authoritative, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down Expand Up @@ -192,11 +194,12 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName) {
@PathParam("subName") String subName,
@QueryParam("lowWaterMarks") @DefaultValue("false") boolean lowWaterMarks) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckStats(authoritative, subName)
internalGetPendingAckStats(authoritative, subName, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,8 +1165,8 @@ public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
}

public TransactionPendingAckStats getTransactionPendingAckStats() {
return this.pendingAckHandle.getStats();
public TransactionPendingAckStats getTransactionPendingAckStats(boolean lowWaterMarks) {
return this.pendingAckHandle.getStats(lowWaterMarks);
}

public boolean checkAndUnblockIfStuck() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3099,12 +3099,12 @@ public boolean checkSubscriptionTypesEnable(SubType subType) {
return subTypesEnabled != null && subTypesEnabled.contains(subType);
}

public TransactionBufferStats getTransactionBufferStats() {
return this.transactionBuffer.getStats();
public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks) {
return this.transactionBuffer.getStats(lowWaterMarks);
}

public TransactionPendingAckStats getTransactionPendingAckStats(String subName) {
return this.subscriptions.get(subName).getTransactionPendingAckStats();
public TransactionPendingAckStats getTransactionPendingAckStats(String subName, boolean lowWaterMarks) {
return this.subscriptions.get(subName).getTransactionPendingAckStats(lowWaterMarks);
}

public PositionImpl getMaxReadPosition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public interface TransactionBuffer {
* Get transaction stats in buffer.
* @return the transaction stats in buffer.
*/
TransactionBufferStats getStats();
TransactionBufferStats getStats(boolean lowWaterMarks);

/**
* Wait TransactionBuffer Recovers completely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}

@Override
public TransactionBufferStats getStats() {
public TransactionBufferStats getStats(boolean lowWaterMarks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,16 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}

@Override
public TransactionBufferStats getStats() {
public TransactionBufferStats getStats(boolean lowWaterMarks) {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
transactionBufferStats.state = this.getState().name();
transactionBufferStats.maxReadPosition = this.maxReadPosition.toString();
if (lowWaterMarks) {
transactionBufferStats.lowWaterMarks = this.lowWaterMarks;
}
transactionBufferStats.ongoingTxnSize = ongoingTxns.size();

return transactionBufferStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}

@Override
public TransactionBufferStats getStats() {
public TransactionBufferStats getStats(boolean lowWaterMarks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
*
* @return the stats of this pending ack handle.
*/
TransactionPendingAckStats getStats();
TransactionPendingAckStats getStats(boolean lowWaterMarks);

/**
* Close the pending ack handle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
}

@Override
public TransactionPendingAckStats getStats() {
public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,17 @@ public CompletableFuture<PendingAckHandle> pendingAckHandleFuture() {
}

@Override
public TransactionPendingAckStats getStats() {
public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
TransactionPendingAckStats transactionPendingAckStats = new TransactionPendingAckStats();
transactionPendingAckStats.state = this.getState().name();
if (lowWaterMarks) {
transactionPendingAckStats.lowWaterMarks = this.lowWaterMarks;
}
if (individualAckOfTransaction != null) {
transactionPendingAckStats.ongoingTxnSize = individualAckOfTransaction.size();
} else {
transactionPendingAckStats.ongoingTxnSize = 0;
}
return transactionPendingAckStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testGetTransactionInBufferStats() throws Exception {
}

@Test(timeOut = 20000)
public void testGetTransactionPendingAckStats() throws Exception {
public void testGetTransactionInPendingAckStats() throws Exception {
initTransaction(2);
final String topic = "persistent://public/default/testGetTransactionInBufferStats";
final String subName = "test";
Expand Down Expand Up @@ -341,6 +341,9 @@ public void testGetTransactionBufferStats() throws Exception {
PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId() + 1).toString());
assertTrue(transactionBufferStats.lastSnapshotTimestamps > currentTime);
assertNull(transactionBufferStats.lowWaterMarks);
transactionBufferStats = admin.transactions().getTransactionBufferStats(topic, true);
assertNotNull(transactionBufferStats.lowWaterMarks);
}

@DataProvider(name = "ackType")
Expand Down Expand Up @@ -387,18 +390,61 @@ public void testGetPendingAckStats(String ackType) throws Exception {

TransactionImpl transaction = (TransactionImpl) getTransaction();
if (ackType.equals("individual")) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction);
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
} else {
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), transaction);
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), transaction).get();
}
transaction.commit().get();

transactionPendingAckStats = admin.transactions().
getPendingAckStatsAsync(topic, subName).get();

assertNull(transactionPendingAckStats.lowWaterMarks);
assertEquals(transactionPendingAckStats.state, "Ready");
}

@Test
public void testTransactionGetStats() throws Exception {
initTransaction(1);
final String topic = "persistent://public/default/testGetPendingAckStats";
final String subName = "test1";

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
.subscriptionName(subName).subscribe();

Transaction transaction1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
Transaction transaction2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
producer.newMessage().send();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), transaction1).get();
for (int i = 0; i < 5; i++) {
producer.newMessage(transaction1).send();
}
transaction1.commit().get();
message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), transaction2).get();
producer.newMessage(transaction2).send();

TransactionBufferStats transactionBufferStats =
admin.transactions().getTransactionBufferStats(topic, true);
assertEquals(transactionBufferStats.ongoingTxnSize, 1);
assertNotNull(transactionBufferStats.lowWaterMarks);

TransactionPendingAckStats transactionPendingAckStats =
admin.transactions().getPendingAckStats(topic, subName, true);

assertEquals(transactionPendingAckStats.ongoingTxnSize, 1);
assertNotNull(transactionPendingAckStats.lowWaterMarks);
assertEquals(admin.transactions().getCoordinatorStatsById(0).ongoingTxnSize, 1);
}

@Test(timeOut = 20000)
public void testGetSlowTransactions() throws Exception {
initTransaction(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{

TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer2.getStats().state, "Ready"));
assertEquals(buffer2.getStats(false).state, "Ready"));
managedCursors.removeCursor("transaction-buffer-sub");

doAnswer(invocation -> {
Expand All @@ -580,7 +580,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
managedCursors.add(managedCursor);
TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer3.getStats().state, "Ready"));
assertEquals(buffer3.getStats(false).state, "Ready"));
persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
assertTrue(internalStats.cursors.isEmpty());
});
Expand Down Expand Up @@ -631,7 +631,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle1.getStats().state, "Ready"));
assertEquals(pendingAckHandle1.getStats(false).state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
Expand All @@ -641,7 +641,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle2.getStats().state, "Ready"));
assertEquals(pendingAckHandle2.getStats(false).state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
Expand All @@ -652,7 +652,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);

Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle3.getStats().state, "Ready"));
assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ private void checkTopicTransactionBufferState(boolean clientEnableTransaction,
Awaitility.await().until(() -> {
if (clientEnableTransaction) {
// recover success, client enable transaction will change to Ready State
return topicTransactionBuffer.getStats().state.equals(Ready.name());
return topicTransactionBuffer.getStats(false).state.equals(Ready.name());
} else {
// recover success, client disable transaction will change to NoSnapshot State
return topicTransactionBuffer.getStats().state.equals(NoSnapshot.name());
return topicTransactionBuffer.getStats(false).state.equals(NoSnapshot.name());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,72 @@ TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the future stats of transaction buffer in topic.
*/
CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic);
CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks);

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @return the future stats of transaction buffer in topic.
*/
default CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic) {
return getTransactionBufferStatsAsync(topic, false);
}

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction buffer.
* @return the stats of transaction buffer in topic.
*/
TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException;
TransactionBufferStats getTransactionBufferStats(String topic, boolean lowWaterMarks) throws PulsarAdminException;

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @return the stats of transaction buffer in topic.
*/
default TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException {
return getTransactionBufferStats(topic, false);
}

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the stats of transaction pending ack.
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName,
boolean lowWaterMarks);

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @return the stats of transaction pending ack.
*/
default CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName) {
return getPendingAckStatsAsync(topic, subName, false);
}

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the stats of transaction pending ack.
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName);
TransactionPendingAckStats getPendingAckStats(String topic, String subName, boolean lowWaterMarks)
throws PulsarAdminException;

/**
* Get transaction pending ack stats.
Expand All @@ -149,7 +195,9 @@ TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String
* @param subName the subscription name of this transaction pending ack stats
* @return the stats of transaction pending ack.
*/
TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException;
default TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException {
return getPendingAckStats(topic, subName, false);
}

/**
* Get slow transactions by coordinator id.
Expand Down
Loading

0 comments on commit c505659

Please sign in to comment.