Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][txn] extend admin tools for transaction component. #15675

Merged
merged 6 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,16 @@ protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBu
.thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
}

protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative,
boolean lowWaterMarks) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionBufferStats());
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks));
}

protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
boolean authoritative, String subName) {
boolean authoritative, String subName, boolean lowWaterMarks) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionPendingAckStats(subName));
.thenApply(topic -> topic.getTransactionPendingAckStats(subName, lowWaterMarks));
}

protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
@DefaultValue("false") boolean authoritative,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("lowWaterMarks") @DefaultValue("false")
boolean lowWaterMarks) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative)
internalGetTransactionBufferStats(authoritative, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down Expand Up @@ -192,11 +194,12 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName) {
@PathParam("subName") String subName,
@QueryParam("lowWaterMarks") @DefaultValue("false") boolean lowWaterMarks) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckStats(authoritative, subName)
internalGetPendingAckStats(authoritative, subName, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,8 @@ public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
}

public TransactionPendingAckStats getTransactionPendingAckStats() {
return this.pendingAckHandle.getStats();
public TransactionPendingAckStats getTransactionPendingAckStats(boolean lowWaterMarks) {
return this.pendingAckHandle.getStats(lowWaterMarks);
}

public boolean checkAndUnblockIfStuck() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3110,12 +3110,12 @@ public boolean checkSubscriptionTypesEnable(SubType subType) {
return subTypesEnabled != null && subTypesEnabled.contains(subType);
}

public TransactionBufferStats getTransactionBufferStats() {
return this.transactionBuffer.getStats();
public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks) {
return this.transactionBuffer.getStats(lowWaterMarks);
}

public TransactionPendingAckStats getTransactionPendingAckStats(String subName) {
return this.subscriptions.get(subName).getTransactionPendingAckStats();
public TransactionPendingAckStats getTransactionPendingAckStats(String subName, boolean lowWaterMarks) {
return this.subscriptions.get(subName).getTransactionPendingAckStats(lowWaterMarks);
}

public PositionImpl getMaxReadPosition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public interface TransactionBuffer {
* Get transaction stats in buffer.
* @return the transaction stats in buffer.
*/
TransactionBufferStats getStats();
TransactionBufferStats getStats(boolean lowWaterMarks);

/**
* Wait TransactionBuffer Recovers completely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}

@Override
public TransactionBufferStats getStats() {
public TransactionBufferStats getStats(boolean lowWaterMarks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,16 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}

@Override
public TransactionBufferStats getStats() {
public TransactionBufferStats getStats(boolean lowWaterMarks) {
TransactionBufferStats transactionBufferStats = new TransactionBufferStats();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
transactionBufferStats.state = this.getState().name();
transactionBufferStats.maxReadPosition = this.maxReadPosition.toString();
if (lowWaterMarks) {
transactionBufferStats.lowWaterMarks = this.lowWaterMarks;
}
transactionBufferStats.ongoingTxnSize = ongoingTxns.size();

return transactionBufferStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
}

@Override
public TransactionBufferStats getStats() {
public TransactionBufferStats getStats(boolean lowWaterMarks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
*
* @return the stats of this pending ack handle.
*/
TransactionPendingAckStats getStats();
TransactionPendingAckStats getStats(boolean lowWaterMarks);

/**
* Close the pending ack handle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
}

@Override
public TransactionPendingAckStats getStats() {
public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,9 +881,17 @@ public CompletableFuture<PendingAckHandle> pendingAckHandleFuture() {
}

@Override
public TransactionPendingAckStats getStats() {
public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
TransactionPendingAckStats transactionPendingAckStats = new TransactionPendingAckStats();
transactionPendingAckStats.state = this.getState().name();
if (lowWaterMarks) {
transactionPendingAckStats.lowWaterMarks = this.lowWaterMarks;
}
if (individualAckOfTransaction != null) {
transactionPendingAckStats.ongoingTxnSize = individualAckOfTransaction.size();
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
} else {
transactionPendingAckStats.ongoingTxnSize = 0;
}
return transactionPendingAckStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testGetTransactionInBufferStats() throws Exception {
}

@Test(timeOut = 20000)
public void testGetTransactionPendingAckStats() throws Exception {
public void testGetTransactionInPendingAckStats() throws Exception {
initTransaction(2);
final String topic = "persistent://public/default/testGetTransactionInBufferStats";
final String subName = "test";
Expand Down Expand Up @@ -341,6 +341,9 @@ public void testGetTransactionBufferStats() throws Exception {
PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId() + 1).toString());
assertTrue(transactionBufferStats.lastSnapshotTimestamps > currentTime);
assertNull(transactionBufferStats.lowWaterMarks);
transactionBufferStats = admin.transactions().getTransactionBufferStats(topic, true);
assertNotNull(transactionBufferStats.lowWaterMarks);
}

@DataProvider(name = "ackType")
Expand Down Expand Up @@ -387,18 +390,61 @@ public void testGetPendingAckStats(String ackType) throws Exception {

TransactionImpl transaction = (TransactionImpl) getTransaction();
if (ackType.equals("individual")) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction);
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
} else {
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), transaction);
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), transaction).get();
}
transaction.commit().get();

transactionPendingAckStats = admin.transactions().
getPendingAckStatsAsync(topic, subName).get();

assertNull(transactionPendingAckStats.lowWaterMarks);
assertEquals(transactionPendingAckStats.state, "Ready");
}

@Test
public void testTransactionGetStats() throws Exception {
initTransaction(1);
final String topic = "persistent://public/default/testGetPendingAckStats";
final String subName = "test1";

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
.subscriptionName(subName).subscribe();

Transaction transaction1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
Transaction transaction2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
producer.newMessage().send();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), transaction1).get();
for (int i = 0; i < 5; i++) {
producer.newMessage(transaction1).send();
}
transaction1.commit().get();
message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), transaction2).get();
producer.newMessage(transaction2).send();

TransactionBufferStats transactionBufferStats =
admin.transactions().getTransactionBufferStats(topic, true);
assertEquals(transactionBufferStats.ongoingTxnSize, 1);
assertNotNull(transactionBufferStats.lowWaterMarks);

TransactionPendingAckStats transactionPendingAckStats =
admin.transactions().getPendingAckStats(topic, subName, true);

assertEquals(transactionPendingAckStats.ongoingTxnSize, 1);
assertNotNull(transactionPendingAckStats.lowWaterMarks);
assertEquals(admin.transactions().getCoordinatorStatsById(0).ongoingTxnSize, 1);
}

@Test(timeOut = 20000)
public void testGetSlowTransactions() throws Exception {
initTransaction(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{

TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer2.getStats().state, "Ready"));
assertEquals(buffer2.getStats(false).state, "Ready"));
managedCursors.removeCursor("transaction-buffer-sub");

doAnswer(invocation -> {
Expand All @@ -577,7 +577,7 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
managedCursors.add(managedCursor);
TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer3.getStats().state, "Ready"));
assertEquals(buffer3.getStats(false).state, "Ready"));
persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
assertTrue(internalStats.cursors.isEmpty());
});
Expand Down Expand Up @@ -628,7 +628,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle1.getStats().state, "Ready"));
assertEquals(pendingAckHandle1.getStats(false).state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
Expand All @@ -638,7 +638,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle2.getStats().state, "Ready"));
assertEquals(pendingAckHandle2.getStats(false).state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
Expand All @@ -649,7 +649,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
PendingAckHandleImpl pendingAckHandle3 = new PendingAckHandleImpl(persistentSubscription);

Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle3.getStats().state, "Ready"));
assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ private void checkTopicTransactionBufferState(boolean clientEnableTransaction,
Awaitility.await().until(() -> {
if (clientEnableTransaction) {
// recover success, client enable transaction will change to Ready State
return topicTransactionBuffer.getStats().state.equals(Ready.name());
return topicTransactionBuffer.getStats(false).state.equals(Ready.name());
} else {
// recover success, client disable transaction will change to NoSnapshot State
return topicTransactionBuffer.getStats().state.equals(NoSnapshot.name());
return topicTransactionBuffer.getStats(false).state.equals(NoSnapshot.name());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,72 @@ TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the future stats of transaction buffer in topic.
*/
CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic);
CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @return the future stats of transaction buffer in topic.
*/
default CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic) {
return getTransactionBufferStatsAsync(topic, false);
}

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction buffer.
* @return the stats of transaction buffer in topic.
*/
TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException;
TransactionBufferStats getTransactionBufferStats(String topic, boolean lowWaterMarks) throws PulsarAdminException;

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @return the stats of transaction buffer in topic.
*/
default TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException {
return getTransactionBufferStats(topic, false);
}

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the stats of transaction pending ack.
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName,
boolean lowWaterMarks);

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @return the stats of transaction pending ack.
*/
default CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName) {
return getPendingAckStatsAsync(topic, subName, false);
}

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @param lowWaterMarks Whether to get information about lowWaterMarks stored in transaction pending ack.
* @return the stats of transaction pending ack.
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName);
TransactionPendingAckStats getPendingAckStats(String topic, String subName, boolean lowWaterMarks)
throws PulsarAdminException;

/**
* Get transaction pending ack stats.
Expand All @@ -149,7 +195,9 @@ TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String
* @param subName the subscription name of this transaction pending ack stats
* @return the stats of transaction pending ack.
*/
TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException;
default TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException {
return getPendingAckStats(topic, subName, false);
}

/**
* Get slow transactions by coordinator id.
Expand Down
Loading