Skip to content

Commit

Permalink
MINOR: Push down logic from TransactionManager to TxnPartitionEntry (a…
Browse files Browse the repository at this point in the history
…pache#14591)

And encapsulate TxnPartitionEntry state.

This makes it easier to understand the behavior and the paths through
which the state is updated.

Reviewers: Justine Olshan <[email protected]>
  • Loading branch information
ijuma authored Oct 28, 2023
1 parent abde0e0 commit fa36a7f
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -473,11 +473,11 @@ public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequ
recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
}

public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
this.baseSequence(), this.topicPartition, baseSequence);
reopened = true;
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ private boolean shouldBackoff(boolean hasLeaderChanged, final ProducerBatch batc
}

private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
ProducerIdAndEpoch producerIdAndEpoch = null;
ProducerIdAndEpoch producerIdAndEpoch;
if (transactionManager != null) {
if (!transactionManager.isSendToPartitionAllowed(tp))
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
Expand Down Expand Up @@ -84,15 +83,13 @@
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.function.Supplier;

/**
* A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
*/
public class TransactionManager {
private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;

private final Logger log;
private final String transactionalId;
Expand Down Expand Up @@ -271,7 +268,7 @@ public TransactionManager(final LogContext logContext,
this.partitionsWithUnresolvedSequences = new HashMap<>();
this.partitionsToRewriteSequences = new HashSet<>();
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap();
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
}

Expand Down Expand Up @@ -521,7 +518,7 @@ private void resetIdempotentProducerId() {
}

private void resetSequenceForPartition(TopicPartition topicPartition) {
txnPartitionMap.topicPartitions.remove(topicPartition);
txnPartitionMap.remove(topicPartition);
this.partitionsWithUnresolvedSequences.remove(topicPartition);
}

Expand Down Expand Up @@ -572,28 +569,25 @@ synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
/**
* Returns the next sequence number to be written to the given TopicPartition.
*/
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).nextSequence;
synchronized int sequenceNumber(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).nextSequence();
}

/**
* Returns the current producer id/epoch of the given TopicPartition.
*/
synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch;
return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch();
}

synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
Integer currentSequence = sequenceNumber(topicPartition);

currentSequence = DefaultRecordBatch.incrementSequence(currentSequence, increment);
txnPartitionMap.get(topicPartition).nextSequence = currentSequence;
txnPartitionMap.get(topicPartition).incrementSequence(increment);
}

synchronized void addInFlightBatch(ProducerBatch batch) {
if (!batch.hasSequence())
throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.add(batch);
txnPartitionMap.get(batch.topicPartition).addInflightBatch(batch);
}

/**
Expand All @@ -606,33 +600,21 @@ synchronized void addInFlightBatch(ProducerBatch batch) {
synchronized int firstInFlightSequence(TopicPartition topicPartition) {
if (!hasInflightBatches(topicPartition))
return RecordBatch.NO_SEQUENCE;

SortedSet<ProducerBatch> inflightBatches = txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
if (inflightBatches.isEmpty())
return RecordBatch.NO_SEQUENCE;
else
return inflightBatches.first().baseSequence();
ProducerBatch batch = nextBatchBySequence(topicPartition);
return batch == null ? RecordBatch.NO_SEQUENCE : batch.baseSequence();
}

synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
SortedSet<ProducerBatch> queue = txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
return queue.isEmpty() ? null : queue.first();
return txnPartitionMap.nextBatchBySequence(topicPartition);
}

synchronized void removeInFlightBatch(ProducerBatch batch) {
if (hasInflightBatches(batch.topicPartition)) {
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.remove(batch);
}
if (hasInflightBatches(batch.topicPartition))
txnPartitionMap.removeInFlightBatch(batch);
}

private int maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
int lastAckedSequence = lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER);
if (sequence > lastAckedSequence) {
txnPartitionMap.get(topicPartition).lastAckedSequence = sequence;
return sequence;
}

return lastAckedSequence;
return txnPartitionMap.maybeUpdateLastAckedSequence(topicPartition, sequence);
}

synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
Expand All @@ -647,18 +629,7 @@ private void updateLastAckedOffset(ProduceResponse.PartitionResponse response, P
if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
return;
long lastOffset = response.baseOffset + batch.recordCount - 1;
OptionalLong lastAckedOffset = lastAckedOffset(batch.topicPartition);
// It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid
// response for this. This can happen only if the producer is only idempotent (not transactional) and in
// this case there will be no tracked bookkeeper entry about it, so we have to insert one.
if (!lastAckedOffset.isPresent() && !isTransactional()) {
txnPartitionMap.getOrCreate(batch.topicPartition);
}
if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
txnPartitionMap.get(batch.topicPartition).lastAckedOffset = lastOffset;
} else {
log.trace("Partition {} keeps lastOffset at {}", batch.topicPartition, lastOffset);
}
txnPartitionMap.updateLastAckedOffset(batch.topicPartition, isTransactional(), lastOffset);
}

public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
Expand Down Expand Up @@ -724,50 +695,18 @@ synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException except
if (!isTransactional()) {
requestEpochBumpForPartition(batch.topicPartition);
} else {
adjustSequencesDueToFailedBatch(batch);
txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
}
}
}
}

// If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted
// so that they don't fail with the OutOfOrderSequenceException.
//
// This method must only be called when we know that the batch is question has been unequivocally failed by the broker,
// ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
if (!txnPartitionMap.contains(batch.topicPartition))
// Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
// reset due to a previous OutOfOrderSequenceException.
return;
log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}",
batch.producerId(), batch.topicPartition, batch.recordCount);
int currentSequence = sequenceNumber(batch.topicPartition);
currentSequence -= batch.recordCount;
if (currentSequence < 0)
throw new IllegalStateException("Sequence number for partition " + batch.topicPartition + " is going to become negative: " + currentSequence);

setNextSequence(batch.topicPartition, currentSequence);

txnPartitionMap.get(batch.topicPartition).resetSequenceNumbers(inFlightBatch -> {
if (inFlightBatch.baseSequence() < batch.baseSequence())
return;

int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
if (newSequence < 0)
throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+ " for partition " + batch.topicPartition + " is going to become negative: " + newSequence);

inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence, inFlightBatch.isTransactional());
});
}

synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
return !txnPartitionMap.getOrCreate(topicPartition).inflightBatchesBySequence.isEmpty();
return txnPartitionMap.getOrCreate(topicPartition).hasInflightBatches();
}

synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition topicPartition) {
return !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch);
return !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch());
}

synchronized boolean hasUnresolvedSequences() {
Expand Down Expand Up @@ -817,7 +756,7 @@ synchronized void maybeResolveSequences() {
// For the idempotent producer, bump the epoch
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
"Going to bump epoch and reset sequence numbers.", topicPartition,
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
requestEpochBumpForPartition(topicPartition);
}

Expand All @@ -828,11 +767,7 @@ synchronized void maybeResolveSequences() {
}

private boolean isNextSequence(TopicPartition topicPartition, int sequence) {
return sequence - lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
}

private void setNextSequence(TopicPartition topicPartition, int sequence) {
txnPartitionMap.get(topicPartition).nextSequence = sequence;
return sequence - lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
}

private boolean isNextSequenceForUnresolvedPartition(TopicPartition topicPartition, int sequence) {
Expand Down Expand Up @@ -983,7 +918,7 @@ synchronized boolean canRetry(ProduceResponse.PartitionResponse response, Produc
// come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
// reset the sequence numbers to the beginning.
return true;
} else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
} else if (lastAckedOffset(batch.topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
// The head of the log has been removed, probably due to the retention time elapsing. In this case,
// we expect to lose the producer state. For the transactional producer, reset the sequences of all
// inflight batches to be from the beginning and retry them, so that the transaction does not need to
Expand Down
Loading

0 comments on commit fa36a7f

Please sign in to comment.